Skip to content

Commit

Permalink
Always enqueue the aggregated task in the same queue
Browse files Browse the repository at this point in the history
  • Loading branch information
hibiken committed Apr 11, 2022
1 parent 829f64f commit 39718f8
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
2 changes: 1 addition & 1 deletion aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (a *aggregator) aggregate(t time.Time) {
}
aggregatedTask := a.ga.Aggregate(gname, tasks)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
if _, err := a.client.EnqueueContext(ctx, aggregatedTask); err != nil {
if _, err := a.client.EnqueueContext(ctx, aggregatedTask, Queue(qname)); err != nil {
a.logger.Errorf("Failed to enqueue aggregated task (queue=%q, group=%q, setID=%q): %v",
qname, gname, aggregationSetID, err)
cancel()
Expand Down
6 changes: 5 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,12 @@ type Config struct {

// GroupAggregator aggregates a group of tasks into one before the tasks are passed to the Handler.
type GroupAggregator interface {
// Aggregate aggregates the given tasks which belong to a same group
// Aggregate aggregates the given tasks which belong to a same group with the given groupKey
// and returns a new task which is the aggregation of those tasks.
//
// Use NewTask(typename, payload, opts...) to set any options for the aggregated task.
// Queue option will be ignored and the aggregated task will always be enqueued to the same queue
// the group belonged.
Aggregate(groupKey string, tasks []*Task) *Task
}

Expand Down

0 comments on commit 39718f8

Please sign in to comment.