Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update BoundedMemoryQueue to consume remaining items on close #5203

Merged
merged 7 commits into from
Apr 14, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 5 additions & 18 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type boundedMemoryQueue struct {
items chan interface{}
onDroppedItem func(item interface{})
factory func() consumer
stopCh chan struct{}
capacity uint32
}

Expand All @@ -44,7 +43,6 @@ func NewBoundedMemoryQueue(capacity int, onDroppedItem func(item interface{})) P
return &boundedMemoryQueue{
onDroppedItem: onDroppedItem,
items: make(chan interface{}, capacity),
stopCh: make(chan struct{}),
stopped: uatomic.NewUint32(0),
size: uatomic.NewUint32(0),
capacity: uint32(capacity),
Expand All @@ -66,21 +64,11 @@ func (q *boundedMemoryQueue) StartConsumers(numWorkers int, callback func(item i
startWG.Done()
defer q.stopWG.Done()
itemConsumer := q.factory()
for {
select {
case item, ok := <-q.items:
if ok {
q.size.Sub(1)
itemConsumer.consume(item)
} else {
// channel closed, finish worker
return
}
case <-q.stopCh:
// the whole queue is closing, finish worker
return
}
for item := range q.items {
q.size.Sub(1)
itemConsumer.consume(item)
}
return
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
}()
}
startWG.Wait()
Expand Down Expand Up @@ -129,9 +117,8 @@ func (q *boundedMemoryQueue) Produce(item interface{}) bool {
// and releases the items channel. It blocks until all consumers have stopped.
func (q *boundedMemoryQueue) Stop() {
q.stopped.Store(1) // disable producer
close(q.stopCh)
q.stopWG.Wait()
close(q.items)
q.stopWG.Wait()
}

// Size returns the current size of the queue
Expand Down
46 changes: 46 additions & 0 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,52 @@ func TestBoundedQueue(t *testing.T) {
})
}

// In this test we run a queue with many items and a slow consumer.
// When the queue is stopped, the remaining items should be processed.
// Due to the way q.Stop() waits for all consumers to finish, the
// same lock strategy use above will not work, as calling Unlock
// only after Stop will mean the consumers are still locked while
// trying to perform the final consumptions.
func TestShutdownWhileNotEmpty(t *testing.T) {
q := NewBoundedMemoryQueue(10, func(item interface{}) {})

consumerState := newConsumerState(t)

q.StartConsumers(1, func(item interface{}) {
consumerState.record(item.(string))
//
bogdandrutu marked this conversation as resolved.
Show resolved Hide resolved
time.Sleep(1 * time.Second)
})

q.Produce("a")
q.Produce("b")
q.Produce("c")
q.Produce("d")
q.Produce("e")
q.Produce("f")
q.Produce("g")
q.Produce("h")
q.Produce("i")
q.Produce("j")

q.Stop()

assert.False(t, q.Produce("x"), "cannot push to closed queue")
consumerState.assertConsumed(map[string]bool{
"a": true,
"b": true,
"c": true,
"d": true,
"e": true,
"f": true,
"g": true,
"h": true,
"i": true,
"j": true,
})
assert.Equal(t, 0, q.Size())
}

type consumerState struct {
sync.Mutex
t *testing.T
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kB
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
Expand Down