-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipeline_step_sub_workflow.go
More file actions
199 lines (173 loc) · 6 KB
/
pipeline_step_sub_workflow.go
File metadata and controls
199 lines (173 loc) · 6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package module
import (
"context"
"fmt"
"time"
"github.com/CrisisTextLine/modular"
"github.com/GoCodeAlone/workflow/config"
"github.com/GoCodeAlone/workflow/plugin"
)
// SubWorkflowStep invokes a registered plugin workflow as a sub-workflow.
type SubWorkflowStep struct {
name string
workflow string // qualified name: "plugin:workflow"
inputMapping map[string]string
outputMapping map[string]string
timeout time.Duration
registry *plugin.PluginWorkflowRegistry
stepBuilder SubWorkflowStepBuilder
tmpl *TemplateEngine
}
// SubWorkflowStepBuilder builds pipeline steps from a workflow config's pipeline
// definitions. This is injected by the engine so the sub_workflow step can
// construct child pipelines without a circular dependency on engine.
type SubWorkflowStepBuilder func(pipelineName string, cfg *config.WorkflowConfig, app modular.Application) (*Pipeline, error)
// NewSubWorkflowStepFactory returns a StepFactory that creates SubWorkflowStep
// instances. The registry and stepBuilder are captured by closure so that
// the factory has access to them at step creation time.
func NewSubWorkflowStepFactory(registry *plugin.PluginWorkflowRegistry, stepBuilder SubWorkflowStepBuilder) StepFactory {
return func(name string, cfg map[string]any, _ modular.Application) (PipelineStep, error) {
workflowName, _ := cfg["workflow"].(string)
if workflowName == "" {
return nil, fmt.Errorf("sub_workflow step %q: 'workflow' is required", name)
}
step := &SubWorkflowStep{
name: name,
workflow: workflowName,
timeout: 30 * time.Second,
registry: registry,
stepBuilder: stepBuilder,
tmpl: NewTemplateEngine(),
}
if im, ok := cfg["input_mapping"].(map[string]any); ok {
step.inputMapping = make(map[string]string, len(im))
for k, v := range im {
if s, ok := v.(string); ok {
step.inputMapping[k] = s
}
}
}
if om, ok := cfg["output_mapping"].(map[string]any); ok {
step.outputMapping = make(map[string]string, len(om))
for k, v := range om {
if s, ok := v.(string); ok {
step.outputMapping[k] = s
}
}
}
if timeout, ok := cfg["timeout"].(string); ok && timeout != "" {
if d, err := time.ParseDuration(timeout); err == nil {
step.timeout = d
}
}
return step, nil
}
}
// Name returns the step name.
func (s *SubWorkflowStep) Name() string { return s.name }
// Execute runs the sub-workflow: looks up the embedded workflow, builds
// a child pipeline, maps inputs, executes, and maps outputs back.
func (s *SubWorkflowStep) Execute(ctx context.Context, pc *PipelineContext) (*StepResult, error) {
// Apply timeout
ctx, cancel := context.WithTimeout(ctx, s.timeout)
defer cancel()
// Look up the workflow in the registry
ewf, ok := s.registry.Get(s.workflow)
if !ok {
return nil, fmt.Errorf("sub_workflow step %q: workflow %q not found in registry", s.name, s.workflow)
}
// Resolve workflow config — prefer parsed Config, fall back to YAML string
wfCfg := ewf.Config
if wfCfg == nil && ewf.ConfigYAML != "" {
parsed, err := config.LoadFromString(ewf.ConfigYAML)
if err != nil {
return nil, fmt.Errorf("sub_workflow step %q: failed to parse workflow %q config YAML: %w", s.name, s.workflow, err)
}
wfCfg = parsed
}
if wfCfg == nil {
return nil, fmt.Errorf("sub_workflow step %q: workflow %q has no config", s.name, s.workflow)
}
// Build the child pipeline from the workflow config.
// Use the first pipeline defined, or the one matching the workflow name.
childPipeline, err := s.stepBuilder(ewf.Name, wfCfg, nil)
if err != nil {
return nil, fmt.Errorf("sub_workflow step %q: failed to build child pipeline for %q: %w", s.name, s.workflow, err)
}
// Map inputs from parent context to child trigger data
triggerData := make(map[string]any)
if s.inputMapping != nil {
for childKey, tmplExpr := range s.inputMapping {
resolved, resolveErr := s.tmpl.Resolve(tmplExpr, pc)
if resolveErr != nil {
return nil, fmt.Errorf("sub_workflow step %q: failed to resolve input %q: %w", s.name, childKey, resolveErr)
}
triggerData[childKey] = resolved
}
} else {
// No explicit mapping — pass parent's current data
for k, v := range pc.Current {
triggerData[k] = v
}
}
// Execute the child pipeline
childCtx, err := childPipeline.Execute(ctx, triggerData)
if err != nil {
return nil, fmt.Errorf("sub_workflow step %q: child workflow %q failed: %w", s.name, s.workflow, err)
}
// Map outputs back to parent context
output := make(map[string]any)
if s.outputMapping != nil {
for parentKey, childPath := range s.outputMapping {
output[parentKey] = resolveOutputPath(childCtx, childPath)
}
} else {
// No explicit mapping — return all child outputs under a "result" key
output["result"] = childCtx.Current
}
return &StepResult{Output: output}, nil
}
// resolveOutputPath extracts a value from the child pipeline context using
// a dot-separated path. Supports "result.field" (from Current) and
// "steps.stepName.field" (from StepOutputs).
func resolveOutputPath(childCtx *PipelineContext, path string) any {
// Try direct key in Current first
if v, ok := childCtx.Current[path]; ok {
return v
}
// Walk the dot-separated path through Current
return walkPath(childCtx.Current, path)
}
// walkPath traverses a nested map using a dot-separated path.
func walkPath(data map[string]any, path string) any {
parts := splitDotPath(path)
var current any = data
for _, part := range parts {
switch m := current.(type) {
case map[string]any:
current = m[part]
default:
return nil
}
}
return current
}
// splitDotPath splits a path by dots, e.g. "result.id" -> ["result", "id"].
func splitDotPath(path string) []string {
var parts []string
start := 0
for i := 0; i < len(path); i++ {
if path[i] == '.' {
if i > start {
parts = append(parts, path[start:i])
}
start = i + 1
}
}
if start < len(path) {
parts = append(parts, path[start:])
}
return parts
}
// Ensure interface satisfaction
var _ PipelineStep = (*SubWorkflowStep)(nil)