Skip to content

Commit

Permalink
Move inspeq package content to asynq package
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Jun 29, 2021
1 parent 0ec3b55 commit 12f4c7c
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 195 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Inspector.RunTaskByKey` is replaced with `Inspector.RunTask`
- `Inspector.DeleteTaskByKey` is replaced with `Inspector.DeleteTask`
- `Inspector.ArchiveTaskByKey` is replaced with `Inspector.ArchiveTask`
- `inspeq` package is removed. All types and functions from the package is moved to `asynq` package.

## [0.17.2] - 2021-06-06

Expand Down
52 changes: 25 additions & 27 deletions inspeq/inspector.go → inspector.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.

package inspeq
package asynq

import (
"fmt"
Expand All @@ -12,7 +12,6 @@ import (

"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
Expand All @@ -25,7 +24,7 @@ type Inspector struct {
}

// New returns a new instance of Inspector.
func New(r asynq.RedisConnOpt) *Inspector {
func NewInspector(r RedisConnOpt) *Inspector {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r))
Expand Down Expand Up @@ -170,7 +169,7 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error {

// PendingTask is a task in a queue and is ready to be processed.
type PendingTask struct {
*asynq.Task
*Task
ID string
Queue string
MaxRetry int
Expand All @@ -180,7 +179,7 @@ type PendingTask struct {

// ActiveTask is a task that's currently being processed.
type ActiveTask struct {
*asynq.Task
*Task
ID string
Queue string
MaxRetry int
Expand All @@ -190,7 +189,7 @@ type ActiveTask struct {

// ScheduledTask is a task scheduled to be processed in the future.
type ScheduledTask struct {
*asynq.Task
*Task
ID string
Queue string
MaxRetry int
Expand All @@ -203,7 +202,7 @@ type ScheduledTask struct {

// RetryTask is a task scheduled to be retried in the future.
type RetryTask struct {
*asynq.Task
*Task
ID string
Queue string
NextProcessAt time.Time
Expand All @@ -220,7 +219,7 @@ type RetryTask struct {
// A task can be archived when the task exhausts its retry counts or manually
// archived by a user via the CLI or Inspector.
type ArchivedTask struct {
*asynq.Task
*Task
ID string
Queue string
MaxRetry int
Expand Down Expand Up @@ -366,7 +365,7 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi
var tasks []*PendingTask
for _, m := range msgs {
tasks = append(tasks, &PendingTask{
Task: asynq.NewTask(m.Type, m.Payload),
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
Expand All @@ -392,9 +391,8 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active
}
var tasks []*ActiveTask
for _, m := range msgs {

tasks = append(tasks, &ActiveTask{
Task: asynq.NewTask(m.Type, m.Payload),
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
Expand Down Expand Up @@ -422,7 +420,7 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch
var tasks []*ScheduledTask
for _, z := range zs {
processAt := time.Unix(z.Score, 0)
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ScheduledTask{
Task: t,
ID: z.Message.ID.String(),
Expand Down Expand Up @@ -454,7 +452,7 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa
var tasks []*RetryTask
for _, z := range zs {
processAt := time.Unix(z.Score, 0)
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &RetryTask{
Task: t,
ID: z.Message.ID.String(),
Expand Down Expand Up @@ -487,7 +485,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch
var tasks []*ArchivedTask
for _, z := range zs {
failedAt := time.Unix(z.Score, 0)
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ArchivedTask{
Task: t,
ID: z.Message.ID.String(),
Expand Down Expand Up @@ -743,7 +741,7 @@ func (i *Inspector) Servers() ([]*ServerInfo, error) {
Started: w.Started,
Deadline: w.Deadline,
Task: &ActiveTask{
Task: asynq.NewTask(w.Type, w.Payload),
Task: NewTask(w.Type, w.Payload),
ID: w.ID,
Queue: w.Queue,
},
Expand Down Expand Up @@ -827,10 +825,10 @@ type SchedulerEntry struct {
Spec string

// Periodic Task registered for this entry.
Task *asynq.Task
Task *Task

// Opts is the options for the periodic task.
Opts []asynq.Option
Opts []Option

// Next shows the next time the task will be enqueued.
Next time.Time
Expand All @@ -849,8 +847,8 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
return nil, err
}
for _, e := range res {
task := asynq.NewTask(e.Type, e.Payload)
var opts []asynq.Option
task := NewTask(e.Type, e.Payload)
var opts []Option
for _, s := range e.Opts {
if o, err := parseOption(s); err == nil {
// ignore bad data
Expand All @@ -871,51 +869,51 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {

// parseOption interprets a string s as an Option and returns the Option if parsing is successful,
// otherwise returns non-nil error.
func parseOption(s string) (asynq.Option, error) {
func parseOption(s string) (Option, error) {
fn, arg := parseOptionFunc(s), parseOptionArg(s)
switch fn {
case "Queue":
qname, err := strconv.Unquote(arg)
if err != nil {
return nil, err
}
return asynq.Queue(qname), nil
return Queue(qname), nil
case "MaxRetry":
n, err := strconv.Atoi(arg)
if err != nil {
return nil, err
}
return asynq.MaxRetry(n), nil
return MaxRetry(n), nil
case "Timeout":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.Timeout(d), nil
return Timeout(d), nil
case "Deadline":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return asynq.Deadline(t), nil
return Deadline(t), nil
case "Unique":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.Unique(d), nil
return Unique(d), nil
case "ProcessAt":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return asynq.ProcessAt(t), nil
return ProcessAt(t), nil
case "ProcessIn":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.ProcessIn(d), nil
return ProcessIn(d), nil
default:
return nil, fmt.Errorf("cannot not parse option string %q", s)
}
Expand Down
Loading

0 comments on commit 12f4c7c

Please sign in to comment.