-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathquery_handler.go
More file actions
259 lines (235 loc) · 8.41 KB
/
query_handler.go
File metadata and controls
259 lines (235 loc) · 8.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
package module
import (
"context"
"encoding/json"
"net/http"
"strings"
"sync"
"github.com/CrisisTextLine/modular"
"github.com/GoCodeAlone/workflow/interfaces"
)
// QueryFunc is a read-only query function that returns data or an error.
type QueryFunc func(ctx context.Context, r *http.Request) (any, error)
// QueryHandler dispatches GET requests to named query functions.
// Each query is registered by name and dispatched by extracting the last
// path segment from the request URL. Route pipelines can be attached for
// composable per-route processing. A delegate service can be configured
// to handle requests that don't match any registered query name.
type QueryHandler struct {
name string
delegate string // service name to resolve as http.Handler
delegateHandler http.Handler
app modular.Application
queries map[string]QueryFunc
routePipelines map[string]interfaces.PipelineRunner
executionTracker ExecutionTrackerProvider
mu sync.RWMutex
}
// NewQueryHandler creates a new QueryHandler with the given name.
func NewQueryHandler(name string) *QueryHandler {
return &QueryHandler{
name: name,
queries: make(map[string]QueryFunc),
routePipelines: make(map[string]interfaces.PipelineRunner),
}
}
// SetRoutePipeline attaches a pipeline to a specific route path.
func (h *QueryHandler) SetRoutePipeline(routePath string, pipeline interfaces.PipelineRunner) {
h.mu.Lock()
defer h.mu.Unlock()
h.routePipelines[routePath] = pipeline
}
// Name returns the unique identifier for this module.
func (h *QueryHandler) Name() string {
return h.name
}
// SetDelegate sets the delegate service name. The service must implement
// http.Handler and will be resolved from the service registry during Init.
func (h *QueryHandler) SetDelegate(name string) {
h.delegate = name
}
// SetDelegateHandler directly sets the HTTP handler used for delegation.
func (h *QueryHandler) SetDelegateHandler(handler http.Handler) {
h.delegateHandler = handler
}
// SetExecutionTracker sets the execution tracker for recording pipeline executions.
func (h *QueryHandler) SetExecutionTracker(t ExecutionTrackerProvider) {
h.executionTracker = t
}
// Init initializes the query handler and resolves the delegate service.
func (h *QueryHandler) Init(app modular.Application) error {
h.app = app
if h.delegate != "" {
h.resolveDelegate()
}
return nil
}
// resolveDelegate looks up the delegate service and checks for http.Handler.
func (h *QueryHandler) resolveDelegate() {
if h.app == nil || h.delegate == "" {
return
}
svc, ok := h.app.SvcRegistry()[h.delegate]
if !ok {
return
}
if handler, ok := svc.(http.Handler); ok {
h.delegateHandler = handler
}
}
// ResolveDelegatePostStart is called after engine.Start to resolve delegates
// that may not have been available during Init (e.g., services registered by
// post-start hooks).
func (h *QueryHandler) ResolveDelegatePostStart() {
if h.delegate != "" && h.delegateHandler == nil {
h.resolveDelegate()
}
}
// RegisterQuery adds a named query function to the handler.
func (h *QueryHandler) RegisterQuery(name string, fn QueryFunc) {
h.mu.Lock()
defer h.mu.Unlock()
h.queries[name] = fn
}
// Handle dispatches an HTTP request to the appropriate query function.
func (h *QueryHandler) Handle(w http.ResponseWriter, r *http.Request) {
h.ServeHTTP(w, r)
}
// ServeHTTP implements the http.Handler interface. It looks up a route pipeline
// by the full "METHOD /path" pattern (set by Go 1.22+ ServeMux), falling back
// to the last path segment for backward compatibility with registered queries.
// Dispatch chain: RegisteredQueryFunc -> RoutePipeline -> DelegateHandler -> 404
func (h *QueryHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
queryName := lastPathSegment(r.URL.Path)
// Use Go 1.22+ pattern for pipeline lookup (avoids last-segment collisions)
routeKey := r.Pattern
h.mu.RLock()
fn, exists := h.queries[queryName]
pipeline := h.routePipelines[routeKey]
if pipeline == nil {
// Fallback: try last-segment lookup for backward compatibility
pipeline = h.routePipelines[queryName]
}
h.mu.RUnlock()
if exists {
result, err := fn(r.Context(), r)
if err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(result); err != nil {
http.Error(w, "failed to encode response", http.StatusInternalServerError)
}
return
}
if pipeline != nil {
triggerData := map[string]any{
"method": r.Method,
"path": r.URL.Path,
"queryName": queryName,
"query": r.URL.Query(),
}
// Type-assert to *Pipeline for concrete field access (Metadata, RoutePattern,
// Execute) and execution tracker integration. All engine-registered pipelines
// are *Pipeline; the interface allows custom implementations in tests/plugins.
// concretePipeline != nil: real *Pipeline.
// concretePipeline == nil && isConcrete: typed-nil – fall through to delegate/404.
// !isConcrete: different implementation – use PipelineRunner.Run() fallback.
concretePipeline, isConcrete := pipeline.(*Pipeline)
if isConcrete && concretePipeline != nil {
// Inject HTTP context so delegate steps can forward directly
concretePipeline.Metadata = map[string]any{
"_http_request": r,
"_http_response_writer": w,
}
if concretePipeline.RoutePattern != "" {
concretePipeline.Metadata["_route_pattern"] = concretePipeline.RoutePattern
}
var pc *PipelineContext
var err error
if h.executionTracker != nil {
pc, err = h.executionTracker.TrackPipelineExecution(r.Context(), concretePipeline, triggerData, r)
} else {
pc, err = concretePipeline.Execute(r.Context(), triggerData)
}
if err != nil {
// Only write error if response wasn't already handled by a delegate step
if pc == nil || pc.Metadata["_response_handled"] != true {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
}
return
}
// If response was handled by a delegate step, don't write again
if pc.Metadata["_response_handled"] == true {
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(pc.Current); err != nil {
http.Error(w, "failed to encode response", http.StatusInternalServerError)
}
return
} else if !isConcrete {
// Fallback for non-*Pipeline implementations: use the PipelineRunner interface.
result, err := pipeline.Run(r.Context(), triggerData)
if err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
return
}
// Allow the runner to signal that it has already written the response.
if result["_response_handled"] == true {
return
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(result); err != nil {
http.Error(w, "failed to encode response", http.StatusInternalServerError)
}
return
}
// typed-nil *Pipeline: fall through to delegate/404 handling.
}
if h.delegateHandler != nil {
h.delegateHandler.ServeHTTP(w, r)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusNotFound)
_ = json.NewEncoder(w).Encode(map[string]string{"error": "unknown query: " + queryName})
}
// ProvidesServices returns a list of services provided by this module.
func (h *QueryHandler) ProvidesServices() []modular.ServiceProvider {
return []modular.ServiceProvider{
{
Name: h.name,
Description: "Query Handler",
Instance: h,
},
}
}
// RequiresServices returns a list of services required by this module.
func (h *QueryHandler) RequiresServices() []modular.ServiceDependency {
if h.delegate != "" {
return []modular.ServiceDependency{
{
Name: h.delegate,
Required: false,
},
}
}
return nil
}
// lastPathSegment extracts the last non-empty segment from a URL path.
// For example, "/api/v1/admin/engine/config" returns "config".
func lastPathSegment(path string) string {
path = strings.TrimRight(path, "/")
if idx := strings.LastIndex(path, "/"); idx >= 0 {
return path[idx+1:]
}
return path
}