Skip to content

Commit

Permalink
Add RDB.GroupStats for inspecting groups
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Apr 11, 2022
1 parent 45ed560 commit 0149396
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 0 deletions.
50 changes: 50 additions & 0 deletions internal/rdb/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,56 @@ func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) {
}, nil
}

type GroupStat struct {
// Name of the group.
Group string

// Size of the group.
Size int
}

// KEYS[1] -> asynq:{<qname>}:groups
// -------
// ARGV[1] -> group key prefix
//
// Output:
// list of group name and size (e.g. group1 size1 group2 size2 ...)
//
// Time Complexity:
// O(N) where N being the number of groups in the given queue.
var groupStatsCmd = redis.NewScript(`
local res = {}
local group_names = redis.call("SMEMBERS", KEYS[1])
for _, gname in ipairs(group_names) do
local size = redis.call("ZCARD", ARGV[1] .. gname)
table.insert(res, gname)
table.insert(res, size)
end
return res
`)

func (r *RDB) GroupStats(qname string) ([]*GroupStat, error) {
var op errors.Op = "RDB.GroupStats"
keys := []string{base.AllGroups(qname)}
argv := []interface{}{base.GroupKeyPrefix(qname)}
res, err := groupStatsCmd.Run(context.Background(), r.client, keys, argv...).Result()
if err != nil {
return nil, errors.E(op, errors.Unknown, err)
}
data, err := cast.ToSliceE(res)
if err != nil {
return nil, errors.E(op, errors.Internal, "cast error: unexpected return value from Lua script")
}
var stats []*GroupStat
for i := 0; i < len(data); i += 2 {
stats = append(stats, &GroupStat{
Group: data[i].(string),
Size: int(data[i+1].(int64)),
})
}
return stats, nil
}

// Pagination specifies the page size and page number
// for the list operation.
type Pagination struct {
Expand Down
97 changes: 97 additions & 0 deletions internal/rdb/inspect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -419,6 +420,102 @@ func TestRedisInfo(t *testing.T) {
}
}

func TestGroupStats(t *testing.T) {
r := setup(t)
defer r.Close()

m1 := h.NewTaskMessageBuilder().SetGroup("group1").Build()
m2 := h.NewTaskMessageBuilder().SetGroup("group1").Build()
m3 := h.NewTaskMessageBuilder().SetGroup("group1").Build()
m4 := h.NewTaskMessageBuilder().SetGroup("group2").Build()
m5 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("group1").Build()
m6 := h.NewTaskMessageBuilder().SetQueue("custom").SetGroup("group1").Build()

now := time.Now()

fixtures := struct {
tasks []*taskData
allGroups map[string][]string
groups map[string][]*redis.Z
}{
tasks: []*taskData{
{msg: m1, state: base.TaskStateAggregating},
{msg: m2, state: base.TaskStateAggregating},
{msg: m3, state: base.TaskStateAggregating},
{msg: m4, state: base.TaskStateAggregating},
{msg: m5, state: base.TaskStateAggregating},
},
allGroups: map[string][]string{
base.AllGroups("default"): {"group1", "group2"},
base.AllGroups("custom"): {"group1"},
},
groups: map[string][]*redis.Z{
base.GroupKey("default", "group1"): {
{Member: m1.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
{Member: m2.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
{Member: m3.ID, Score: float64(now.Add(-30 * time.Second).Unix())},
},
base.GroupKey("default", "group2"): {
{Member: m4.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
},
base.GroupKey("custom", "group1"): {
{Member: m5.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
{Member: m6.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
},
},
}

tests := []struct {
desc string
qname string
want []*GroupStat
}{
{
desc: "default queue groups",
qname: "default",
want: []*GroupStat{
{Group: "group1", Size: 3},
{Group: "group2", Size: 1},
},
},
{
desc: "custom queue groups",
qname: "custom",
want: []*GroupStat{
{Group: "group1", Size: 2},
},
},
}

var sortGroupStatsOpt = cmp.Transformer(
"SortGroupStats",
func(in []*GroupStat) []*GroupStat {
out := append([]*GroupStat(nil), in...)
sort.Slice(out, func(i, j int) bool {
return out[i].Group < out[j].Group
})
return out
})

for _, tc := range tests {
h.FlushDB(t, r.client)
SeedTasks(t, r.client, fixtures.tasks)
SeedSets(t, r.client, fixtures.allGroups)
SeedZSets(t, r.client, fixtures.groups)

t.Run(tc.desc, func(t *testing.T) {
got, err := r.GroupStats(tc.qname)
if err != nil {
t.Fatalf("GroupStats returned error: %v", err)
}
if diff := cmp.Diff(tc.want, got, sortGroupStatsOpt); diff != "" {
t.Errorf("GroupStats = %v, want %v; (-want,+got)\n%s", got, tc.want, diff)
}
})
}

}

func TestGetTaskInfo(t *testing.T) {
r := setup(t)
defer r.Close()
Expand Down

0 comments on commit 0149396

Please sign in to comment.