-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipeline_step_json_response.go
More file actions
235 lines (213 loc) · 6.02 KB
/
pipeline_step_json_response.go
File metadata and controls
235 lines (213 loc) · 6.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package module
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"github.com/GoCodeAlone/modular"
)
// JSONResponseStep writes an HTTP JSON response with a custom status code and stops the pipeline.
type JSONResponseStep struct {
name string
status int
headers map[string]string
body map[string]any
bodyRaw any // for non-map bodies (arrays, literals)
bodyFrom string
tmpl *TemplateEngine
}
// NewJSONResponseStepFactory returns a StepFactory that creates JSONResponseStep instances.
func NewJSONResponseStepFactory() StepFactory {
return func(name string, config map[string]any, _ modular.Application) (PipelineStep, error) {
status := 200
if s, ok := config["status"]; ok {
switch v := s.(type) {
case int:
status = v
case float64:
status = int(v)
}
}
var headers map[string]string
if h, ok := config["headers"].(map[string]any); ok {
headers = make(map[string]string, len(h))
for k, v := range h {
if s, ok := v.(string); ok {
headers[k] = s
}
}
}
var body map[string]any
var bodyRaw any
if b, ok := config["body"].(map[string]any); ok {
body = b
} else if config["body"] != nil {
// Support non-map bodies like arrays or literals
bodyRaw = config["body"]
}
bodyFrom, _ := config["body_from"].(string)
return &JSONResponseStep{
name: name,
status: status,
headers: headers,
body: body,
bodyRaw: bodyRaw,
bodyFrom: bodyFrom,
tmpl: NewTemplateEngine(),
}, nil
}
}
func (s *JSONResponseStep) Name() string { return s.name }
func (s *JSONResponseStep) Execute(_ context.Context, pc *PipelineContext) (*StepResult, error) {
w, ok := pc.Metadata["_http_response_writer"].(http.ResponseWriter)
if !ok {
// No response writer — return the body as output without writing HTTP
responseBody := s.resolveResponseBody(pc)
output := map[string]any{
"status": s.status,
}
if responseBody != nil {
output["body"] = responseBody
}
return &StepResult{Output: output, Stop: true}, nil
}
// Determine response body
responseBody := s.resolveResponseBody(pc)
// Set headers
w.Header().Set("Content-Type", "application/json")
for k, v := range s.headers {
w.Header().Set(k, v)
}
// Write status code
w.WriteHeader(s.status)
// Write body
if responseBody != nil {
if err := json.NewEncoder(w).Encode(responseBody); err != nil {
return nil, fmt.Errorf("json_response step %q: failed to encode response: %w", s.name, err)
}
}
// Mark response as handled
pc.Metadata["_response_handled"] = true
return &StepResult{
Output: map[string]any{
"status": s.status,
},
Stop: true,
}, nil
}
// resolveResponseBody determines the response body from the step configuration.
func (s *JSONResponseStep) resolveResponseBody(pc *PipelineContext) any {
if s.bodyFrom != "" {
return resolveBodyFrom(s.bodyFrom, pc)
}
if s.body != nil {
result := make(map[string]any, len(s.body))
for k, v := range s.body {
resolved, err := s.resolveBodyValue(v, pc)
if err != nil {
return s.body // fallback to unresolved
}
result[k] = resolved
}
return result
}
if s.bodyRaw != nil {
return s.bodyRaw
}
return nil
}
// resolveBodyValue resolves a single body value, supporting:
// - `_from` references that inject raw step output values
// - nested maps and slices
// - template strings resolved via the TemplateEngine.
//
// `_from` is treated as a special directive only when it is the sole key in a map,
// e.g. `{"_from": "steps.fetch.rows"}`. This keeps the semantics simple and avoids
// ambiguity: the entire value is replaced with the referenced data.
//
// As a consequence, `_from` cannot be combined with other fields or template
// expressions in the same map node. Configuration authors can still mix raw
// injections and templated fields by using `_from` on a sibling field in the
// parent object instead.
func (s *JSONResponseStep) resolveBodyValue(v any, pc *PipelineContext) (any, error) {
switch val := v.(type) {
case map[string]any:
// Check for _from reference, used only when it is the single key:
// {"_from": "steps.fetch.rows"}. Combining `_from` with other keys in
// the same map is intentionally not supported.
if from, ok := val["_from"].(string); ok && len(val) == 1 {
return resolveBodyFrom(from, pc), nil
}
// Recurse into nested map
result := make(map[string]any, len(val))
for k, item := range val {
resolved, err := s.resolveBodyValue(item, pc)
if err != nil {
return nil, fmt.Errorf("field %q: %w", k, err)
}
result[k] = resolved
}
return result, nil
case []any:
result := make([]any, len(val))
for i, item := range val {
resolved, err := s.resolveBodyValue(item, pc)
if err != nil {
return nil, err
}
result[i] = resolved
}
return result, nil
case string:
return s.tmpl.Resolve(val, pc)
default:
return v, nil
}
}
// resolveBodyFrom resolves a dotted path like "steps.get-company.row" from the
// pipeline context. It looks in StepOutputs first (for "steps.X.Y" paths),
// then in Current.
func resolveBodyFrom(path string, pc *PipelineContext) any {
parts := strings.SplitN(path, ".", 2)
if len(parts) < 2 {
// Single key — look in Current
if val, ok := pc.Current[path]; ok {
return val
}
return nil
}
prefix := parts[0]
rest := parts[1]
if prefix == "steps" {
// "steps.stepName.field..." — look in StepOutputs
stepParts := strings.SplitN(rest, ".", 2)
stepName := stepParts[0]
stepOutput, ok := pc.StepOutputs[stepName]
if !ok {
return nil
}
if len(stepParts) == 1 {
return stepOutput
}
return resolveNestedPath(stepOutput, stepParts[1])
}
// Generic dotted path in Current
return resolveNestedPath(pc.Current, path)
}
// resolveNestedPath walks a map[string]any using a dotted path.
func resolveNestedPath(data map[string]any, path string) any {
parts := strings.Split(path, ".")
var current any = data
for _, part := range parts {
m, ok := current.(map[string]any)
if !ok {
return nil
}
current, ok = m[part]
if !ok {
return nil
}
}
return current
}