-
Notifications
You must be signed in to change notification settings - Fork 208
Expand file tree
/
Copy pathmessageQueue.go
More file actions
115 lines (90 loc) · 2.38 KB
/
messageQueue.go
File metadata and controls
115 lines (90 loc) · 2.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package queue
import (
"container/heap"
"context"
"fmt"
"sync"
"time"
"github.com/onflow/flow-go/module"
)
type Priority int
const LowPriority = Priority(1)
const MediumPriority = Priority(5)
const HighPriority = Priority(10)
// MessagePriorityFunc - the callback function to derive priority of a message
type MessagePriorityFunc func(message interface{}) (Priority, error)
// MessageQueue is the heap based priority queue implementation of the MessageQueue implementation
type MessageQueue struct {
pq *priorityQueue
cond *sync.Cond
priorityFunc MessagePriorityFunc
ctx context.Context
metrics module.NetworkMetrics
}
func (mq *MessageQueue) Insert(message interface{}) error {
if err := mq.ctx.Err(); err != nil {
return err
}
// determine the message priority
priority, err := mq.priorityFunc(message)
if err != nil {
return fmt.Errorf("failed to dervie message priority: %w", err)
}
// create the queue item
item := &item{
message: message,
priority: int(priority),
timestamp: time.Now(),
}
// lock the underlying mutex
mq.cond.L.Lock()
// push message to the underlying priority queue
heap.Push(mq.pq, item)
// record metrics
mq.metrics.MessageAdded(item.priority)
// signal a waiting routine that a message is now available
mq.cond.Signal()
// unlock the underlying mutex
mq.cond.L.Unlock()
return nil
}
func (mq *MessageQueue) Remove() interface{} {
mq.cond.L.Lock()
defer mq.cond.L.Unlock()
for mq.pq.Len() == 0 {
// if the context has been canceled, don't wait
if err := mq.ctx.Err(); err != nil {
return nil
}
mq.cond.Wait()
}
item := heap.Pop(mq.pq).(*item)
// record metrics
mq.metrics.QueueDuration(time.Since(item.timestamp), item.priority)
mq.metrics.MessageRemoved(item.priority)
return item.message
}
func (mq *MessageQueue) Len() int {
mq.cond.L.Lock()
defer mq.cond.L.Unlock()
return mq.pq.Len()
}
func NewMessageQueue(ctx context.Context, priorityFunc MessagePriorityFunc, nm module.NetworkMetrics) *MessageQueue {
var items = make([]*item, 0)
pq := priorityQueue(items)
mq := &MessageQueue{
pq: &pq,
priorityFunc: priorityFunc,
ctx: ctx,
metrics: nm,
}
m := sync.Mutex{}
mq.cond = sync.NewCond(&m)
// kick off a go routine to unblock queue readers on shutdown
go func() {
<-ctx.Done()
// unblock receive
mq.cond.Broadcast()
}()
return mq
}