Skip to content
Ken Hibino edited this page Mar 22, 2020 · 4 revisions

The unique tasks feature in Asynq make it simple to ensure that you have only one copy of a task enqueued in Redis.
This feature is useful when you want to deduplicate tasks to ensure that you are not creating redundant work.

Overview

Asynq's unique task feature is based on uniqueness locks. When enqueueing a task with Unique option, Client checks whether if it can acquire a lock for the given task. The task is enqueued only if the lock can be acquired. If there's already another task holding the lock, then the Client will return an error (See example code below on how to inspect the error).

The uniqueness lock has a TTL associated with it to avoid holding the lock forever. The lock will be released after the TTL or if the task holding the lock gets processed successfully before the TTL.
One important thing to note is that the Asynq's unique task feature is best-effort uniqueness. In other words, it's possible to enqueue a duplicate task if the lock has expired before the task gets processed.

The uniqueness of a task is based on the following properties:

  • Type
  • Payload
  • Queue

So if there's a task with the same type and payload, and if it's enqueued to the same queue, then another task with these same properties won't be enqueued until the lock has been released.

Example

c := asynq.NewClient(redis)

t1 := asynq.NewTask("example", map[string]interface{}{"a": 123})

// t1 will hold the uniqueness lock for the next hour.
err := c.Enqueue(t1, asynq.Unique(time.Hour))
if err != nil {
    if errors.Is(err, asynq.ErrDuplicateTask) {
        // handle duplicate task
    } else {
        // handle other errors
    }
}

t2 := asynq.NewTask("example", map[string]interface{}{"a": 123})
 
// t2 cannot be enqueued because it's a duplicate of t1.
err = c.Enqueue(t2, asynq.Unique(time.Hour))
if err != nil {
    if errors.Is(err, asynq.ErrDuplicateTask) {
        // handle duplicate task
    } else {
        // handle other errors
    }
}

In the above example, t2 won't be enqueued since t2 is a duplicate of t1.
Returned error value can be inspected using errors.Is to see if it wraps asynq.ErrDuplicateTask error.