Skip to content

Commit

Permalink
Add ListScheduelerEnqueueEvents to Inspector
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Jan 14, 2021
1 parent c06e9de commit f4dd8fe
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- `CancelActiveTask` method is added to `Inspector`.
- `ListSchedulerEnqueueEvents` method is added to `Inspector`.
- `SchedulerEntries` method is added to `Inspector`.
- `DeleteQueue` method is added to `Inspector`.

Expand Down
26 changes: 26 additions & 0 deletions inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,3 +701,29 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
}
return entries, nil
}

// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
type SchedulerEnqueueEvent struct {
// ID of the task that was enqueued.
TaskID string

// Time the task was enqueued.
EnqueuedAt time.Time
}

// ListSchedulerEnqueueEvents retrieves a list of enqueue events from the specified scheduler entry.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListSchedulerEnqueueEvents(entryID string, opts ...ListOption) ([]*SchedulerEnqueueEvent, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
data, err := i.rdb.ListSchedulerEnqueueEvents(entryID, pgn)
if err != nil {
return nil, err
}
var events []*SchedulerEnqueueEvent
for _, e := range data {
events = append(events, &SchedulerEnqueueEvent{TaskID: e.TaskID, EnqueuedAt: e.EnqueuedAt})
}
return events, nil
}
4 changes: 2 additions & 2 deletions internal/rdb/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,9 +853,9 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
}

// ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.
func (r *RDB) ListSchedulerEnqueueEvents(entryID string) ([]*base.SchedulerEnqueueEvent, error) {
func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*base.SchedulerEnqueueEvent, error) {
key := base.SchedulerHistoryKey(entryID)
zs, err := r.client.ZRangeWithScores(key, 0, -1).Result()
zs, err := r.client.ZRangeWithScores(key, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
Expand Down

0 comments on commit f4dd8fe

Please sign in to comment.