-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathschedule_trigger.go
More file actions
220 lines (183 loc) · 5.64 KB
/
schedule_trigger.go
File metadata and controls
220 lines (183 loc) · 5.64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package module
import (
"context"
"fmt"
"maps"
"time"
"github.com/CrisisTextLine/modular"
)
const (
// ScheduleTriggerName is the base name for schedule triggers. The
// constructor passes it through the namespace's FormatName to produce the
// name the trigger reports and registers itself under.
ScheduleTriggerName = "trigger.schedule"
)
// ScheduleTriggerConfig is the typed shape of a schedule trigger's
// configuration: a list of jobs, serialized under the "jobs" key.
type ScheduleTriggerConfig struct {
// Jobs holds one entry per scheduled job.
Jobs []ScheduleTriggerJob `json:"jobs" yaml:"jobs"`
}
// ScheduleTriggerJob describes a single scheduled job: when it should run and
// which workflow action it triggers.
type ScheduleTriggerJob struct {
// Cron is the job's schedule string. Configure rejects an empty value.
// NOTE(review): presumably a cron expression; its syntax is not validated
// anywhere in this file — confirm against the scheduler.
Cron string `json:"cron" yaml:"cron"`
// Workflow is the workflow identifier handed to the engine's
// TriggerWorkflow call. Configure rejects an empty value.
Workflow string `json:"workflow" yaml:"workflow"`
// Action is the action identifier handed to the engine's TriggerWorkflow
// call. Configure rejects an empty value.
Action string `json:"action" yaml:"action"`
// Params are optional static values copied into the data map passed to the
// workflow on every run.
Params map[string]any `json:"params,omitempty" yaml:"params,omitempty"`
}
// ScheduleTrigger implements a trigger that starts workflows based on a
// schedule. Jobs, the scheduler and the engine are filled in by Configure and
// consumed by Start.
type ScheduleTrigger struct {
// name is the namespaced trigger name returned by Name and used as the
// service name in Init.
name string
// namespace is the provider used to format the trigger name.
namespace ModuleNamespaceProvider
// jobs are the job definitions collected by Configure.
jobs []ScheduleTriggerJob
// engine receives TriggerWorkflow calls when a scheduled job runs.
engine WorkflowEngine
// scheduler is where Start registers each job.
scheduler Scheduler
}
// NewScheduleTrigger creates a schedule trigger without an explicit namespace.
// It delegates to NewScheduleTriggerWithNamespace with nil, which substitutes
// the standard namespace.
func NewScheduleTrigger() *ScheduleTrigger {
return NewScheduleTriggerWithNamespace(nil)
}
// NewScheduleTriggerWithNamespace builds a schedule trigger whose name is
// ScheduleTriggerName formatted by the given namespace provider.
//
// A nil namespace is replaced by NewStandardNamespace("", ""). The returned
// trigger has an empty (non-nil) job list and no scheduler or engine; those
// are supplied later by Configure.
func NewScheduleTriggerWithNamespace(namespace ModuleNamespaceProvider) *ScheduleTrigger {
	ns := namespace
	if ns == nil {
		// Fall back to the standard namespace so FormatName is always callable.
		ns = NewStandardNamespace("", "")
	}

	formatted := ns.FormatName(ScheduleTriggerName)

	trigger := &ScheduleTrigger{
		namespace: ns,
		jobs:      make([]ScheduleTriggerJob, 0),
	}
	trigger.name = formatted
	return trigger
}
// Name returns the trigger's namespaced name, as computed when the trigger
// was constructed.
func (t *ScheduleTrigger) Name() string {
return t.name
}
// Init registers the trigger itself as a service on the application under its
// namespaced name and returns whatever error RegisterService reports.
func (t *ScheduleTrigger) Init(app modular.Application) error {
return app.RegisterService(t.name, t)
}
// Start registers every configured job with the scheduler.
//
// It returns nil immediately when no jobs are configured. Otherwise it
// requires both a scheduler and a workflow engine to have been set and returns
// an error if either is missing. Scheduling stops at the first failure, which
// is returned wrapped with the workflow name; jobs scheduled before the
// failure are not rolled back here. ctx is not used by this method.
//
// NOTE(review): the job's Cron value is not passed to Schedule in this
// method; presumably the scheduler supplies the timing itself — confirm.
func (t *ScheduleTrigger) Start(ctx context.Context) error {
	// Preconditions, checked in order: the first matching case decides.
	switch {
	case len(t.jobs) == 0:
		return nil
	case t.scheduler == nil:
		return fmt.Errorf("scheduler not configured for schedule trigger")
	case t.engine == nil:
		return fmt.Errorf("workflow engine not configured for schedule trigger")
	}

	for i := range t.jobs {
		cfg := t.jobs[i]
		// Wrap the job definition in a runnable Job and register it.
		if err := t.scheduler.Schedule(t.createJob(cfg)); err != nil {
			return fmt.Errorf("failed to schedule job for workflow '%s': %w", cfg.Workflow, err)
		}
	}
	return nil
}
// Stop is a no-op that always returns nil. Jobs registered by Start are not
// unscheduled here, and ctx is not used.
func (t *ScheduleTrigger) Stop(ctx context.Context) error {
// Nothing to do here: the scheduler is expected to be stopped elsewhere.
return nil
}
// Configure sets up the trigger from a generic configuration value.
//
// triggerConfig must be a map[string]any containing a "jobs" key whose value
// is a []any of map[string]any entries. Each entry requires non-empty string
// values for "cron", "workflow" and "action", and may carry an optional
// "params" map.
//
// The scheduler and workflow engine are resolved from app: well-known service
// names are tried first, then every registered service is scanned for one of
// the right type.
//
// Checks run in this order, and the first failure is returned: configuration
// format, presence of "jobs", scheduler lookup, engine lookup, per-job
// validation. The trigger is only modified once everything has succeeded, so
// a failed call leaves it exactly as it was. On success the parsed jobs are
// appended to any jobs already configured.
func (t *ScheduleTrigger) Configure(app modular.Application, triggerConfig any) error {
	config, ok := triggerConfig.(map[string]any)
	if !ok {
		return fmt.Errorf("invalid schedule trigger configuration format")
	}

	jobsConfig, ok := config["jobs"].([]any)
	if !ok {
		return fmt.Errorf("jobs not found in schedule trigger configuration")
	}

	scheduler, ok := findScheduleTriggerService[Scheduler](app, "cronScheduler", "scheduler")
	if !ok {
		return fmt.Errorf("scheduler not found")
	}

	engine, ok := findScheduleTriggerService[WorkflowEngine](app, "workflowEngine", "engine")
	if !ok {
		return fmt.Errorf("workflow engine not found")
	}

	jobs, err := parseScheduleTriggerJobs(jobsConfig)
	if err != nil {
		return err
	}

	// Commit only after every step has succeeded, so an invalid job cannot
	// leave the trigger half-configured or cause duplicates on a retry.
	t.scheduler = scheduler
	t.engine = engine
	t.jobs = append(t.jobs, jobs...)
	return nil
}

// findScheduleTriggerService returns the first service that can be asserted
// to T. It tries each name in order via GetService, then falls back to
// scanning everything returned by SvcRegistry. The boolean is false when
// nothing matches.
//
// NOTE(review): if SvcRegistry returns a map, the fallback scan visits
// services in unspecified order, so with several matching services the one
// picked may vary between runs.
func findScheduleTriggerService[T any](app modular.Application, names ...string) (T, bool) {
	for _, name := range names {
		var svc any
		if err := app.GetService(name, &svc); err != nil || svc == nil {
			continue
		}
		if typed, ok := svc.(T); ok {
			return typed, true
		}
	}

	for _, svc := range app.SvcRegistry() {
		if typed, ok := svc.(T); ok {
			return typed, true
		}
	}

	var zero T
	return zero, false
}

// parseScheduleTriggerJobs converts raw job entries into ScheduleTriggerJob
// values, failing on the first entry that is not a map or lacks a required
// field. It returns nil jobs on error.
func parseScheduleTriggerJobs(jobsConfig []any) ([]ScheduleTriggerJob, error) {
	jobs := make([]ScheduleTriggerJob, 0, len(jobsConfig))
	for i, raw := range jobsConfig {
		jobMap, ok := raw.(map[string]any)
		if !ok {
			return nil, fmt.Errorf("invalid job configuration at index %d", i)
		}

		// A missing or non-string value yields "", which the required-field
		// check below rejects, so the assertion results can be ignored.
		cron, _ := jobMap["cron"].(string)
		workflow, _ := jobMap["workflow"].(string)
		action, _ := jobMap["action"].(string)
		if cron == "" || workflow == "" || action == "" {
			return nil, fmt.Errorf("incomplete job configuration at index %d: cron, workflow and action are required", i)
		}

		// params is optional; a missing value, or one that is not a
		// map[string]any, leaves the job with nil Params.
		params, _ := jobMap["params"].(map[string]any)

		jobs = append(jobs, ScheduleTriggerJob{
			Cron:     cron,
			Workflow: workflow,
			Action:   action,
			Params:   params,
		})
	}
	return jobs, nil
}
// createJob wraps a job definition in a Job whose function triggers the
// configured workflow action on the trigger's engine.
//
// Each run builds a fresh data map holding "trigger_time" (the time the
// function runs, formatted as RFC 3339) and then copies the job's static
// Params over it. Because the copy happens second, a param keyed
// "trigger_time" replaces the timestamp. The function's result is whatever
// TriggerWorkflow returns.
func (t *ScheduleTrigger) createJob(job ScheduleTriggerJob) Job {
	run := func(ctx context.Context) error {
		payload := map[string]any{
			"trigger_time": time.Now().Format(time.RFC3339),
		}
		// Static params go in after the timestamp; copying from a nil map is a no-op.
		maps.Copy(payload, job.Params)

		return t.engine.TriggerWorkflow(ctx, job.Workflow, job.Action, payload)
	}
	return NewFunctionJob(run)
}