Skip to content

Commit

Permalink
fixed example. added qos to options.
Browse files Browse the repository at this point in the history
  • Loading branch information
rsnullptr committed Jul 2, 2021
1 parent eca0b81 commit 782c36a
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 44 deletions.
19 changes: 19 additions & 0 deletions bindings_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rabbitmq
// BindingExchangeOptions are used when binding to an exchange.
// it will verify the exchange is created before binding to it.
type BindingExchangeOptions struct {
DoBinding bool
Name string
Kind string
Durable bool
Expand All @@ -16,6 +17,7 @@ type BindingExchangeOptions struct {

// QueueDeclareOptions arguments to declare a queue
type QueueDeclareOptions struct {
DoDeclare bool
QueueName string
QueueDurable bool
QueueAutoDelete bool
Expand All @@ -24,6 +26,18 @@ type QueueDeclareOptions struct {
QueueArgs Table
}

// QosOptions configuration
type QosOptions struct {
QOSPrefetchCount int
QOSPrefetchSize int
QOSGlobal bool
}

// WithQueueDeclare define if queue will be declare or not
func WithQueueDeclare(options *QueueDeclareOptions) {
options.DoDeclare = true
}

// WithQueueDeclareOptionsDurable sets the queue to durable, which means it won't
// be destroyed when the server restarts. It must only be bound to durable exchanges
func WithQueueDeclareOptionsDurable(options *QueueDeclareOptions) {
Expand Down Expand Up @@ -118,3 +132,8 @@ func WithBindingExchangeOptionsExchangeArgs(args Table, options *BindingExchange
func WithBindingExchangeOptionsNoWait(options *BindingExchangeOptions) {
options.BindingNoWait = true
}

// WithBindingExchange define if the exchange is to be binded or not
func WithBindingExchange(options *BindingExchangeOptions) {
options.DoBinding = true
}
34 changes: 18 additions & 16 deletions consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,24 +171,26 @@ func (consumer Consumer) startGoroutines(
consumer.chManager.channelMux.RLock()
defer consumer.chManager.channelMux.RUnlock()

_, err := consumer.chManager.channel.QueueDeclare(
queue,
consumeOptions.QueueDeclare.QueueDurable,
consumeOptions.QueueDeclare.QueueAutoDelete,
consumeOptions.QueueDeclare.QueueExclusive,
consumeOptions.QueueDeclare.QueueNoWait,
tableToAMQPTable(consumeOptions.QueueDeclare.QueueArgs),
)
if err != nil {
return err
if consumeOptions.QueueDeclare.DoDeclare {
_, err := consumer.chManager.channel.QueueDeclare(
queue,
consumeOptions.QueueDeclare.QueueDurable,
consumeOptions.QueueDeclare.QueueAutoDelete,
consumeOptions.QueueDeclare.QueueExclusive,
consumeOptions.QueueDeclare.QueueNoWait,
tableToAMQPTable(consumeOptions.QueueDeclare.QueueArgs),
)
if err != nil {
return err
}
}

if consumeOptions.BindingExchange.Name != "" {
if consumeOptions.BindingExchange.DoBinding {
exchange := consumeOptions.BindingExchange
if exchange.Name == "" {
return fmt.Errorf("binding to exchange but name not specified")
}
err = consumer.chManager.channel.ExchangeDeclare(
err := consumer.chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
exchange.Durable,
Expand All @@ -214,10 +216,10 @@ func (consumer Consumer) startGoroutines(
}
}

err = consumer.chManager.channel.Qos(
consumeOptions.QOSPrefetch,
0,
consumeOptions.QOSGlobal,
err := consumer.chManager.channel.Qos(
consumeOptions.Qos.QOSPrefetchCount,
consumeOptions.Qos.QOSPrefetchSize,
consumeOptions.Qos.QOSGlobal,
)
if err != nil {
return err
Expand Down
16 changes: 9 additions & 7 deletions consume_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package rabbitmq

// getDefaultConsumeOptions descibes the options that will be used when a value isn't provided
// getDefaultConsumeOptions describes the options that will be used when a value isn't provided
func getDefaultConsumeOptions() ConsumeOptions {
return ConsumeOptions{
QueueDeclare: QueueDeclareOptions{
Expand All @@ -22,9 +22,12 @@ func getDefaultConsumeOptions() ConsumeOptions {
BindingArgs: nil,
ExchangeArgs: nil,
},
Qos: QosOptions{
QOSPrefetchCount: 0,
QOSPrefetchSize: 0,
QOSGlobal: false,
},
Concurrency: 1,
QOSPrefetch: 0,
QOSGlobal: false,
ConsumerName: "",
ConsumerAutoAck: false,
ConsumerExclusive: false,
Expand All @@ -38,9 +41,8 @@ func getDefaultConsumeOptions() ConsumeOptions {
type ConsumeOptions struct {
QueueDeclare QueueDeclareOptions
BindingExchange BindingExchangeOptions
Qos QosOptions
Concurrency int
QOSPrefetch int
QOSGlobal bool
ConsumerName string
ConsumerAutoAck bool
ConsumerExclusive bool
Expand All @@ -62,15 +64,15 @@ func WithConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions) {
// This doesn't affect the handler, messages are still processed one at a time.
func WithConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions) {
return func(options *ConsumeOptions) {
options.QOSPrefetch = prefetchCount
options.Qos.QOSPrefetchCount = prefetchCount
}
}

// WithConsumeOptionsQOSGlobal sets the qos on the channel to global, which means
// these QOS settings apply to ALL existing and future
// consumers on all channels on the same connection
func WithConsumeOptionsQOSGlobal(options *ConsumeOptions) {
options.QOSGlobal = true
options.Qos.QOSGlobal = true
}

// WithConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer
Expand Down
9 changes: 5 additions & 4 deletions examples/consumer/main.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package main

import (
"log"

"github.com/streadway/amqp"
"github.com/wagslane/go-rabbitmq"
"log"
)

func main() {
Expand All @@ -21,15 +20,17 @@ func main() {
// true to ACK, false to NACK
return true
},
"my_queue1",
[]string{"routing_key1", "routing_key_2"},
"my_queue5",
[]string{"routing_key_7"},
rabbitmq.WithConsumeOptionsConcurrency(10),
func(options *rabbitmq.ConsumeOptions) {
rabbitmq.WithQueueDeclare(&options.QueueDeclare)
rabbitmq.WithQueueDeclareOptionsDurable(&options.QueueDeclare)
rabbitmq.WithQueueDeclareOptionsQuorum(&options.QueueDeclare)
rabbitmq.WithBindingExchangeOptionsExchangeName("events", &options.BindingExchange)
rabbitmq.WithBindingExchangeOptionsExchangeKind("topic", &options.BindingExchange)
rabbitmq.WithBindingExchangeOptionsExchangeDurable(&options.BindingExchange)
options.Qos.QOSPrefetchCount = 1
},
)
if err != nil {
Expand Down
52 changes: 38 additions & 14 deletions examples/publisher/main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,27 @@
package main

import (
"fmt"
"log"
"os"
"time"

"github.com/streadway/amqp"
"github.com/wagslane/go-rabbitmq"
)

func main() {
do()
}

func do() {
publisher, returns, err := rabbitmq.NewPublisher(
"amqp://guest:guest@localhost", amqp.Config{},
rabbitmq.WithPublisherOptionsLogging,
func(options *rabbitmq.PublisherOptions) {
options.BindingExchange = rabbitmq.BindingExchangeOptions{
Name: "beautifulExchange1",
DoBinding: true,
Name: "beautifulExchange5",
Kind: "topic",
Durable: true,
AutoDelete: false,
Expand All @@ -24,11 +32,12 @@ func main() {
}

options.RoutingKeys = []string{
"routing_key1",
"routing_key5",
}

options.QueueDeclare = rabbitmq.QueueDeclareOptions{
QueueName: "my_queue1",
DoDeclare: true,
QueueName: "my_queue5",
QueueDurable: true,
QueueAutoDelete: false,
QueueExclusive: false,
Expand All @@ -37,26 +46,41 @@ func main() {
}

options.QueueDeclare.QueueArgs["x-queue-type"] = "quorum"

options.Qos = rabbitmq.QosOptions{
QOSPrefetchCount: 1,
QOSPrefetchSize: 0,
QOSGlobal: true,
}
},
)
if err != nil {
log.Fatal(err)
}
err = publisher.Publish(
[]byte("hello, world1"),
[]string{"routing_key1"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("events"),
)
if err != nil {
log.Fatal(err)
return
}

go func() {
for r := range returns {
log.Printf("message returned from server: %s", string(r.Body))
}
}()

idx := int64(0)
for {
err = publisher.Publish(
[]byte(fmt.Sprintf("AAA %d", idx)),
[]string{"routing_key5"},
rabbitmq.WithPublishOptionsContentType("application/json"),
rabbitmq.WithPublishOptionsMandatory,
rabbitmq.WithPublishOptionsPersistentDelivery,
rabbitmq.WithPublishOptionsExchange("beautifulExchange5"),
)
if err != nil {
log.Fatal(err)
}

fmt.Fprintf(os.Stderr, "%d \n", idx)
idx++
time.Sleep(time.Millisecond * 1000)
}
}
14 changes: 11 additions & 3 deletions publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
defer publisher.chManager.channelMux.RUnlock()

// valid queue name, declare it
if options.QueueDeclare.QueueName != "" {
if options.QueueDeclare.DoDeclare {

_, err = publisher.chManager.channel.QueueDeclare(
options.QueueDeclare.QueueName,
options.QueueDeclare.QueueDurable,
Expand All @@ -101,8 +102,9 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
}

//valid name, bind it
if options.BindingExchange.Name != "" {
if options.BindingExchange.DoBinding {
exchange := options.BindingExchange

err = publisher.chManager.channel.ExchangeDeclare(
exchange.Name,
exchange.Kind,
Expand Down Expand Up @@ -131,7 +133,13 @@ func NewPublisher(url string, config amqp.Config, optionFuncs ...func(*Publisher
}
}

return publisher, returnChan, nil
err = publisher.chManager.channel.Qos(
options.Qos.QOSPrefetchCount,
options.Qos.QOSPrefetchSize,
options.Qos.QOSGlobal,
)

return publisher, returnChan, err
}

// Publish publishes the provided data to the given routing keys over the connection
Expand Down
1 change: 1 addition & 0 deletions publish_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type PublisherOptions struct {
RoutingKeys []string
BindingExchange BindingExchangeOptions
QueueDeclare QueueDeclareOptions
Qos QosOptions
}

// WithPublisherOptionsLogging sets logging to true on the consumer options
Expand Down

0 comments on commit 782c36a

Please sign in to comment.