forked from wagslane/go-rabbitmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
consume_options.go
108 lines (99 loc) · 3.67 KB
/
consume_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package rabbitmq
// getDefaultConsumeOptions describes the options that will be used when a value isn't provided
func getDefaultConsumeOptions() ConsumeOptions {
return ConsumeOptions{
QueueDeclare: QueueDeclareOptions{
QueueName: "",
QueueDurable: false,
QueueAutoDelete: false,
QueueExclusive: false,
QueueNoWait: false,
QueueArgs: nil,
},
BindingExchange: BindingExchangeOptions{
Name: "",
Kind: "",
Durable: false,
AutoDelete: false,
Internal: false,
NoWait: false,
BindingNoWait: false,
BindingArgs: nil,
ExchangeArgs: nil,
},
Qos: QosOptions{
QOSPrefetchCount: 0,
QOSPrefetchSize: 0,
QOSGlobal: false,
},
Concurrency: 1,
ConsumerName: "",
ConsumerAutoAck: false,
ConsumerExclusive: false,
ConsumerNoWait: false,
ConsumerNoLocal: false,
ConsumerArgs: nil,
}
}
// ConsumeOptions are used to describe how a new consumer will be created.
type ConsumeOptions struct {
QueueDeclare QueueDeclareOptions
BindingExchange BindingExchangeOptions
Qos QosOptions
Concurrency int
ConsumerName string
ConsumerAutoAck bool
ConsumerExclusive bool
ConsumerNoWait bool
ConsumerNoLocal bool
ConsumerArgs Table
}
// WithConsumeOptionsConcurrency returns a function that sets the concurrency, which means that
// many goroutines will be spawned to run the provided handler on messages
func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.Concurrency = concurrency
}
}
// WithConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that
// many messages will be fetched from the server in advance to help with throughput.
// This doesn't affect the handler, messages are still processed one at a time.
func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.Qos.QOSPrefetchCount = prefetchCount
}
}
// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means
// these QOS settings apply to ALL existing and future
// consumers on all channels on the same connection
func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) {
options.Qos.QOSGlobal = true
}
// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer
// if unset a random name will be given
func WithConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ConsumerName = consumerName
}
}
// WithConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer
// if unset the default will be used (false)
func WithConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.ConsumerAutoAck = autoAck
}
}
// WithConsumeOptionsConsumerExclusive sets the consumer to exclusive, which means
// the server will ensure that this is the sole consumer
// from this queue. When exclusive is false, the server will fairly distribute
// deliveries across multiple consumers.
func WithConsumeOptionsConsumerExclusive(options *ConsumeOptions) {
options.ConsumerExclusive = true
}
// WithConsumeOptionsConsumerNoWait sets the consumer to nowait, which means
// it does not wait for the server to confirm the request and
// immediately begin deliveries. If it is not possible to consume, a channel
// exception will be raised and the channel will be closed.
func WithConsumeOptionsConsumerNoWait(options *ConsumeOptions) {
options.ConsumerNoWait = true
}