-
Notifications
You must be signed in to change notification settings - Fork 0
/
rabbitmq_client.go
137 lines (122 loc) · 3.83 KB
/
rabbitmq_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package EasyGoQ
import (
"encoding/json"
"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go"
"sync"
"time"
)
type IRabbitMqClient interface {
Publish(queueName string, data interface{}) error
PublishError(queueName string, data []byte, table amqp.Table, errorCause error) error
GetChannel(isReuse bool) (*amqp.Channel, error)
PublishRetry(queueName string, delivery amqp.Delivery) error
}
type rabbitMqClient struct {
Connection *amqp.Connection
Channel *amqp.Channel
logger ILogger
}
// singleton
var _rabbitMqClient *rabbitMqClient
var mu sync.Mutex
func NewRabbitMqClient(connectionString string, logger ILogger, isReuse bool) (*rabbitMqClient, error) {
if isReuse == true && _rabbitMqClient != nil {
return _rabbitMqClient, nil
}
mu.Lock()
defer mu.Unlock()
rabbitMqConnection, err := amqp.Dial(connectionString)
if err != nil {
logger.Errorf("InitConnectionRabbitMq %+v", errors.Wrap(err, "InitConnectionRabbitMq Dial"))
return nil, err
}
go func() {
logger.Warnf("closing: %s", <-rabbitMqConnection.NotifyClose(make(chan *amqp.Error)))
}()
rabbitMqChannel, err := rabbitMqConnection.Channel()
if err != nil {
logger.Errorf("InitConnectionRabbitMq %+v", errors.Wrap(err, "InitConnectionRabbitMq Channel"))
return nil, err
}
if isReuse == true {
_rabbitMqClient = &rabbitMqClient{
rabbitMqConnection,
rabbitMqChannel,
logger,
}
return _rabbitMqClient, nil
}
return &rabbitMqClient{
rabbitMqConnection,
rabbitMqChannel,
logger,
}, nil
}
func (rabbitMqClient *rabbitMqClient) Publish(queueName string, data interface{}) error {
dataJson, _ := json.Marshal(data)
err := rabbitMqClient.Channel.Publish(queueName, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: dataJson,
Timestamp: time.Now(),
})
if err != nil {
rabbitMqClient.logger.Errorf("rabbitMqClient Publish %s %v", queueName, errors.Wrap(err, "Publish"))
}
return err
}
func (rabbitMqClient *rabbitMqClient) PublishRetry(queueName string, delivery amqp.Delivery) error {
err := rabbitMqClient.Channel.Publish(queueName, "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: delivery.Body,
Timestamp: time.Now(),
Headers: delivery.Headers,
})
if err != nil {
rabbitMqClient.logger.Errorf("rabbitMqClient Publish %s %v", queueName, errors.Wrap(err, "Publish"))
}
return err
}
func (rabbitMqClient *rabbitMqClient) PublishError(queueName string, data []byte, table amqp.Table, errorCause error) error {
if table == nil {
table = amqp.Table{
"queueName": queueName,
"retry-no": 1,
"errorCause": errorCause.Error(),
"createdDate": time.Now(),
"updatedDate": time.Now(),
}
} else {
table["queueName"] = queueName
retryNo, _ := table["retry-no"].(int32)
table["retry-no"] = retryNo + 1
table["errorCause"] = errorCause.Error()
table["updatedDate"] = time.Now()
}
rabbitMqClient.logger.Warnf("rabbitMqClient PublishError %s %v %v", queueName, table, data)
err := rabbitMqClient.Channel.Publish(GetErrorQueueName(queueName), "", false, false, amqp.Publishing{
ContentType: "application/json",
Body: data,
Timestamp: time.Now(),
Headers: table,
})
if err != nil {
rabbitMqClient.logger.Errorf("rabbitMqClient Publish %s %v", queueName, errors.Wrap(err, "Publish"))
}
return err
}
func (rabbitMqClient *rabbitMqClient) GetChannel(isReuse bool) (*amqp.Channel, error) {
if isReuse {
return _rabbitMqClient.Channel, nil
}
rabbitMqChannel, err := rabbitMqClient.Connection.Channel()
if err != nil {
rabbitMqClient.logger.Errorf("rabbitMqClient InitConnectionRabbitMq %+v", errors.Wrap(err, "InitConnectionRabbitMq Channel"))
return nil, err
}
//defer rabbitMqChannel.Close()
return rabbitMqChannel, nil
}
func (rabbitMqClient *rabbitMqClient) Close() error {
return rabbitMqClient.Connection.Close()
}