Skip to content

Commit

Permalink
Add Latency field to QueueInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Dec 11, 2021
1 parent e7c1c3a commit 99a6750
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 50 deletions.
4 changes: 4 additions & 0 deletions inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type QueueInfo struct {
// It is an approximate memory usage value in bytes since the value is computed by sampling.
MemoryUsage int64

// Latency of the queue, measured by the oldest pending task in the queue.
Latency time.Duration

// Size is the total number of tasks in the queue.
// The value is the sum of Pending, Active, Scheduled, Retry, and Archived.
Size int
Expand Down Expand Up @@ -95,6 +98,7 @@ func (i *Inspector) GetQueueInfo(qname string) (*QueueInfo, error) {
return &QueueInfo{
Queue: stats.Queue,
MemoryUsage: stats.MemoryUsage,
Latency: stats.Latency,
Size: stats.Size,
Pending: stats.Pending,
Active: stats.Active,
Expand Down
41 changes: 29 additions & 12 deletions inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/timeutil"
)

func TestInspectorQueues(t *testing.T) {
Expand Down Expand Up @@ -269,18 +270,20 @@ func TestInspectorGetQueueInfo(t *testing.T) {
ignoreMemUsg := cmpopts.IgnoreFields(QueueInfo{}, "MemoryUsage")

inspector := NewInspector(getRedisConnOpt(t))
inspector.rdb.SetClock(timeutil.NewSimulatedClock(now))

tests := []struct {
pending map[string][]*base.TaskMessage
active map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
archived map[string][]base.Z
completed map[string][]base.Z
processed map[string]int
failed map[string]int
qname string
want *QueueInfo
pending map[string][]*base.TaskMessage
active map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
archived map[string][]base.Z
completed map[string][]base.Z
processed map[string]int
failed map[string]int
oldestPendingMessageEnqueueTime map[string]time.Time
qname string
want *QueueInfo
}{
{
pending: map[string][]*base.TaskMessage{
Expand Down Expand Up @@ -326,9 +329,15 @@ func TestInspectorGetQueueInfo(t *testing.T) {
"critical": 0,
"low": 5,
},
oldestPendingMessageEnqueueTime: map[string]time.Time{
"default": now.Add(-15 * time.Second),
"critical": now.Add(-200 * time.Millisecond),
"low": now.Add(-30 * time.Second),
},
qname: "default",
want: &QueueInfo{
Queue: "default",
Latency: 15 * time.Second,
Size: 4,
Pending: 1,
Active: 1,
Expand All @@ -352,13 +361,21 @@ func TestInspectorGetQueueInfo(t *testing.T) {
h.SeedAllRetryQueues(t, r, tc.retry)
h.SeedAllArchivedQueues(t, r, tc.archived)
h.SeedAllCompletedQueues(t, r, tc.completed)
ctx := context.Background()
for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now)
r.Set(context.Background(), processedKey, n, 0)
r.Set(ctx, processedKey, n, 0)
}
for qname, n := range tc.failed {
failedKey := base.FailedKey(qname, now)
r.Set(context.Background(), failedKey, n, 0)
r.Set(ctx, failedKey, n, 0)
}
for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime {
if enqueueTime.IsZero() {
continue
}
oldestPendingMessageID := r.LRange(ctx, base.PendingKey(qname), -1, -1).Val()[0] // get the right most msg in the list
r.HSet(ctx, base.TaskKey(qname, oldestPendingMessageID), "pending_since", enqueueTime.UnixNano())
}

got, err := inspector.GetQueueInfo(tc.qname)
Expand Down
22 changes: 20 additions & 2 deletions internal/rdb/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type Stats struct {
Processed int
// Total number of tasks failed during the current date.
Failed int
// Latency of the queue, measured by the oldest pending task in the queue.
Latency time.Duration
// Time this stats was taken.
Timestamp time.Time
}
Expand All @@ -72,10 +74,13 @@ type DailyStats struct {
// KEYS[7] -> asynq:<qname>:processed:<yyyy-mm-dd>
// KEYS[8] -> asynq:<qname>:failed:<yyyy-mm-dd>
// KEYS[9] -> asynq:<qname>:paused
//
// ARGV[1] -> task key prefix
var currentStatsCmd = redis.NewScript(`
local res = {}
local pendingTaskCount = redis.call("LLEN", KEYS[1])
table.insert(res, KEYS[1])
table.insert(res, redis.call("LLEN", KEYS[1]))
table.insert(res, pendingTaskCount)
table.insert(res, KEYS[2])
table.insert(res, redis.call("LLEN", KEYS[2]))
table.insert(res, KEYS[3])
Expand All @@ -102,6 +107,13 @@ table.insert(res, KEYS[8])
table.insert(res, fcount)
table.insert(res, KEYS[9])
table.insert(res, redis.call("EXISTS", KEYS[9]))
table.insert(res, "oldest_pending_since")
if pendingTaskCount > 0 then
local id = redis.call("LRANGE", KEYS[1], -1, -1)[1]
table.insert(res, redis.call("HGET", ARGV[1] .. id, "pending_since"))
else
table.insert(res, 0)
end
return res`)

// CurrentStats returns a current state of the queues.
Expand All @@ -125,7 +137,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
base.ProcessedKey(qname, now),
base.FailedKey(qname, now),
base.PausedKey(qname),
}).Result()
}, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return nil, errors.E(op, errors.Unknown, err)
}
Expand Down Expand Up @@ -170,6 +182,12 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
} else {
stats.Paused = true
}
case "oldest_pending_since":
if val == 0 {
stats.Latency = 0
} else {
stats.Latency = r.clock.Now().Sub(time.Unix(0, int64(val)))
}
}
}
stats.Size = size
Expand Down
61 changes: 42 additions & 19 deletions internal/rdb/inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/timeutil"
)

func TestAllQueues(t *testing.T) {
Expand Down Expand Up @@ -60,27 +61,29 @@ func TestCurrentStats(t *testing.T) {
m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
now := time.Now()
r.SetClock(timeutil.NewSimulatedClock(now))

tests := []struct {
pending map[string][]*base.TaskMessage
inProgress map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
archived map[string][]base.Z
completed map[string][]base.Z
processed map[string]int
failed map[string]int
paused []string
qname string
want *Stats
pending map[string][]*base.TaskMessage
active map[string][]*base.TaskMessage
scheduled map[string][]base.Z
retry map[string][]base.Z
archived map[string][]base.Z
completed map[string][]base.Z
processed map[string]int
failed map[string]int
paused []string
oldestPendingMessageEnqueueTime map[string]time.Time
qname string
want *Stats
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
"critical": {m5},
"low": {m6},
},
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {m2},
"critical": {},
"low": {},
Expand Down Expand Up @@ -118,6 +121,11 @@ func TestCurrentStats(t *testing.T) {
"critical": 0,
"low": 1,
},
oldestPendingMessageEnqueueTime: map[string]time.Time{
"default": now.Add(-15 * time.Second),
"critical": now.Add(-200 * time.Millisecond),
"low": now.Add(-30 * time.Second),
},
paused: []string{},
qname: "default",
want: &Stats{
Expand All @@ -132,16 +140,17 @@ func TestCurrentStats(t *testing.T) {
Completed: 0,
Processed: 120,
Failed: 2,
Latency: 15 * time.Second,
Timestamp: now,
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
"critical": {m5},
"critical": {},
"low": {m6},
},
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {m2},
"critical": {},
"low": {},
Expand Down Expand Up @@ -179,20 +188,26 @@ func TestCurrentStats(t *testing.T) {
"critical": 0,
"low": 1,
},
oldestPendingMessageEnqueueTime: map[string]time.Time{
"default": now.Add(-15 * time.Second),
"critical": time.Time{}, // zero value since there's no pending task in this queue
"low": now.Add(-30 * time.Second),
},
paused: []string{"critical", "low"},
qname: "critical",
want: &Stats{
Queue: "critical",
Paused: true,
Size: 1,
Pending: 1,
Size: 0,
Pending: 0,
Active: 0,
Scheduled: 0,
Retry: 0,
Archived: 0,
Completed: 0,
Processed: 100,
Failed: 0,
Latency: 0,
Timestamp: now,
},
},
Expand All @@ -206,18 +221,26 @@ func TestCurrentStats(t *testing.T) {
}
}
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
h.SeedAllCompletedQueues(t, r.client, tc.completed)
ctx := context.Background()
for qname, n := range tc.processed {
processedKey := base.ProcessedKey(qname, now)
r.client.Set(context.Background(), processedKey, n, 0)
r.client.Set(ctx, processedKey, n, 0)
}
for qname, n := range tc.failed {
failedKey := base.FailedKey(qname, now)
r.client.Set(context.Background(), failedKey, n, 0)
r.client.Set(ctx, failedKey, n, 0)
}
for qname, enqueueTime := range tc.oldestPendingMessageEnqueueTime {
if enqueueTime.IsZero() {
continue
}
oldestPendingMessageID := r.client.LRange(ctx, base.PendingKey(qname), -1, -1).Val()[0] // get the right most msg in the list
r.client.HSet(ctx, base.TaskKey(qname, oldestPendingMessageID), "pending_since", enqueueTime.UnixNano())
}

got, err := r.CurrentStats(tc.qname)
Expand Down
12 changes: 6 additions & 6 deletions internal/rdb/rdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (r *RDB) runScriptWithErrorCode(ctx context.Context, op errors.Op, script *
// ARGV[2] -> task ID
// ARGV[3] -> task timeout in seconds (0 if not timeout)
// ARGV[4] -> task deadline in unix time (0 if no deadline)
// ARGV[5] -> current uinx time in millisecond
// ARGV[5] -> current unix time in nsec
//
// Output:
// Returns 1 if successfully enqueued
Expand Down Expand Up @@ -123,7 +123,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
msg.ID,
msg.Timeout,
msg.Deadline,
timeutil.UnixMilli(r.clock.Now()),
r.clock.Now().UnixNano(),
}
n, err := r.runScriptWithErrorCode(ctx, op, enqueueCmd, keys, argv...)
if err != nil {
Expand All @@ -146,7 +146,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
// ARGV[3] -> task message data
// ARGV[4] -> task timeout in seconds (0 if not timeout)
// ARGV[5] -> task deadline in unix time (0 if no deadline)
// ARGV[6] -> current unix time in milliseconds
// ARGV[6] -> current unix time in nsec
//
// Output:
// Returns 1 if successfully enqueued
Expand Down Expand Up @@ -193,7 +193,7 @@ func (r *RDB) EnqueueUnique(ctx context.Context, msg *base.TaskMessage, ttl time
encoded,
msg.Timeout,
msg.Deadline,
timeutil.UnixMilli(r.clock.Now()),
r.clock.Now().UnixNano(),
}
n, err := r.runScriptWithErrorCode(ctx, op, enqueueUniqueCmd, keys, argv...)
if err != nil {
Expand Down Expand Up @@ -774,7 +774,7 @@ func (r *RDB) ForwardIfReady(qnames ...string) error {
// KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> current unix time in seconds
// ARGV[2] -> task key prefix
// ARGV[3] -> current unix time in milliseconds
// ARGV[3] -> current unix time in nsec
// Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
var forwardCmd = redis.NewScript(`
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
Expand All @@ -792,7 +792,7 @@ return table.getn(ids)`)
func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) {
now := r.clock.Now()
res, err := forwardCmd.Run(context.Background(), r.client,
[]string{src, dst}, now.Unix(), taskKeyPrefix, timeutil.UnixMilli(now)).Result()
[]string{src, dst}, now.Unix(), taskKeyPrefix, now.UnixNano()).Result()
if err != nil {
return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
}
Expand Down
6 changes: 3 additions & 3 deletions internal/rdb/rdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestEnqueue(t *testing.T) {
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
}
pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field
if want := strconv.Itoa(int(timeutil.UnixMilli(enqueueTime))); pendingSince != want {
if want := strconv.Itoa(int(enqueueTime.UnixNano())); pendingSince != want {
t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want)
}

Expand Down Expand Up @@ -242,7 +242,7 @@ func TestEnqueueUnique(t *testing.T) {
t.Errorf("deadline field under task-key is set to %v, want %v", deadline, want)
}
pendingSince := r.client.HGet(context.Background(), taskKey, "pending_since").Val() // "pending_since" field
if want := strconv.Itoa(int(timeutil.UnixMilli(enqueueTime))); pendingSince != want {
if want := strconv.Itoa(int(enqueueTime.UnixNano())); pendingSince != want {
t.Errorf("pending_since field under task-key is set to %v, want %v", pendingSince, want)
}
uniqueKey := r.client.HGet(context.Background(), taskKey, "unique_key").Val() // "unique_key" field
Expand Down Expand Up @@ -2087,7 +2087,7 @@ func TestForwardIfReady(t *testing.T) {
// Make sure "pending_since" field is set
for _, msg := range gotPending {
pendingSince := r.client.HGet(context.Background(), base.TaskKey(msg.Queue, msg.ID), "pending_since").Val()
if want := strconv.Itoa(int(timeutil.UnixMilli(now))); pendingSince != want {
if want := strconv.Itoa(int(now.UnixNano())); pendingSince != want {
t.Error("pending_since field is not set for newly pending message")
}
}
Expand Down
8 changes: 0 additions & 8 deletions internal/timeutil/timeutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,3 @@ func (c *SimulatedClock) Now() time.Time { return c.t }
func (c *SimulatedClock) SetTime(t time.Time) { c.t = t }

func (c *SimulatedClock) AdvanceTime(d time.Duration) { c.t.Add(d) }

// UnixMilli returns t as a Unix time, the number of milliseconds elapsed since
// January 1, 1970 UTC.
//
// TODO: Use time.UnixMilli() when we drop support for go1.16 or below
func UnixMilli(t time.Time) int64 {
return t.UnixNano() / 1e6
}

0 comments on commit 99a6750

Please sign in to comment.