Skip to content

Commit

Permalink
Add MemoryUsage field to QueueStats
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Jan 31, 2021
1 parent 6529a1e commit afde6a7
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- `MemoryUsage` field is added to `QueueStats`.
- `DeleteAllPendingTasks`, `ArchiveAllPendingTasks` were added to `Inspector`
- `DeleteTaskByKey` and `ArchiveTaskByKey` now supports deleting/archiving `PendingTask`.
- asynq CLI now supports deleting/archiving pending tasks.
Expand Down
25 changes: 14 additions & 11 deletions inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func (i *Inspector) Queues() ([]string, error) {
type QueueStats struct {
// Name of the queue.
Queue string
// Total number of bytes that the queue and its tasks require to be stored in redis.
MemoryUsage int64
// Size is the total number of tasks in the queue.
// The value is the sum of Pending, Active, Scheduled, Retry, and Archived.
Size int
Expand Down Expand Up @@ -76,17 +78,18 @@ func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) {
return nil, err
}
return &QueueStats{
Queue: stats.Queue,
Size: stats.Size,
Pending: stats.Pending,
Active: stats.Active,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Archived: stats.Archived,
Processed: stats.Processed,
Failed: stats.Failed,
Paused: stats.Paused,
Timestamp: stats.Timestamp,
Queue: stats.Queue,
MemoryUsage: stats.MemoryUsage,
Size: stats.Size,
Pending: stats.Pending,
Active: stats.Active,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Archived: stats.Archived,
Processed: stats.Processed,
Failed: stats.Failed,
Paused: stats.Paused,
Timestamp: stats.Timestamp,
}, nil
}

Expand Down
3 changes: 2 additions & 1 deletion inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func TestInspectorCurrentStats(t *testing.T) {
m6 := h.NewTaskMessageWithQueue("task6", nil, "low")
now := time.Now()
timeCmpOpt := cmpopts.EquateApproxTime(time.Second)
ignoreMemUsg := cmpopts.IgnoreFields(QueueStats{}, "MemoryUsage")

inspector := NewInspector(getRedisConnOpt(t))

Expand Down Expand Up @@ -356,7 +357,7 @@ func TestInspectorCurrentStats(t *testing.T) {
tc.qname, got, err, tc.want)
continue
}
if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" {
if diff := cmp.Diff(tc.want, got, timeCmpOpt, ignoreMemUsg); diff != "" {
t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s",
tc.qname, got, err, tc.want, diff)
continue
Expand Down
24 changes: 24 additions & 0 deletions internal/rdb/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func (r *RDB) AllQueues() ([]string, error) {
type Stats struct {
// Name of the queue (e.g. "default", "critical").
Queue string
// MemoryUsage is the total number of bytes the queue and its tasks require
// to be stored in redis.
MemoryUsage int64
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool
Expand Down Expand Up @@ -160,9 +163,30 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
}
}
stats.Size = size
memusg, err := r.memoryUsage(qname)
if err != nil {
return nil, err
}
stats.MemoryUsage = memusg
return stats, nil
}

func (r *RDB) memoryUsage(qname string) (int64, error) {
keys, err := r.client.Keys(fmt.Sprintf("asynq:{%s}*", qname)).Result()
if err != nil {
return 0, err
}
var usg int64
for _, k := range keys {
n, err := r.client.MemoryUsage(k).Result()
if err != nil {
return 0, err
}
usg += n
}
return usg, nil
}

var historicalStatsCmd = redis.NewScript(`
local res = {}
for _, key in ipairs(KEYS) do
Expand Down
3 changes: 2 additions & 1 deletion internal/rdb/inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ func TestCurrentStats(t *testing.T) {
continue
}

if diff := cmp.Diff(tc.want, got, timeCmpOpt); diff != "" {
ignoreMemUsg := cmpopts.IgnoreFields(Stats{}, "MemoryUsage")
if diff := cmp.Diff(tc.want, got, timeCmpOpt, ignoreMemUsg); diff != "" {
t.Errorf("r.CurrentStats(%q) = %v, %v, want %v, nil; (-want, +got)\n%s", tc.qname, got, err, tc.want, diff)
continue
}
Expand Down

0 comments on commit afde6a7

Please sign in to comment.