Skip to content

Commit

Permalink
Refactor into separate file, fix state bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
relistan committed May 12, 2016
1 parent d6de7dc commit acc7b82
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 109 deletions.
80 changes: 10 additions & 70 deletions plugins/docker/attacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ package docker
// SOFTWARE.

import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"sync"
"time"

"github.com/fsouza/go-dockerclient"
Expand All @@ -40,15 +37,6 @@ import (
"github.com/pborman/uuid"
)

// Tracks the timestamp of the last time we logged
// data from a particular container.
type sinces struct {
sync.Mutex
Path string
Interval time.Duration
Since int64
Containers map[string]int64
}

type AttachManager struct {
hostname string
Expand All @@ -60,8 +48,7 @@ type AttachManager struct {
nameFromEnv string
fieldsFromEnv []string
fieldsFromLabels []string
sinces *sinces
containerExpiryDays int
sinces *SinceTracker
newContainersReplayLogs bool
}

Expand All @@ -76,29 +63,21 @@ func NewAttachManager(endpoint string, certPath string, nameFromEnv string,
return nil, err
}

sinceTracker, err := NewSinceTracker(containerExpiryDays, sincePath, sinceInterval)
if err != nil {
return nil, err
}

m := &AttachManager{
client: client,
events: make(chan *docker.APIEvents),
nameFromEnv: nameFromEnv,
fieldsFromEnv: fieldsFromEnv,
fieldsFromLabels: fieldsFromLabels,
sinces: &sinces{Path: sincePath, Interval: sinceInterval},
containerExpiryDays: containerExpiryDays,
sinces: sinceTracker,
newContainersReplayLogs: newContainersReplayLogs,
}

// Initialize the sinces from the JSON since file.
sinceFile, err := os.Open(sincePath)
if err != nil {
return nil, fmt.Errorf("Can't open \"since\" file '%s': %s", sincePath, err.Error())
}
jsonDecoder := json.NewDecoder(sinceFile)
m.sinces.Lock()
err = jsonDecoder.Decode(m.sinces)
m.sinces.Unlock()
if err != nil {
return nil, fmt.Errorf("Can't decode \"since\" file '%s': %s", sincePath, err.Error())
}
return m, nil
}

Expand Down Expand Up @@ -128,6 +107,7 @@ func (m *AttachManager) attachAll() error {
// Main body of work
func (m *AttachManager) Run(ir InputRunner, hostname string, stopChan chan error) error {
m.ir = ir
m.sinces.ir = ir
m.hostname = hostname

// Retry this sleeping in between tries During this time
Expand All @@ -154,50 +134,10 @@ func (m *AttachManager) Run(ir InputRunner, hostname string, stopChan chan error
}
m.handleDockerEvents(stopChan) // Blocks until stopChan is closed.
// Write to since file on the way out.
m.writeSinceFile(time.Now())
m.sinces.Write(time.Now())
return nil
}

func (m *AttachManager) writeSinceFile(t time.Time) {
sinceFile, err := os.Create(m.sinces.Path)
if err != nil {
m.ir.LogError(fmt.Errorf("Can't create \"since\" file '%s': %s", m.sinces.Path,
err.Error()))
return
}
jsonEncoder := json.NewEncoder(sinceFile)
m.sinces.Lock()
m.sinces.Since = t.Unix()

// Eject since containers that are too old to keep so we don't build up
// a list forever.
for container, lastSeen := range m.sinces.Containers {
if lastSeen == 0 {
continue
}

if lastSeen < time.Now().Unix() - int64(m.containerExpiryDays) * 3600 * 24 {
delete(m.sinces.Containers, container)
}
}

// Whitelist the parts of the struct we save to disk
outStruct := struct {
Since int64
Containers map[string]int64
}{m.sinces.Since, m.sinces.Containers}

if err = jsonEncoder.Encode(outStruct); err != nil {
m.ir.LogError(fmt.Errorf("Can't write to \"since\" file '%s': %s", m.sinces.Path,
err.Error()))
}

m.sinces.Unlock()
if err = sinceFile.Close(); err != nil {
m.ir.LogError(fmt.Errorf("Can't close \"since\" file '%s': %s", m.sinces.Path,
err.Error()))
}
}

// Periodically writes out a new since file, until stopped.
func (m *AttachManager) sinceWriteLoop(stopChan chan error) {
Expand All @@ -210,7 +150,7 @@ func (m *AttachManager) sinceWriteLoop(stopChan chan error) {
if !ok {
break
}
m.writeSinceFile(now)
m.sinces.Write(now)
case <-stopChan:
ok = false
break
Expand Down
45 changes: 6 additions & 39 deletions plugins/docker/docker_log_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package docker

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"

Expand Down Expand Up @@ -52,46 +50,15 @@ func (di *DockerLogInput) SetPipelineConfig(pConfig *pipeline.PipelineConfig) {

func (di *DockerLogInput) ConfigStruct() interface{} {
return &DockerLogInputConfig{
Endpoint: "unix:///var/run/docker.sock",
CertPath: "",
SincePath: filepath.Join("docker", "logs_since.txt"),
SinceInterval: "5s",
ContainerExpiryDays: 30,
Endpoint: "unix:///var/run/docker.sock",
CertPath: "",
SincePath: filepath.Join("docker", "logs_since.txt"),
SinceInterval: "5s",
ContainerExpiryDays: 30,
NewContainersReplayLogs: true,
}
}

func (di *DockerLogInput) ensureSincesFile(conf *DockerLogInputConfig, sincePath string) error {
// Make sure the since file exists.
_, err := os.Stat(sincePath)
if os.IsNotExist(err) {
sinceDir := filepath.Dir(sincePath)
if err = os.MkdirAll(sinceDir, 0700); err != nil {
return fmt.Errorf("Can't create storage directory '%s': %s", sinceDir,
err.Error())
}

sinceFile, err := os.Create(sincePath)
if err != nil {
return fmt.Errorf("Can't create \"since\" file '%s': %s", sincePath,
err.Error())
}
jsonEncoder := json.NewEncoder(sinceFile)
if err = jsonEncoder.Encode(&sinces{Containers: make(map[string]int64)}); err != nil {
return fmt.Errorf("Can't write to \"since\" file '%s': %s", sincePath,
err.Error())
}
if err = sinceFile.Close(); err != nil {
return fmt.Errorf("Can't close \"since\" file '%s': %s", sincePath,
err.Error())
}
} else if err != nil {
return fmt.Errorf("Can't open \"since\" file '%s': %s", sincePath, err.Error())
}

return nil
}

func (di *DockerLogInput) Init(config interface{}) error {
conf := config.(*DockerLogInputConfig)
globals := di.pConfig.Globals
Expand All @@ -105,7 +72,7 @@ func (di *DockerLogInput) Init(config interface{}) error {
}

// Make sure we have a sinces File.
err = di.ensureSincesFile(conf, sincePath)
err = EnsureSincesFile(conf, sincePath)
if err != nil {
return err
}
Expand Down
150 changes: 150 additions & 0 deletions plugins/docker/since_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/***** BEGIN LICENSE BLOCK *****
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this file,
# You can obtain one at http://mozilla.org/MPL/2.0/.
#
# The Initial Developer of the Original Code is the Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2014
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Rob Miller ([email protected])
# Karl Matthias ([email protected])
#
# ***** END LICENSE BLOCK *****/

package docker

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"

"github.com/mozilla-services/heka/pipeline"
)

// Tracks the timestamp of the last time we logged
// data from a particular container.
type SinceTracker struct {
sync.Mutex
Path string `json:"-"`
Interval time.Duration `json:"-"`
ExpiryDays int `json:"-"`
Since int64
Containers map[string]int64
ir pipeline.InputRunner
}

// Return a fully configured SinceTracker.
func NewSinceTracker(expiryDays int, path string, interval time.Duration) (*SinceTracker, error) {
sinceTracker := &SinceTracker{
Path: path,
Interval: interval,
ExpiryDays: expiryDays,
}

if err := sinceTracker.Load(); err != nil {
return nil, err
}

return sinceTracker, nil
}

// Load state from the since file into the SinceTracker.
func (s *SinceTracker) Load() error {
// Initialize the sinces from the JSON since file.
sinceFile, err := os.Open(s.Path)
if err != nil {
return fmt.Errorf("Can't open \"since\" file '%s': %s", s.Path, err.Error())
}

jsonDecoder := json.NewDecoder(sinceFile)

s.Lock()
err = jsonDecoder.Decode(s)
s.Unlock()

if err != nil {
return fmt.Errorf("Can't decode \"since\" file '%s': %s", s.Path, err.Error())
}

return nil
}

// Write out the current data in the SinceTracker to the sinces file.
func (s *SinceTracker) Write(t time.Time) {
sinceFile, err := os.Create(s.Path)
if err != nil {
s.ir.LogError(fmt.Errorf("Can't create \"since\" file '%s': %s", s.Path,
err.Error()))
return
}
jsonEncoder := json.NewEncoder(sinceFile)
s.Lock()
s.Since = t.Unix()

// Eject since containers that are too old to keep so we don't build up
// a list forever.
for container, lastSeen := range s.Containers {
if lastSeen == 0 {
continue
}

if lastSeen < time.Now().Unix()-int64(s.ExpiryDays)*3600*24 {
delete(s.Containers, container)
}
}

// Whitelist the parts of the struct we save to disk
outStruct := struct {
Since *int64
Containers *map[string]int64
}{&s.Since, &s.Containers}

if err = jsonEncoder.Encode(outStruct); err != nil {
s.ir.LogError(fmt.Errorf("Can't write to \"since\" file '%s': %s", s.Path,
err.Error()))
}

s.Unlock()
if err = sinceFile.Close(); err != nil {
s.ir.LogError(fmt.Errorf("Can't close \"since\" file '%s': %s", s.Path,
err.Error()))
}
}

// Make sure the file exists and is writable
func EnsureSincesFile(conf *DockerLogInputConfig, sincePath string) error {
// Make sure the since file exists.
_, err := os.Stat(sincePath)
if os.IsNotExist(err) {
sinceDir := filepath.Dir(sincePath)
if err = os.MkdirAll(sinceDir, 0700); err != nil {
return fmt.Errorf("Can't create storage directory '%s': %s", sinceDir,
err.Error())
}

sinceFile, err := os.Create(sincePath)
if err != nil {
return fmt.Errorf("Can't create \"since\" file '%s': %s", sincePath,
err.Error())
}

jsonEncoder := json.NewEncoder(sinceFile)
if err = jsonEncoder.Encode(&SinceTracker{Containers: make(map[string]int64)}); err != nil {
return fmt.Errorf("Can't write to \"since\" file '%s': %s", sincePath,
err.Error())
}
if err = sinceFile.Close(); err != nil {
return fmt.Errorf("Can't close \"since\" file '%s': %s", sincePath,
err.Error())
}
} else if err != nil {
return fmt.Errorf("Can't open \"since\" file '%s': %s", sincePath, err.Error())
}

return nil
}

0 comments on commit acc7b82

Please sign in to comment.