-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaws_sqs.go
More file actions
446 lines (374 loc) · 11.9 KB
/
aws_sqs.go
File metadata and controls
446 lines (374 loc) · 11.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
package source
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"
"github.com/GoCodeAlone/workflow/connector"
"github.com/google/uuid"
)
// SQSConfig holds the configuration for the AWS SQS source and sink.
type SQSConfig struct {
QueueURL string `json:"queue_url" yaml:"queue_url"`
Region string `json:"region" yaml:"region"`
MaxMessages int `json:"max_messages" yaml:"max_messages"`
WaitTimeSeconds int `json:"wait_time_seconds" yaml:"wait_time_seconds"`
}
// SQSMessage represents a single message received from SQS.
type SQSMessage struct {
MessageID string
ReceiptHandle string
Body string
Attributes map[string]string
}
// SQSClient abstracts the AWS SQS API for testability.
// In production, this would wrap the AWS SDK v2 SQS client; in tests, a mock is used.
type SQSClient interface {
// ReceiveMessages polls the SQS queue for up to maxMessages.
ReceiveMessages(ctx context.Context, queueURL string, maxMessages, waitTimeSeconds int) ([]SQSMessage, error)
// DeleteMessage removes a message from the queue after successful processing.
DeleteMessage(ctx context.Context, queueURL, receiptHandle string) error
// SendMessage sends a message to the SQS queue.
SendMessage(ctx context.Context, queueURL, body string, attributes map[string]string) (messageID string, err error)
// SendMessageBatch sends multiple messages to the SQS queue.
SendMessageBatch(ctx context.Context, queueURL string, entries []SQSBatchEntry) ([]SQSBatchResult, error)
}
// SQSBatchEntry represents a single entry in a SendMessageBatch call.
type SQSBatchEntry struct {
ID string
Body string
Attributes map[string]string
}
// SQSBatchResult represents the result of a single entry in a SendMessageBatch call.
type SQSBatchResult struct {
ID string
MessageID string
Error error
}
// ---------------------------------------------------------------------------
// SQS Source
// ---------------------------------------------------------------------------
// SQSSource is an EventSource that polls an AWS SQS queue and emits
// CloudEvents-compatible events for each message received.
type SQSSource struct {
name string
config SQSConfig
client SQSClient
logger *slog.Logger
cancel context.CancelFunc
done chan struct{}
healthy atomic.Bool
mu sync.Mutex
}
// NewSQSSource creates a new SQSSource from a config map.
// Supported config keys: queue_url, region, max_messages, wait_time_seconds.
func NewSQSSource(name string, config map[string]any) (*SQSSource, error) {
cfg, err := parseSQSConfig(config)
if err != nil {
return nil, fmt.Errorf("sqs source %q: %w", name, err)
}
return &SQSSource{
name: name,
config: cfg,
logger: slog.Default().With("connector", "sqs", "role", "source", "name", name),
}, nil
}
// NewSQSSourceWithClient creates an SQSSource with a custom SQS client.
// This is primarily used for testing with mock clients.
func NewSQSSourceWithClient(name string, config map[string]any, client SQSClient) (*SQSSource, error) {
src, err := NewSQSSource(name, config)
if err != nil {
return nil, err
}
src.client = client
return src, nil
}
// Name returns the connector instance name.
func (s *SQSSource) Name() string { return s.name }
// Type returns the connector type identifier.
func (s *SQSSource) Type() string { return "sqs" }
// Start begins polling the SQS queue and writing events to the output channel.
func (s *SQSSource) Start(ctx context.Context, output chan<- connector.Event) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.client == nil {
return fmt.Errorf("sqs source %q: no SQSClient configured (set via NewSQSSourceWithClient)", s.name)
}
loopCtx, cancel := context.WithCancel(ctx)
s.cancel = cancel
s.done = make(chan struct{})
s.healthy.Store(true)
go s.pollLoop(loopCtx, output)
s.logger.Info("started", "queue_url", s.config.QueueURL, "region", s.config.Region)
return nil
}
// Stop gracefully shuts down the SQS poller.
func (s *SQSSource) Stop(_ context.Context) error {
s.mu.Lock()
defer s.mu.Unlock()
s.healthy.Store(false)
if s.cancel != nil {
s.cancel()
s.cancel = nil
}
if s.done != nil {
<-s.done
}
return nil
}
// Healthy returns true when the source is polling.
func (s *SQSSource) Healthy() bool {
return s.healthy.Load()
}
// Checkpoint is a no-op for SQS (messages are deleted after processing).
func (s *SQSSource) Checkpoint(_ context.Context) error {
return nil
}
// pollLoop continuously long-polls SQS for messages.
func (s *SQSSource) pollLoop(ctx context.Context, output chan<- connector.Event) {
defer close(s.done)
maxMessages := s.config.MaxMessages
if maxMessages <= 0 {
maxMessages = 10
}
waitTime := s.config.WaitTimeSeconds
if waitTime <= 0 {
waitTime = 20
}
for {
select {
case <-ctx.Done():
return
default:
}
messages, err := s.client.ReceiveMessages(ctx, s.config.QueueURL, maxMessages, waitTime)
if err != nil {
if ctx.Err() != nil {
return
}
s.logger.Error("receive error", "error", err)
s.healthy.Store(false)
select {
case <-time.After(time.Second):
case <-ctx.Done():
return
}
s.healthy.Store(true)
continue
}
for _, msg := range messages {
event := s.messageToEvent(msg)
select {
case output <- event:
case <-ctx.Done():
return
}
// Delete the message after delivering to the output channel.
if err := s.client.DeleteMessage(ctx, s.config.QueueURL, msg.ReceiptHandle); err != nil {
s.logger.Warn("delete message failed", "error", err, "message_id", msg.MessageID)
}
}
}
}
// messageToEvent converts an SQS message to a CloudEvents Event.
func (s *SQSSource) messageToEvent(msg SQSMessage) connector.Event {
// Attempt to use the body directly as JSON; if it fails, wrap as string.
var data json.RawMessage
if json.Valid([]byte(msg.Body)) {
data = json.RawMessage(msg.Body)
} else {
data, _ = json.Marshal(msg.Body)
}
// Determine event type from message attributes or use default.
eventType := "sqs.message"
if t, ok := msg.Attributes["event_type"]; ok && t != "" {
eventType = t
}
return connector.Event{
ID: uuid.New().String(),
Source: "sqs/" + s.name,
Type: eventType,
Subject: msg.MessageID,
Time: time.Now().UTC(),
Data: data,
DataContentType: "application/json",
}
}
// ---------------------------------------------------------------------------
// SQS Sink
// ---------------------------------------------------------------------------
// SQSSink is an EventSink that delivers events to an AWS SQS queue.
type SQSSink struct {
name string
config SQSConfig
client SQSClient
logger *slog.Logger
healthy atomic.Bool
}
// NewSQSSink creates a new SQSSink from a config map.
// Supported config keys: queue_url, region.
func NewSQSSink(name string, config map[string]any) (*SQSSink, error) {
cfg, err := parseSQSConfig(config)
if err != nil {
return nil, fmt.Errorf("sqs sink %q: %w", name, err)
}
sink := &SQSSink{
name: name,
config: cfg,
logger: slog.Default().With("connector", "sqs", "role", "sink", "name", name),
}
sink.healthy.Store(true)
return sink, nil
}
// NewSQSSinkWithClient creates an SQSSink with a custom SQS client.
// This is primarily used for testing with mock clients.
func NewSQSSinkWithClient(name string, config map[string]any, client SQSClient) (*SQSSink, error) {
sink, err := NewSQSSink(name, config)
if err != nil {
return nil, err
}
sink.client = client
return sink, nil
}
// Name returns the connector instance name.
func (s *SQSSink) Name() string { return s.name }
// Type returns the connector type identifier.
func (s *SQSSink) Type() string { return "sqs" }
// Deliver sends a single event to the SQS queue.
func (s *SQSSink) Deliver(ctx context.Context, event connector.Event) error {
if s.client == nil {
return fmt.Errorf("sqs sink %q: no SQSClient configured", s.name)
}
body, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("sqs sink %q: marshal event: %w", s.name, err)
}
attrs := map[string]string{
"event_type": event.Type,
"event_source": event.Source,
}
if event.Subject != "" {
attrs["event_subject"] = event.Subject
}
_, err = s.client.SendMessage(ctx, s.config.QueueURL, string(body), attrs)
if err != nil {
return fmt.Errorf("sqs sink %q: send: %w", s.name, err)
}
return nil
}
// DeliverBatch sends multiple events to the SQS queue. Returns per-event errors.
func (s *SQSSink) DeliverBatch(ctx context.Context, events []connector.Event) []error {
errs := make([]error, len(events))
if s.client == nil {
for i := range errs {
errs[i] = fmt.Errorf("sqs sink %q: no SQSClient configured", s.name)
}
return errs
}
// Build batch entries.
entries := make([]SQSBatchEntry, len(events))
for i := range events {
body, err := json.Marshal(events[i])
if err != nil {
errs[i] = fmt.Errorf("marshal event %d: %w", i, err)
continue
}
attrs := map[string]string{
"event_type": events[i].Type,
"event_source": events[i].Source,
}
entries[i] = SQSBatchEntry{
ID: fmt.Sprintf("entry-%d", i),
Body: string(body),
Attributes: attrs,
}
}
results, err := s.client.SendMessageBatch(ctx, s.config.QueueURL, entries)
if err != nil {
// Wholesale failure: all events failed.
for i := range errs {
if errs[i] == nil { // don't overwrite marshal errors
errs[i] = fmt.Errorf("sqs sink %q: batch send: %w", s.name, err)
}
}
return errs
}
// Map results back to error slots by entry index.
resultMap := make(map[string]error)
for _, r := range results {
if r.Error != nil {
resultMap[r.ID] = r.Error
}
}
for i := range events {
entryID := fmt.Sprintf("entry-%d", i)
if e, ok := resultMap[entryID]; ok {
errs[i] = e
}
}
return errs
}
// Stop marks the sink as unhealthy.
func (s *SQSSink) Stop(_ context.Context) error {
s.healthy.Store(false)
return nil
}
// Healthy returns true when the sink is operational.
func (s *SQSSink) Healthy() bool {
return s.healthy.Load()
}
// ---------------------------------------------------------------------------
// Config parsing
// ---------------------------------------------------------------------------
// parseSQSConfig extracts SQSConfig from a generic map.
func parseSQSConfig(config map[string]any) (SQSConfig, error) {
cfg := SQSConfig{
MaxMessages: 10,
WaitTimeSeconds: 20,
}
if qURL, ok := config["queue_url"].(string); ok && qURL != "" {
cfg.QueueURL = qURL
} else {
return cfg, fmt.Errorf("queue_url is required")
}
if region, ok := config["region"].(string); ok {
cfg.Region = region
}
if mm, ok := config["max_messages"].(int); ok && mm > 0 {
cfg.MaxMessages = mm
}
if mm, ok := config["max_messages"].(float64); ok && mm > 0 {
cfg.MaxMessages = int(mm)
}
if wt, ok := config["wait_time_seconds"].(int); ok && wt >= 0 {
cfg.WaitTimeSeconds = wt
}
if wt, ok := config["wait_time_seconds"].(float64); ok && wt >= 0 {
cfg.WaitTimeSeconds = int(wt)
}
return cfg, nil
}
// ---------------------------------------------------------------------------
// Factories
// ---------------------------------------------------------------------------
// SQSSourceFactory creates SQSSource instances from config maps.
// Note: The returned source requires an SQSClient to be injected before Start().
func SQSSourceFactory(name string, config map[string]any) (connector.EventSource, error) {
return NewSQSSource(name, config)
}
// SQSSinkFactory creates SQSSink instances from config maps.
// Note: The returned sink requires an SQSClient to be injected before Deliver().
func SQSSinkFactory(name string, config map[string]any) (connector.EventSink, error) {
return NewSQSSink(name, config)
}
// NewSQSSourceFactory returns a SourceFactory for AWS SQS.
func NewSQSSourceFactory() connector.SourceFactory {
return SQSSourceFactory
}
// NewSQSSinkFactory returns a SinkFactory for AWS SQS.
func NewSQSSinkFactory() connector.SinkFactory {
return SQSSinkFactory
}