Skip to content

Commit

Permalink
Merge branch 'Nitro-fixing-dockerinput-plugin' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
rafrombrc committed May 13, 2016
2 parents ada17c4 + 68b8397 commit bff889d
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 113 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ Bug Handling
Features
--------

* Added `fields_from_labels`, `container_expiry_days`, and
`new_containers_replay_logs` options to DockerLogInput.

* Added `bind_queue` option to AMQPInput.

* Added time interval configurability to Unique Items filter.
Expand Down
15 changes: 15 additions & 0 deletions docs/source/config/inputs/docker_log.rst
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ Config:

.. versionadded:: 0.11

- fields_from_labels (array[string], optional):
A list of labels to pull is as fields. These are pulled in last and will
override any fields added from fields_from_env.
- since_path (string, optional):
Path to file where input will write a record of the "since" time for each
container to be able to not miss log records while Heka is down (see
Expand All @@ -69,6 +72,18 @@ Config:
to zero (e.g. "0s") then the file will only be written out when Heka
cleanly shuts down, meaning that if Heka crashes all container logs written
since Heka has started will be re-fetched.
- container_expiry_days (int, optional):
The number of days after which to remove unseen containers from the sinces
file. Defaults to 30 days. This prevents containers from building up
in the file forever. It has the effect of replaying logs from any container
which was not seen for this interval but then re-appears. Containers are
tracked by container ID.
- new_containers_replay_logs (bool, optional):
Will newly discovered containers replay all of the logs currently available
via the Docker logs endpoint? Defaults to true. If you are upgrading from
a previous version of heka, you may want to consider setting this to false
when first upgrading to prevent the massive replay of logs from all of
your existing containers.

Example:

Expand Down
121 changes: 47 additions & 74 deletions plugins/docker/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ package docker
// SOFTWARE.

import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"sync"
"time"

"github.com/fsouza/go-dockerclient"
Expand All @@ -40,60 +37,47 @@ import (
"github.com/pborman/uuid"
)

type sinces struct {
Since int64
Containers map[string]int64
}

type AttachManager struct {
hostname string
client DockerClient
events chan *docker.APIEvents
ir InputRunner
endpoint string
certPath string
nameFromEnv string
fieldsFromEnv []string
fieldsFromLabels []string
sincePath string
sinces *sinces
sinceLock sync.Mutex
sinceInterval time.Duration
hostname string
client DockerClient
events chan *docker.APIEvents
ir InputRunner
endpoint string
certPath string
nameFromEnv string
fieldsFromEnv []string
fieldsFromLabels []string
sinces *SinceTracker
newContainersReplayLogs bool
}

// Construct an AttachManager and set up the Docker Client
func NewAttachManager(endpoint string, certPath string, nameFromEnv string,
fieldsFromEnv []string, fieldsFromLabels []string,
sincePath string, sinceInterval time.Duration) (*AttachManager, error) {
sincePath string, sinceInterval time.Duration, containerExpiryDays int,
newContainersReplayLogs bool) (*AttachManager, error) {

client, err := newDockerClient(certPath, endpoint)
if err != nil {
return nil, err
}

m := &AttachManager{
client: client,
events: make(chan *docker.APIEvents),
nameFromEnv: nameFromEnv,
fieldsFromEnv: fieldsFromEnv,
fieldsFromLabels: fieldsFromLabels,
sincePath: sincePath,
sinces: &sinces{},
sinceInterval: sinceInterval,
}

// Initialize the sinces from the JSON since file.
sinceFile, err := os.Open(sincePath)
sinceTracker, err := NewSinceTracker(containerExpiryDays, sincePath, sinceInterval)
if err != nil {
return nil, fmt.Errorf("Can't open \"since\" file '%s': %s", sincePath, err.Error())
return nil, err
}
jsonDecoder := json.NewDecoder(sinceFile)
m.sinceLock.Lock()
err = jsonDecoder.Decode(m.sinces)
m.sinceLock.Unlock()
if err != nil {
return nil, fmt.Errorf("Can't decode \"since\" file '%s': %s", sincePath, err.Error())

m := &AttachManager{
client: client,
events: make(chan *docker.APIEvents),
nameFromEnv: nameFromEnv,
fieldsFromEnv: fieldsFromEnv,
fieldsFromLabels: fieldsFromLabels,
sinces: sinceTracker,
newContainersReplayLogs: newContainersReplayLogs,
}

return m, nil
}

Expand Down Expand Up @@ -123,6 +107,7 @@ func (m *AttachManager) attachAll() error {
// Main body of work
func (m *AttachManager) Run(ir InputRunner, hostname string, stopChan chan error) error {
m.ir = ir
m.sinces.ir = ir
m.hostname = hostname

// Retry this sleeping in between tries During this time
Expand All @@ -132,49 +117,31 @@ func (m *AttachManager) Run(ir InputRunner, hostname string, stopChan chan error
if err != nil {
m.ir.LogError(err)
return errors.New(
"Failed to attach to Docker containers after retrying. Plugin giving up.")
"Failed to attach to Docker containers after retrying. Plugin giving up.",
)
}

err = withRetries(func() error { return m.client.AddEventListener(m.events) })
if err != nil {
m.ir.LogError(err)
return errors.New(
"Failed to add Docker event listener after retrying. Plugin giving up.")
"Failed to add Docker event listener after retrying. Plugin giving up.",
)
}

if m.sinceInterval > 0 {
if m.sinces.Interval > 0 {
go m.sinceWriteLoop(stopChan)
}
m.handleDockerEvents(stopChan) // Blocks until stopChan is closed.
// Write to since file on the way out.
m.writeSinceFile(time.Now())
m.sinces.Write(time.Now())
return nil
}

func (m *AttachManager) writeSinceFile(t time.Time) {
sinceFile, err := os.Create(m.sincePath)
if err != nil {
m.ir.LogError(fmt.Errorf("Can't create \"since\" file '%s': %s", m.sincePath,
err.Error()))
return
}
jsonEncoder := json.NewEncoder(sinceFile)
m.sinceLock.Lock()
m.sinces.Since = t.Unix()
if err = jsonEncoder.Encode(m.sinces); err != nil {
m.ir.LogError(fmt.Errorf("Can't write to \"since\" file '%s': %s", m.sincePath,
err.Error()))
}
m.sinceLock.Unlock()
if err = sinceFile.Close(); err != nil {
m.ir.LogError(fmt.Errorf("Can't close \"since\" file '%s': %s", m.sincePath,
err.Error()))
}
}

// Periodically writes out a new since file, until stopped.
func (m *AttachManager) sinceWriteLoop(stopChan chan error) {
ticker := time.Tick(m.sinceInterval)
ticker := time.Tick(m.sinces.Interval)
ok := true
var now time.Time
for ok {
Expand All @@ -183,7 +150,7 @@ func (m *AttachManager) sinceWriteLoop(stopChan chan error) {
if !ok {
break
}
m.writeSinceFile(now)
m.sinces.Write(now)
case <-stopChan:
ok = false
break
Expand Down Expand Up @@ -255,7 +222,7 @@ func (m *AttachManager) attach(id string, client DockerClient) error {

// Spin up one of these for each container we're watching.
go func() {
m.sinceLock.Lock()
m.sinces.Lock()
since, ok := m.sinces.Containers[id]
if ok {
// We've seen this container before, need to use a since value.
Expand All @@ -269,8 +236,14 @@ func (m *AttachManager) attach(id string, client DockerClient) error {
} else {
// We haven't seen it, add it to our sinces.
m.sinces.Containers[id] = 0

// And set the since appropriately from our settings.
if !m.newContainersReplayLogs {
// Use the last global since time when we connect to it
since = m.sinces.Since
}
}
m.sinceLock.Unlock()
m.sinces.Unlock()

// This will block until the container exits.
err := client.Logs(docker.LogsOptions{
Expand All @@ -282,17 +255,17 @@ func (m *AttachManager) attach(id string, client DockerClient) error {
Stderr: true,
Since: since,
Timestamps: false,
Tail: "all",
Tail: "all", // This is scoped by "Since" above
RawTerminal: false,
})

// Once it has exited, close our pipes, remove from the sinces, and (if
// necessary) log the error.
// Once it has exited, close our pipes, set the since time to now, and
// (if necessary) log the error.
outwr.Close()
errwr.Close()
m.sinceLock.Lock()
m.sinces.Lock()
m.sinces.Containers[id] = time.Now().Unix()
m.sinceLock.Unlock()
m.sinces.Unlock()
if err != nil {
err = fmt.Errorf("streaming container %s logs: %s", id, err.Error())
m.ir.LogError(err)
Expand Down
61 changes: 22 additions & 39 deletions plugins/docker/docker_log_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package docker

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"

Expand All @@ -28,13 +26,15 @@ import (

type DockerLogInputConfig struct {
// A Docker endpoint.
Endpoint string `toml:"endpoint"`
CertPath string `toml:"cert_path"`
SincePath string `toml:"since_path"`
SinceInterval string `toml:"since_interval"`
NameFromEnv string `toml:"name_from_env_var"`
FieldsFromEnv []string `toml:"fields_from_env"`
FieldsFromLabels []string `toml:"fields_from_labels"`
Endpoint string `toml:"endpoint"`
CertPath string `toml:"cert_path"`
SincePath string `toml:"since_path"`
SinceInterval string `toml:"since_interval"`
NameFromEnv string `toml:"name_from_env_var"`
FieldsFromEnv []string `toml:"fields_from_env"`
FieldsFromLabels []string `toml:"fields_from_labels"`
ContainerExpiryDays int `toml:"container_expiry_days"`
NewContainersReplayLogs bool `toml:"new_containers_replay_logs"`
}

type DockerLogInput struct {
Expand All @@ -50,16 +50,19 @@ func (di *DockerLogInput) SetPipelineConfig(pConfig *pipeline.PipelineConfig) {

func (di *DockerLogInput) ConfigStruct() interface{} {
return &DockerLogInputConfig{
Endpoint: "unix:///var/run/docker.sock",
CertPath: "",
SincePath: filepath.Join("docker", "logs_since.txt"),
SinceInterval: "5s",
Endpoint: "unix:///var/run/docker.sock",
CertPath: "",
SincePath: filepath.Join("docker", "logs_since.txt"),
SinceInterval: "5s",
ContainerExpiryDays: 30,
NewContainersReplayLogs: true,
}
}

func (di *DockerLogInput) Init(config interface{}) error {
conf := config.(*DockerLogInputConfig)
globals := di.pConfig.Globals
sincePath := globals.PrependBaseDir(conf.SincePath)

// Make sure since interval is valid.
sinceInterval, err := time.ParseDuration(conf.SinceInterval)
Expand All @@ -68,32 +71,10 @@ func (di *DockerLogInput) Init(config interface{}) error {
err.Error())
}

// Make sure the since file exists.
sincePath := globals.PrependBaseDir(conf.SincePath)
_, err = os.Stat(sincePath)
if os.IsNotExist(err) {
sinceDir := filepath.Dir(sincePath)
if err = os.MkdirAll(sinceDir, 0700); err != nil {
return fmt.Errorf("Can't create storage directory '%s': %s", sinceDir,
err.Error())
}

sinceFile, err := os.Create(sincePath)
if err != nil {
return fmt.Errorf("Can't create \"since\" file '%s': %s", sincePath,
err.Error())
}
jsonEncoder := json.NewEncoder(sinceFile)
if err = jsonEncoder.Encode(&sinces{Containers: make(map[string]int64)}); err != nil {
return fmt.Errorf("Can't write to \"since\" file '%s': %s", sincePath,
err.Error())
}
if err = sinceFile.Close(); err != nil {
return fmt.Errorf("Can't close \"since\" file '%s': %s", sincePath,
err.Error())
}
} else if err != nil {
return fmt.Errorf("Can't open \"since\" file '%s': %s", sincePath, err.Error())
// Make sure we have a sinces File.
err = EnsureSincesFile(conf, sincePath)
if err != nil {
return err
}

di.stopChan = make(chan error)
Expand All @@ -107,6 +88,8 @@ func (di *DockerLogInput) Init(config interface{}) error {
conf.FieldsFromLabels,
sincePath,
sinceInterval,
conf.ContainerExpiryDays,
conf.NewContainersReplayLogs,
)
if err != nil {
return fmt.Errorf("DockerLogInput: failed to attach: %s", err.Error())
Expand Down
Loading

0 comments on commit bff889d

Please sign in to comment.