Skip to content

Commit

Permalink
Add Retry and LastError fields to inspector tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Jan 31, 2021
1 parent afde6a7 commit bfde0b6
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 24 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`.

### Added

- `MemoryUsage` field is added to `QueueStats`.
- `MaxRetry`, `Retried`, `LastError` fields were added to all task types returned from `Inspector`.
- `MemoryUsage` field was added to `QueueStats`.
- `DeleteAllPendingTasks`, `ArchiveAllPendingTasks` were added to `Inspector`
- `DeleteTaskByKey` and `ArchiveTaskByKey` now supports deleting/archiving `PendingTask`.
- asynq CLI now supports deleting/archiving pending tasks.
Expand Down
49 changes: 34 additions & 15 deletions inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,22 +169,31 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error {
// PendingTask is a task in a queue and is ready to be processed.
type PendingTask struct {
*Task
ID string
Queue string
ID string
Queue string
MaxRetry int
Retried int
LastError string
}

// ActiveTask is a task that's currently being processed.
type ActiveTask struct {
*Task
ID string
Queue string
ID string
Queue string
MaxRetry int
Retried int
LastError string
}

// ScheduledTask is a task scheduled to be processed in the future.
type ScheduledTask struct {
*Task
ID string
Queue string
MaxRetry int
Retried int
LastError string
NextProcessAt time.Time

score int64
Expand All @@ -198,7 +207,7 @@ type RetryTask struct {
NextProcessAt time.Time
MaxRetry int
Retried int
ErrorMsg string
LastError string
// TODO: LastFailedAt time.Time

score int64
Expand All @@ -215,7 +224,7 @@ type ArchivedTask struct {
MaxRetry int
Retried int
LastFailedAt time.Time
ErrorMsg string
LastError string

score int64
}
Expand Down Expand Up @@ -355,9 +364,12 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi
var tasks []*PendingTask
for _, m := range msgs {
tasks = append(tasks, &PendingTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
Retried: m.Retried,
LastError: m.ErrorMsg,
})
}
return tasks, err
Expand All @@ -378,10 +390,14 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active
}
var tasks []*ActiveTask
for _, m := range msgs {

tasks = append(tasks, &ActiveTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
Retried: m.Retried,
LastError: m.ErrorMsg,
})
}
return tasks, err
Expand Down Expand Up @@ -409,6 +425,9 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
LastError: z.Message.ErrorMsg,
NextProcessAt: processAt,
score: z.Score,
})
Expand Down Expand Up @@ -442,8 +461,8 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
// TODO: LastFailedAt: z.Message.LastFailedAt
ErrorMsg: z.Message.ErrorMsg,
score: z.Score,
LastError: z.Message.ErrorMsg,
score: z.Score,
})
}
return tasks, nil
Expand Down Expand Up @@ -474,7 +493,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
LastFailedAt: failedAt,
ErrorMsg: z.Message.ErrorMsg,
LastError: z.Message.ErrorMsg,
score: z.Score,
})
}
Expand Down
25 changes: 17 additions & 8 deletions inspector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,9 +423,12 @@ func TestInspectorHistory(t *testing.T) {

func createPendingTask(msg *base.TaskMessage) *PendingTask {
return &PendingTask{
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
}
}

Expand Down Expand Up @@ -510,9 +513,12 @@ func TestInspectorListActiveTasks(t *testing.T) {

createActiveTask := func(msg *base.TaskMessage) *ActiveTask {
return &ActiveTask{
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
}
}

Expand Down Expand Up @@ -559,6 +565,9 @@ func createScheduledTask(z base.Z) *ScheduledTask {
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastError: msg.ErrorMsg,
NextProcessAt: time.Unix(z.Score, 0),
score: z.Score,
}
Expand Down Expand Up @@ -635,7 +644,7 @@ func createRetryTask(z base.Z) *RetryTask {
NextProcessAt: time.Unix(z.Score, 0),
MaxRetry: msg.Retry,
Retried: msg.Retried,
ErrorMsg: msg.ErrorMsg,
LastError: msg.ErrorMsg,
score: z.Score,
}
}
Expand Down Expand Up @@ -712,7 +721,7 @@ func createArchivedTask(z base.Z) *ArchivedTask {
MaxRetry: msg.Retry,
Retried: msg.Retried,
LastFailedAt: time.Unix(z.Score, 0),
ErrorMsg: msg.ErrorMsg,
LastError: msg.ErrorMsg,
score: z.Score,
}
}
Expand Down

0 comments on commit bfde0b6

Please sign in to comment.