Skip to content

Commit

Permalink
Add ProcessState type to base package
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Feb 22, 2020
1 parent f9a6c61 commit 830020e
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 79 deletions.
25 changes: 11 additions & 14 deletions background.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ type Background struct {
mu sync.Mutex
running bool

// channel to send state updates.
stateCh chan<- string
ps *base.ProcessState

// wait group to wait for all goroutines to finish.
wg sync.WaitGroup
Expand Down Expand Up @@ -131,18 +130,17 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
pid := os.Getpid()

rdb := rdb.NewRDB(createRedisClient(r))
syncRequestCh := make(chan *syncRequest)
stateCh := make(chan string)
workerCh := make(chan int)
cancelations := base.NewCancelations()
syncer := newSyncer(syncRequestCh, 5*time.Second)
heartbeater := newHeartbeater(rdb, host, pid, n, queues, cfg.StrictPriority, 5*time.Second, stateCh, workerCh)
ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority)
syncCh := make(chan *syncRequest)
cancels := base.NewCancelations()
syncer := newSyncer(syncCh, 5*time.Second)
heartbeater := newHeartbeater(rdb, ps, 5*time.Second)
scheduler := newScheduler(rdb, 5*time.Second, queues)
processor := newProcessor(rdb, queues, cfg.StrictPriority, n, delayFunc, syncRequestCh, workerCh, cancelations)
subscriber := newSubscriber(rdb, cancelations)
processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels)
subscriber := newSubscriber(rdb, cancels)
return &Background{
stateCh: stateCh,
rdb: rdb,
ps: ps,
scheduler: scheduler,
processor: processor,
syncer: syncer,
Expand Down Expand Up @@ -194,7 +192,7 @@ func (bg *Background) Run(handler Handler) {
sig := <-sigs
if sig == syscall.SIGTSTP {
bg.processor.stop()
bg.stateCh <- "stopped"
bg.ps.SetStatus(base.StatusStopped)
continue
}
break
Expand Down Expand Up @@ -232,8 +230,7 @@ func (bg *Background) stop() {
// Note: The order of termination is important.
// Sender goroutines should be terminated before the receiver goroutines.
//
// processor -> syncer (via syncRequestCh)
// processor -> heartbeater (via workerCh)
// processor -> syncer (via syncCh)
bg.scheduler.terminate()
bg.processor.terminate()
bg.syncer.terminate()
Expand Down
31 changes: 8 additions & 23 deletions heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,20 @@ import (
type heartbeater struct {
rdb *rdb.RDB

pinfo *base.ProcessInfo
ps *base.ProcessState

// channel to communicate back to the long running "heartbeater" goroutine.
done chan struct{}

// channel to receive updates on process state.
stateCh <-chan string

// channel to recieve updates on workers count.
workerCh <-chan int

// interval between heartbeats.
interval time.Duration
}

func newHeartbeater(rdb *rdb.RDB, host string, pid, concurrency int, queues map[string]int, strict bool,
interval time.Duration, stateCh <-chan string, workerCh <-chan int) *heartbeater {
func newHeartbeater(rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater {
return &heartbeater{
rdb: rdb,
pinfo: base.NewProcessInfo(host, pid, concurrency, queues, strict),
ps: ps,
done: make(chan struct{}),
stateCh: stateCh,
workerCh: workerCh,
interval: interval,
}
}
Expand All @@ -51,26 +42,20 @@ func (h *heartbeater) terminate() {
}

func (h *heartbeater) start(wg *sync.WaitGroup) {
h.pinfo.Started = time.Now()
h.pinfo.State = "running"
h.ps.SetStarted(time.Now())
h.ps.SetStatus(base.StatusRunning)
wg.Add(1)
go func() {
defer wg.Done()
h.beat()
timer := time.NewTimer(h.interval)
for {
select {
case <-h.done:
h.rdb.ClearProcessInfo(h.pinfo)
h.rdb.ClearProcessInfo(h.ps.Get())
logger.info("Heartbeater done")
return
case state := <-h.stateCh:
h.pinfo.State = state
case delta := <-h.workerCh:
h.pinfo.ActiveWorkerCount += delta
case <-timer.C:
case <-time.After(h.interval):
h.beat()
timer.Reset(h.interval)
}
}
}()
Expand All @@ -79,7 +64,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
func (h *heartbeater) beat() {
// Note: Set TTL to be long enough so that it won't expire before we write again
// and short enough to expire quickly once the process is shut down or killed.
err := h.rdb.WriteProcessInfo(h.pinfo, h.interval*2)
err := h.rdb.WriteProcessInfo(h.ps.Get(), h.interval*2)
if err != nil {
logger.error("could not write heartbeat data: %v", err)
}
Expand Down
13 changes: 6 additions & 7 deletions heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ func TestHeartbeater(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r)

stateCh := make(chan string)
workerCh := make(chan int)
hb := newHeartbeater(rdbClient, tc.host, tc.pid, tc.concurrency, tc.queues, false, tc.interval, stateCh, workerCh)
state := base.NewProcessState(tc.host, tc.pid, tc.concurrency, tc.queues, false)
hb := newHeartbeater(rdbClient, state, tc.interval)

var wg sync.WaitGroup
hb.start(&wg)
Expand All @@ -48,7 +47,7 @@ func TestHeartbeater(t *testing.T) {
Queues: tc.queues,
Concurrency: tc.concurrency,
Started: time.Now(),
State: "running",
Status: "running",
}

// allow for heartbeater to write to redis
Expand All @@ -73,13 +72,13 @@ func TestHeartbeater(t *testing.T) {
continue
}

// state change
stateCh <- "stopped"
// status change
state.SetStatus(base.StatusStopped)

// allow for heartbeater to write to redis
time.Sleep(tc.interval * 2)

want.State = "stopped"
want.Status = "stopped"
ps, err = rdbClient.ListProcesses()
if err != nil {
t.Errorf("could not read process status from redis: %v", err)
Expand Down
114 changes: 101 additions & 13 deletions internal/base/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,32 +87,120 @@ type TaskMessage struct {
Timeout string
}

// ProcessState holds process level information.
//
// ProcessStates are safe for concurrent use by multiple goroutines.
type ProcessState struct {
mu sync.Mutex // guards all data fields
concurrency int
queues map[string]int
strictPriority bool
pid int
host string
status PStatus
started time.Time
activeWorkerCount int
}

// PStatus represents status of a process.
type PStatus int

const (
// StatusIdle indicates process is in idle state.
StatusIdle PStatus = iota

// StatusRunning indicates process is up and processing tasks.
StatusRunning

// StatusStopped indicates process is up but not processing new tasks.
StatusStopped
)

var statuses = []string{
"idle",
"running",
"stopped",
}

func (s PStatus) String() string {
if StatusIdle <= s && s <= StatusStopped {
return statuses[s]
}
return "unknown status"
}

// NewProcessState returns a new instance of ProcessState.
func NewProcessState(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessState {
return &ProcessState{
host: host,
pid: pid,
concurrency: concurrency,
queues: cloneQueueConfig(queues),
strictPriority: strict,
status: StatusIdle,
}
}

// SetStatus updates the state of process.
func (ps *ProcessState) SetStatus(status PStatus) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.status = status
}

// SetStarted records when the process started processing.
func (ps *ProcessState) SetStarted(t time.Time) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.started = t
}

// IncrWorkerCount increments the worker count by delta.
func (ps *ProcessState) IncrWorkerCount(delta int) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.activeWorkerCount += delta
}

// Get returns current state of process as a ProcessInfo.
func (ps *ProcessState) Get() *ProcessInfo {
ps.mu.Lock()
defer ps.mu.Unlock()
return &ProcessInfo{
Host: ps.host,
PID: ps.pid,
Concurrency: ps.concurrency,
Queues: cloneQueueConfig(ps.queues),
StrictPriority: ps.strictPriority,
Status: ps.status.String(),
Started: ps.started,
ActiveWorkerCount: ps.activeWorkerCount,
}
}

func cloneQueueConfig(qcfg map[string]int) map[string]int {
res := make(map[string]int)
for qname, n := range qcfg {
res[qname] = n
}
return res
}

// ProcessInfo holds information about running background worker process.
type ProcessInfo struct {
Concurrency int
Queues map[string]int
StrictPriority bool
PID int
Host string
State string
Status string
Started time.Time
ActiveWorkerCount int
}

// NewProcessInfo returns a new instance of ProcessInfo.
func NewProcessInfo(host string, pid, concurrency int, queues map[string]int, strict bool) *ProcessInfo {
return &ProcessInfo{
Host: host,
PID: pid,
Concurrency: concurrency,
Queues: queues,
StrictPriority: strict,
}
}

// Cancelations is a collection that holds cancel functions for all in-progress tasks.
//
// Its methods are safe to be used in multiple goroutines.
// Cancelations are safe for concurrent use by multipel goroutines.
type Cancelations struct {
mu sync.Mutex
cancelFuncs map[string]context.CancelFunc
Expand Down
4 changes: 2 additions & 2 deletions internal/rdb/inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2059,7 +2059,7 @@ func TestListProcesses(t *testing.T) {
Queues: map[string]int{"default": 1},
Host: "do.droplet1",
PID: 1234,
State: "running",
Status: "running",
Started: time.Now().Add(-time.Hour),
ActiveWorkerCount: 5,
}
Expand All @@ -2069,7 +2069,7 @@ func TestListProcesses(t *testing.T) {
Queues: map[string]int{"email": 1},
Host: "do.droplet2",
PID: 9876,
State: "stopped",
Status: "stopped",
Started: time.Now().Add(-2 * time.Hour),
ActiveWorkerCount: 20,
}
Expand Down
2 changes: 1 addition & 1 deletion internal/rdb/rdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func TestReadWriteClearProcessInfo(t *testing.T) {
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
PID: 98765,
Host: "localhost",
State: "running",
Status: "running",
Started: time.Now(),
ActiveWorkerCount: 1,
}
Expand Down
25 changes: 12 additions & 13 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
type processor struct {
rdb *rdb.RDB

ps *base.ProcessState

handler Handler

queueConfig map[string]int
Expand All @@ -32,9 +34,6 @@ type processor struct {
// channel via which to send sync requests to syncer.
syncRequestCh chan<- *syncRequest

// channel to send worker count updates.
workerCh chan<- int

// rate limiter to prevent spamming logs with a bunch of errors.
errLogLimiter *rate.Limiter

Expand All @@ -60,23 +59,23 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration

// newProcessor constructs a new processor.
func newProcessor(r *rdb.RDB, queues map[string]int, strict bool, concurrency int, fn retryDelayFunc,
syncRequestCh chan<- *syncRequest, workerCh chan<- int, cancelations *base.Cancelations) *processor {
qcfg := normalizeQueueCfg(queues)
func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc, syncCh chan<- *syncRequest, c *base.Cancelations) *processor {
info := ps.Get()
qcfg := normalizeQueueCfg(info.Queues)
orderedQueues := []string(nil)
if strict {
if info.StrictPriority {
orderedQueues = sortByPriority(qcfg)
}
return &processor{
rdb: r,
ps: ps,
queueConfig: qcfg,
orderedQueues: orderedQueues,
retryDelayFunc: fn,
syncRequestCh: syncRequestCh,
workerCh: workerCh,
cancelations: cancelations,
syncRequestCh: syncCh,
cancelations: c,
errLogLimiter: rate.NewLimiter(rate.Every(3*time.Second), 1),
sema: make(chan struct{}, concurrency),
sema: make(chan struct{}, info.Concurrency),
done: make(chan struct{}),
abort: make(chan struct{}),
quit: make(chan struct{}),
Expand Down Expand Up @@ -166,10 +165,10 @@ func (p *processor) exec() {
p.requeue(msg)
return
case p.sema <- struct{}{}: // acquire token
p.workerCh <- 1
p.ps.IncrWorkerCount(1)
go func() {
defer func() {
p.workerCh <- -1
p.ps.IncrWorkerCount(-1)
<-p.sema /* release token */
}()

Expand Down
Loading

0 comments on commit 830020e

Please sign in to comment.