-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprocessing_step.go
More file actions
228 lines (201 loc) · 7.09 KB
/
processing_step.go
File metadata and controls
228 lines (201 loc) · 7.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package module
import (
"context"
"fmt"
"maps"
"math"
"time"
"github.com/GoCodeAlone/modular"
)
// Executor is the interface that dynamic components satisfy.
type Executor interface {
Execute(ctx context.Context, params map[string]any) (map[string]any, error)
}
// ProcessingStepConfig holds configuration for a processing step module.
type ProcessingStepConfig struct {
ComponentID string // service name to look up in registry
SuccessTransition string // transition to fire on success
CompensateTransition string // transition to fire on permanent failure
MaxRetries int // default 2
RetryBackoffMs int // base backoff in ms, default 1000
TimeoutSeconds int // per-attempt timeout, default 30
}
// ProcessingStep bridges dynamic components to state machine transitions.
// It implements TransitionHandler, wrapping an Executor with retry and
// compensation logic.
type ProcessingStep struct {
name string
config ProcessingStepConfig
executor Executor
smEngine *StateMachineEngine
metrics *MetricsCollector
}
// NewProcessingStep creates a new ProcessingStep module.
func NewProcessingStep(name string, config ProcessingStepConfig) *ProcessingStep {
if config.MaxRetries < 0 {
config.MaxRetries = 2
}
if config.RetryBackoffMs <= 0 {
config.RetryBackoffMs = 1000
}
if config.TimeoutSeconds <= 0 {
config.TimeoutSeconds = 30
}
return &ProcessingStep{
name: name,
config: config,
}
}
// Name returns the module name.
func (ps *ProcessingStep) Name() string {
return ps.name
}
// Init resolves dependencies from the service registry.
// Note: service registration is handled by ProvidesServices() — the framework
// calls it after Init completes, so we don't register here.
func (ps *ProcessingStep) Init(app modular.Application) error {
// Resolve the executor (dynamic component) from the service registry
if ps.config.ComponentID != "" {
var executor Executor
if err := app.GetService(ps.config.ComponentID, &executor); err != nil {
return fmt.Errorf("processing step %q: resolve executor %q: %w", ps.name, ps.config.ComponentID, err)
}
ps.executor = executor
}
// Resolve state machine engine (optional, for firing transitions).
// Try by standard name first, then scan the registry for any engine.
var smEngine *StateMachineEngine
if err := app.GetService(StateMachineEngineName, &smEngine); err == nil && smEngine != nil {
ps.smEngine = smEngine
} else {
for _, svc := range app.SvcRegistry() {
if engine, ok := svc.(*StateMachineEngine); ok {
ps.smEngine = engine
break
}
}
}
// Resolve metrics collector (optional)
var metrics *MetricsCollector
if err := app.GetService("metrics.collector", &metrics); err == nil && metrics != nil {
ps.metrics = metrics
}
return nil
}
// Start is a no-op for the processing step.
func (ps *ProcessingStep) Start(_ context.Context) error {
return nil
}
// Stop is a no-op for the processing step.
func (ps *ProcessingStep) Stop(_ context.Context) error {
return nil
}
// ProvidesServices returns the service provided by this module.
func (ps *ProcessingStep) ProvidesServices() []modular.ServiceProvider {
return []modular.ServiceProvider{
{
Name: ps.name,
Description: "Processing step: " + ps.name,
Instance: ps,
},
}
}
// RequiresServices returns services required by this module.
func (ps *ProcessingStep) RequiresServices() []modular.ServiceDependency {
deps := []modular.ServiceDependency{
{
Name: StateMachineEngineName,
Required: false,
},
{
Name: "metrics.collector",
Required: false,
},
}
if ps.config.ComponentID != "" {
deps = append(deps, modular.ServiceDependency{
Name: ps.config.ComponentID,
Required: true,
})
}
return deps
}
// HandleTransition implements the TransitionHandler interface. It executes
// the wrapped dynamic component with retry and exponential backoff.
func (ps *ProcessingStep) HandleTransition(ctx context.Context, event TransitionEvent) error {
if ps.executor == nil {
return fmt.Errorf("processing step %q: no executor configured", ps.name)
}
// Build params from event data
params := make(map[string]any)
maps.Copy(params, event.Data)
params["workflowId"] = event.WorkflowID
params["transitionId"] = event.TransitionID
params["fromState"] = event.FromState
params["toState"] = event.ToState
startTime := time.Now()
var lastErr error
for attempt := 0; attempt <= ps.config.MaxRetries; attempt++ {
// Wait with exponential backoff (skip on first attempt)
if attempt > 0 {
backoff := ps.calculateBackoff(attempt)
select {
case <-time.After(backoff):
case <-ctx.Done():
return ctx.Err()
}
}
// Create per-attempt timeout context
attemptCtx, cancel := context.WithTimeout(ctx, time.Duration(ps.config.TimeoutSeconds)*time.Second)
result, err := ps.executor.Execute(attemptCtx, params)
cancel()
if err == nil {
// Executor succeeded (no Go error). Record metrics and fire success transition.
ps.recordMetrics("success", time.Since(startTime))
ps.fireTransition(ctx, event.WorkflowID, ps.config.SuccessTransition, result)
return nil
}
lastErr = err
}
// All retries exhausted — permanent failure
ps.recordMetrics("failure", time.Since(startTime))
ps.fireTransition(ctx, event.WorkflowID, ps.config.CompensateTransition, map[string]any{
"error": lastErr.Error(),
})
return fmt.Errorf("processing step %q: retries exhausted: %w", ps.name, lastErr)
}
// calculateBackoff returns the backoff duration for the given attempt (1-based).
func (ps *ProcessingStep) calculateBackoff(attempt int) time.Duration {
base := float64(ps.config.RetryBackoffMs) * math.Pow(2, float64(attempt-1))
return time.Duration(base) * time.Millisecond
}
// fireTransition triggers a state machine transition to avoid deadlocking
// when called from inside a transition handler. Uses the engine's tracked
// goroutine so shutdown can drain in-flight work.
//
// Note: Handlers are called BEFORE TriggerTransition commits the state change.
// The goroutine must wait briefly so the parent transition commits first;
// otherwise it may see stale state and silently fail.
func (ps *ProcessingStep) fireTransition(_ context.Context, workflowID, transition string, data map[string]any) {
if transition == "" || ps.smEngine == nil {
return
}
ps.smEngine.TrackGoroutine(func() {
// Brief pause to let the parent TriggerTransition commit the state
// change. Without this, the goroutine can race and find the instance
// still in its pre-transition state.
time.Sleep(10 * time.Millisecond)
// Use context.Background() because the spawned goroutine outlives
// the caller (e.g., an HTTP request handler whose context is
// cancelled after the response is written).
_ = ps.smEngine.TriggerTransition(context.Background(), workflowID, transition, data)
})
}
// recordMetrics records processing step metrics if a collector is available.
func (ps *ProcessingStep) recordMetrics(status string, duration time.Duration) {
if ps.metrics == nil {
return
}
ps.metrics.RecordModuleOperation(ps.name, "execute", status)
ps.metrics.RecordWorkflowDuration(ps.name, "processing_step", duration)
}