Skip to content

Commit

Permalink
Add important settings to DockerInputLog plugin.
Browse files Browse the repository at this point in the history
1. `new_containers_replay_logs`
  This will introduce a setting to prevent massive replay of
  logs when updating to the newest plugin on a running system.
  Otherwise all the logs for all running containers will be
  streamed into Heka. If a previous version of Heka was running
  this will be a huge amount of duplicated logging.

2. `container_expiry_days`
  Adds a setting to expire containers from the sinces file
  so that they don't build up in the file forever.

3. Refactor some of the code for readability and encapsulation.
  • Loading branch information
relistan committed May 11, 2016
1 parent ada17c4 commit c733640
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 62 deletions.
113 changes: 71 additions & 42 deletions plugins/docker/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,46 +40,51 @@ import (
"github.com/pborman/uuid"
)

// Tracks the timestamp of the last time we logged
// data from a particular container.
type sinces struct {
sync.Mutex
Path string
Interval time.Duration
Since int64
Containers map[string]int64
}

type AttachManager struct {
hostname string
client DockerClient
events chan *docker.APIEvents
ir InputRunner
endpoint string
certPath string
nameFromEnv string
fieldsFromEnv []string
fieldsFromLabels []string
sincePath string
sinces *sinces
sinceLock sync.Mutex
sinceInterval time.Duration
hostname string
client DockerClient
events chan *docker.APIEvents
ir InputRunner
endpoint string
certPath string
nameFromEnv string
fieldsFromEnv []string
fieldsFromLabels []string
sinces *sinces
containerExpiryDays int
newContainersReplayLogs bool
}

// Construct an AttachManager and set up the Docker Client
func NewAttachManager(endpoint string, certPath string, nameFromEnv string,
fieldsFromEnv []string, fieldsFromLabels []string,
sincePath string, sinceInterval time.Duration) (*AttachManager, error) {
sincePath string, sinceInterval time.Duration, containerExpiryDays int,
newContainersReplayLogs bool) (*AttachManager, error) {

client, err := newDockerClient(certPath, endpoint)
if err != nil {
return nil, err
}

m := &AttachManager{
client: client,
events: make(chan *docker.APIEvents),
nameFromEnv: nameFromEnv,
fieldsFromEnv: fieldsFromEnv,
fieldsFromLabels: fieldsFromLabels,
sincePath: sincePath,
sinces: &sinces{},
sinceInterval: sinceInterval,
client: client,
events: make(chan *docker.APIEvents),
nameFromEnv: nameFromEnv,
fieldsFromEnv: fieldsFromEnv,
fieldsFromLabels: fieldsFromLabels,
sinces: &sinces{Path: sincePath, Interval: sinceInterval},
containerExpiryDays: containerExpiryDays,
newContainersReplayLogs: newContainersReplayLogs,
}

// Initialize the sinces from the JSON since file.
Expand All @@ -88,9 +93,9 @@ func NewAttachManager(endpoint string, certPath string, nameFromEnv string,
return nil, fmt.Errorf("Can't open \"since\" file '%s': %s", sincePath, err.Error())
}
jsonDecoder := json.NewDecoder(sinceFile)
m.sinceLock.Lock()
m.sinces.Lock()
err = jsonDecoder.Decode(m.sinces)
m.sinceLock.Unlock()
m.sinces.Unlock()
if err != nil {
return nil, fmt.Errorf("Can't decode \"since\" file '%s': %s", sincePath, err.Error())
}
Expand Down Expand Up @@ -132,17 +137,19 @@ func (m *AttachManager) Run(ir InputRunner, hostname string, stopChan chan error
if err != nil {
m.ir.LogError(err)
return errors.New(
"Failed to attach to Docker containers after retrying. Plugin giving up.")
"Failed to attach to Docker containers after retrying. Plugin giving up.",
)
}

err = withRetries(func() error { return m.client.AddEventListener(m.events) })
if err != nil {
m.ir.LogError(err)
return errors.New(
"Failed to add Docker event listener after retrying. Plugin giving up.")
"Failed to add Docker event listener after retrying. Plugin giving up.",
)
}

if m.sinceInterval > 0 {
if m.sinces.Interval > 0 {
go m.sinceWriteLoop(stopChan)
}
m.handleDockerEvents(stopChan) // Blocks until stopChan is closed.
Expand All @@ -152,29 +159,45 @@ func (m *AttachManager) Run(ir InputRunner, hostname string, stopChan chan error
}

func (m *AttachManager) writeSinceFile(t time.Time) {
sinceFile, err := os.Create(m.sincePath)
sinceFile, err := os.Create(m.sinces.Path)
if err != nil {
m.ir.LogError(fmt.Errorf("Can't create \"since\" file '%s': %s", m.sincePath,
m.ir.LogError(fmt.Errorf("Can't create \"since\" file '%s': %s", m.sinces.Path,
err.Error()))
return
}
jsonEncoder := json.NewEncoder(sinceFile)
m.sinceLock.Lock()
m.sinces.Lock()
m.sinces.Since = t.Unix()
if err = jsonEncoder.Encode(m.sinces); err != nil {
m.ir.LogError(fmt.Errorf("Can't write to \"since\" file '%s': %s", m.sincePath,

// Eject since containers that are too old to keep so we don't build up
// a list forever.
for container, lastSeen := range m.sinces.Containers {
if lastSeen < time.Now().Unix() - int64(m.containerExpiryDays) * 3600 * 24 {
delete(m.sinces.Containers, container)
}
}

// Whitelist the parts of the struct we save to disk
outStruct := struct {
Since int64
Containers map[string]int64
}{m.sinces.Since, m.sinces.Containers}

if err = jsonEncoder.Encode(outStruct); err != nil {
m.ir.LogError(fmt.Errorf("Can't write to \"since\" file '%s': %s", m.sinces.Path,
err.Error()))
}
m.sinceLock.Unlock()

m.sinces.Unlock()
if err = sinceFile.Close(); err != nil {
m.ir.LogError(fmt.Errorf("Can't close \"since\" file '%s': %s", m.sincePath,
m.ir.LogError(fmt.Errorf("Can't close \"since\" file '%s': %s", m.sinces.Path,
err.Error()))
}
}

// Periodically writes out a new since file, until stopped.
func (m *AttachManager) sinceWriteLoop(stopChan chan error) {
ticker := time.Tick(m.sinceInterval)
ticker := time.Tick(m.sinces.Interval)
ok := true
var now time.Time
for ok {
Expand Down Expand Up @@ -255,7 +278,7 @@ func (m *AttachManager) attach(id string, client DockerClient) error {

// Spin up one of these for each container we're watching.
go func() {
m.sinceLock.Lock()
m.sinces.Lock()
since, ok := m.sinces.Containers[id]
if ok {
// We've seen this container before, need to use a since value.
Expand All @@ -269,8 +292,14 @@ func (m *AttachManager) attach(id string, client DockerClient) error {
} else {
// We haven't seen it, add it to our sinces.
m.sinces.Containers[id] = 0

// And set the since appropriately from our settings
if !m.newContainersReplayLogs {
// Use the last global since time when we connect to it
since = m.sinces.Since
}
}
m.sinceLock.Unlock()
m.sinces.Unlock()

// This will block until the container exits.
err := client.Logs(docker.LogsOptions{
Expand All @@ -282,17 +311,17 @@ func (m *AttachManager) attach(id string, client DockerClient) error {
Stderr: true,
Since: since,
Timestamps: false,
Tail: "all",
Tail: "all", // This is scoped by "Since" above
RawTerminal: false,
})

// Once it has exited, close our pipes, remove from the sinces, and (if
// necessary) log the error.
// Once it has exited, close our pipes, set the since time to now, and
// (if necessary) log the error.
outwr.Close()
errwr.Close()
m.sinceLock.Lock()
m.sinces.Lock()
m.sinces.Containers[id] = time.Now().Unix()
m.sinceLock.Unlock()
m.sinces.Unlock()
if err != nil {
err = fmt.Errorf("streaming container %s logs: %s", id, err.Error())
m.ir.LogError(err)
Expand Down
56 changes: 36 additions & 20 deletions plugins/docker/docker_log_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ import (

type DockerLogInputConfig struct {
// A Docker endpoint.
Endpoint string `toml:"endpoint"`
CertPath string `toml:"cert_path"`
SincePath string `toml:"since_path"`
SinceInterval string `toml:"since_interval"`
NameFromEnv string `toml:"name_from_env_var"`
FieldsFromEnv []string `toml:"fields_from_env"`
FieldsFromLabels []string `toml:"fields_from_labels"`
Endpoint string `toml:"endpoint"`
CertPath string `toml:"cert_path"`
SincePath string `toml:"since_path"`
SinceInterval string `toml:"since_interval"`
NameFromEnv string `toml:"name_from_env_var"`
FieldsFromEnv []string `toml:"fields_from_env"`
FieldsFromLabels []string `toml:"fields_from_labels"`
ContainerExpiryDays int `toml:"container_expiry_days"`
NewContainersReplayLogs bool `toml:"new_containers_replay_logs"`
}

type DockerLogInput struct {
Expand All @@ -54,23 +56,14 @@ func (di *DockerLogInput) ConfigStruct() interface{} {
CertPath: "",
SincePath: filepath.Join("docker", "logs_since.txt"),
SinceInterval: "5s",
ContainerExpiryDays: 30,
NewContainersReplayLogs: true,
}
}

func (di *DockerLogInput) Init(config interface{}) error {
conf := config.(*DockerLogInputConfig)
globals := di.pConfig.Globals

// Make sure since interval is valid.
sinceInterval, err := time.ParseDuration(conf.SinceInterval)
if err != nil {
return fmt.Errorf("Can't parse since_interval value '%s': %s", conf.SinceInterval,
err.Error())
}

func (di *DockerLogInput) ensureSincesFile(conf *DockerLogInputConfig, sincePath string) error {
// Make sure the since file exists.
sincePath := globals.PrependBaseDir(conf.SincePath)
_, err = os.Stat(sincePath)
_, err := os.Stat(sincePath)
if os.IsNotExist(err) {
sinceDir := filepath.Dir(sincePath)
if err = os.MkdirAll(sinceDir, 0700); err != nil {
Expand All @@ -96,6 +89,27 @@ func (di *DockerLogInput) Init(config interface{}) error {
return fmt.Errorf("Can't open \"since\" file '%s': %s", sincePath, err.Error())
}

return nil
}

func (di *DockerLogInput) Init(config interface{}) error {
conf := config.(*DockerLogInputConfig)
globals := di.pConfig.Globals
sincePath := globals.PrependBaseDir(conf.SincePath)

// Make sure since interval is valid.
sinceInterval, err := time.ParseDuration(conf.SinceInterval)
if err != nil {
return fmt.Errorf("Can't parse since_interval value '%s': %s", conf.SinceInterval,
err.Error())
}

// Make sure we have a sinces File.
err = di.ensureSincesFile(conf, sincePath)
if err != nil {
return err
}

di.stopChan = make(chan error)
di.closer = make(chan struct{})

Expand All @@ -107,6 +121,8 @@ func (di *DockerLogInput) Init(config interface{}) error {
conf.FieldsFromLabels,
sincePath,
sinceInterval,
conf.ContainerExpiryDays,
conf.NewContainersReplayLogs,
)
if err != nil {
return fmt.Errorf("DockerLogInput: failed to attach: %s", err.Error())
Expand Down

0 comments on commit c733640

Please sign in to comment.