Skip to content

Commit

Permalink
Rename Background to Server
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Apr 19, 2020
1 parent 022dc29 commit f9842ba
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 65 deletions.
18 changes: 9 additions & 9 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
DB: redisDB,
}
client := NewClient(redis)
bg := NewBackground(redis, &Config{
srv := NewServer(redis, Config{
Concurrency: 10,
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
return time.Second
Expand All @@ -46,11 +46,11 @@ func BenchmarkEndToEndSimple(b *testing.B) {
}
b.StartTimer() // end setup

bg.start(HandlerFunc(handler))
srv.start(HandlerFunc(handler))
wg.Wait()

b.StopTimer() // begin teardown
bg.stop()
srv.stop()
b.StartTimer() // end teardown
}
}
Expand All @@ -67,7 +67,7 @@ func BenchmarkEndToEnd(b *testing.B) {
DB: redisDB,
}
client := NewClient(redis)
bg := NewBackground(redis, &Config{
srv := NewServer(redis, Config{
Concurrency: 10,
RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
return time.Second
Expand Down Expand Up @@ -99,11 +99,11 @@ func BenchmarkEndToEnd(b *testing.B) {
}
b.StartTimer() // end setup

bg.start(HandlerFunc(handler))
srv.start(HandlerFunc(handler))
wg.Wait()

b.StopTimer() // begin teardown
bg.stop()
srv.stop()
b.StartTimer() // end teardown
}
}
Expand All @@ -124,7 +124,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
DB: redisDB,
}
client := NewClient(redis)
bg := NewBackground(redis, &Config{
srv := NewServer(redis, Config{
Concurrency: 10,
Queues: map[string]int{
"high": 6,
Expand Down Expand Up @@ -160,11 +160,11 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
}
b.StartTimer() // end setup

bg.start(HandlerFunc(handler))
srv.start(HandlerFunc(handler))
wg.Wait()

b.StopTimer() // begin teardown
bg.stop()
srv.stop()
b.StartTimer() // end teardown
}
}
84 changes: 42 additions & 42 deletions background.go → server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ import (
"github.com/hibiken/asynq/internal/rdb"
)

// Background is responsible for managing the background-task processing.
// Server is responsible for managing the background-task processing.
//
// Background manages task queues to process tasks.
// If the processing of a task is unsuccessful, background will
// Server pulls tasks off queues and process them.
// If the processing of a task is unsuccessful, server will
// schedule it for a retry until either the task gets processed successfully
// or it exhausts its max retry count.
//
// Once a task exhausts its retries, it will be moved to the "dead" queue and
// will be kept in the queue for some time until a certain condition is met
// (e.g., queue size reaches a certain limit, or the task has been in the
// queue for a certain amount of time).
type Background struct {
type Server struct {
mu sync.Mutex
running bool

Expand All @@ -48,11 +48,11 @@ type Background struct {
subscriber *subscriber
}

// Config specifies the background-task processing behavior.
// Config specifies the server's background-task processing behavior.
type Config struct {
// Maximum number of concurrent processing of tasks.
//
// If set to a zero or negative value, NewBackground will overwrite the value to one.
// If set to a zero or negative value, NewServer will overwrite the value to one.
Concurrency int

// Function to calculate retry delay for a failed task.
Expand All @@ -67,7 +67,7 @@ type Config struct {
// List of queues to process with given priority value. Keys are the names of the
// queues and values are associated priority value.
//
// If set to nil or not specified, the background will process only the "default" queue.
// If set to nil or not specified, the server will process only the "default" queue.
//
// Priority is treated as follows to avoid starving low priority queues.
//
Expand Down Expand Up @@ -106,7 +106,7 @@ type Config struct {
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
ErrorHandler ErrorHandler

// Logger specifies the logger used by the background instance.
// Logger specifies the logger used by the server instance.
//
// If unset, default logger is used.
Logger Logger
Expand Down Expand Up @@ -156,9 +156,9 @@ var defaultQueueConfig = map[string]int{
base.DefaultQueueName: 1,
}

// NewBackground returns a new Background given a redis connection option
// NewServer returns a new Server given a redis connection option
// and background processing configuration.
func NewBackground(r RedisConnOpt, cfg *Config) *Background {
func NewServer(r RedisConnOpt, cfg Config) *Server {
n := cfg.Concurrency
if n < 1 {
n = 1
Expand Down Expand Up @@ -196,7 +196,7 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
scheduler := newScheduler(logger, rdb, 5*time.Second, queues)
processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
subscriber := newSubscriber(logger, rdb, cancels)
return &Background{
return &Server{
logger: logger,
rdb: rdb,
ps: ps,
Expand Down Expand Up @@ -234,64 +234,64 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
// an os signal to exit the program is received. Once it receives
// a signal, it gracefully shuts down all pending workers and other
// goroutines to process the tasks.
func (bg *Background) Run(handler Handler) {
func (srv *Server) Run(handler Handler) {
type prefixLogger interface {
SetPrefix(prefix string)
}
// If logger supports setting prefix, then set prefix for log output.
if l, ok := bg.logger.(prefixLogger); ok {
if l, ok := srv.logger.(prefixLogger); ok {
l.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
}
bg.logger.Info("Starting processing")
srv.logger.Info("Starting processing")

bg.start(handler)
defer bg.stop()
srv.start(handler)
defer srv.stop()

bg.waitForSignals()
srv.waitForSignals()
fmt.Println()
bg.logger.Info("Starting graceful shutdown")
srv.logger.Info("Starting graceful shutdown")
}

// starts the background-task processing.
func (bg *Background) start(handler Handler) {
bg.mu.Lock()
defer bg.mu.Unlock()
if bg.running {
func (srv *Server) start(handler Handler) {
srv.mu.Lock()
defer srv.mu.Unlock()
if srv.running {
return
}

bg.running = true
bg.processor.handler = handler
srv.running = true
srv.processor.handler = handler

bg.heartbeater.start(&bg.wg)
bg.subscriber.start(&bg.wg)
bg.syncer.start(&bg.wg)
bg.scheduler.start(&bg.wg)
bg.processor.start(&bg.wg)
srv.heartbeater.start(&srv.wg)
srv.subscriber.start(&srv.wg)
srv.syncer.start(&srv.wg)
srv.scheduler.start(&srv.wg)
srv.processor.start(&srv.wg)
}

// stops the background-task processing.
func (bg *Background) stop() {
bg.mu.Lock()
defer bg.mu.Unlock()
if !bg.running {
func (srv *Server) stop() {
srv.mu.Lock()
defer srv.mu.Unlock()
if !srv.running {
return
}

// Note: The order of termination is important.
// Sender goroutines should be terminated before the receiver goroutines.
//
// processor -> syncer (via syncCh)
bg.scheduler.terminate()
bg.processor.terminate()
bg.syncer.terminate()
bg.subscriber.terminate()
bg.heartbeater.terminate()
srv.scheduler.terminate()
srv.processor.terminate()
srv.syncer.terminate()
srv.subscriber.terminate()
srv.heartbeater.terminate()

bg.wg.Wait()
srv.wg.Wait()

bg.rdb.Close()
bg.running = false
srv.rdb.Close()
srv.running = false

bg.logger.Info("Bye!")
srv.logger.Info("Bye!")
}
14 changes: 7 additions & 7 deletions background_test.go → server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"go.uber.org/goleak"
)

func TestBackground(t *testing.T) {
func TestServer(t *testing.T) {
// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/go-redis/redis/v7/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNoLeaks(t, ignoreOpt)
Expand All @@ -22,8 +22,8 @@ func TestBackground(t *testing.T) {
Addr: "localhost:6379",
DB: 15,
}
client := NewClient(r)
bg := NewBackground(r, &Config{
c := NewClient(r)
srv := NewServer(r, Config{
Concurrency: 10,
})

Expand All @@ -32,19 +32,19 @@ func TestBackground(t *testing.T) {
return nil
}

bg.start(HandlerFunc(h))
srv.start(HandlerFunc(h))

err := client.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123}))
err := c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123}))
if err != nil {
t.Errorf("could not enqueue a task: %v", err)
}

err = client.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456}))
err = c.EnqueueAt(time.Now().Add(time.Hour), NewTask("send_email", map[string]interface{}{"recipient_id": 456}))
if err != nil {
t.Errorf("could not enqueue a task: %v", err)
}

bg.stop()
srv.stop()
}

func TestGCD(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions signals_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@ import (
// It handles SIGTERM, SIGINT, and SIGTSTP.
// SIGTERM and SIGINT will signal the process to exit.
// SIGTSTP will signal the process to stop processing new tasks.
func (bg *Background) waitForSignals() {
bg.logger.Info("Send signal TSTP to stop processing new tasks")
bg.logger.Info("Send signal TERM or INT to terminate the process")
func (srv *Server) waitForSignals() {
srv.logger.Info("Send signal TSTP to stop processing new tasks")
srv.logger.Info("Send signal TERM or INT to terminate the process")

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
for {
sig := <-sigs
if sig == unix.SIGTSTP {
bg.processor.stop()
bg.ps.SetStatus(base.StatusStopped)
srv.processor.stop()
srv.ps.SetStatus(base.StatusStopped)
continue
}
break
Expand Down
4 changes: 2 additions & 2 deletions signals_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
// SIGTERM and SIGINT will signal the process to exit.
//
// Note: Currently SIGTSTP is not supported for windows build.
func (bg *Background) waitForSignals() {
bg.logger.Info("Send signal TERM or INT to terminate the process")
func (srv *Server) waitForSignals() {
srv.logger.Info("Send signal TERM or INT to terminate the process")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, windows.SIGTERM, windows.SIGINT)
<-sigs
Expand Down

0 comments on commit f9842ba

Please sign in to comment.