-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtracker.go
More file actions
134 lines (116 loc) · 3.81 KB
/
tracker.go
File metadata and controls
134 lines (116 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package observability
import (
"context"
"encoding/json"
"fmt"
"io"
"time"
"github.com/GoCodeAlone/workflow/store"
"github.com/google/uuid"
)
// ExecutionTracker wraps execution lifecycle to record executions, steps, and logs.
type ExecutionTracker struct {
executions store.ExecutionStore
logs store.LogStore
}
// NewExecutionTracker creates a new ExecutionTracker.
func NewExecutionTracker(executions store.ExecutionStore, logs store.LogStore) *ExecutionTracker {
return &ExecutionTracker{
executions: executions,
logs: logs,
}
}
// StartExecution begins tracking a new workflow execution.
func (t *ExecutionTracker) StartExecution(ctx context.Context, workflowID uuid.UUID, triggerType string, data json.RawMessage) (uuid.UUID, error) {
now := time.Now()
exec := &store.WorkflowExecution{
ID: uuid.New(),
WorkflowID: workflowID,
TriggerType: triggerType,
TriggerData: data,
Status: store.ExecutionStatusRunning,
StartedAt: now,
}
if err := t.executions.CreateExecution(ctx, exec); err != nil {
return uuid.Nil, fmt.Errorf("start execution: %w", err)
}
return exec.ID, nil
}
// RecordStep records a step within an execution.
func (t *ExecutionTracker) RecordStep(ctx context.Context, executionID uuid.UUID, step *store.ExecutionStep) error {
step.ExecutionID = executionID
if step.ID == uuid.Nil {
step.ID = uuid.New()
}
return t.executions.CreateStep(ctx, step)
}
// CompleteExecution marks an execution as successfully completed.
func (t *ExecutionTracker) CompleteExecution(ctx context.Context, executionID uuid.UUID, output json.RawMessage) error {
exec, err := t.executions.GetExecution(ctx, executionID)
if err != nil {
return fmt.Errorf("get execution: %w", err)
}
now := time.Now()
durationMs := now.Sub(exec.StartedAt).Milliseconds()
exec.Status = store.ExecutionStatusCompleted
exec.OutputData = output
exec.CompletedAt = &now
exec.DurationMs = &durationMs
return t.executions.UpdateExecution(ctx, exec)
}
// FailExecution marks an execution as failed.
func (t *ExecutionTracker) FailExecution(ctx context.Context, executionID uuid.UUID, execErr error) error {
exec, err := t.executions.GetExecution(ctx, executionID)
if err != nil {
return fmt.Errorf("get execution: %w", err)
}
now := time.Now()
durationMs := now.Sub(exec.StartedAt).Milliseconds()
exec.Status = store.ExecutionStatusFailed
exec.ErrorMessage = execErr.Error()
exec.CompletedAt = &now
exec.DurationMs = &durationMs
return t.executions.UpdateExecution(ctx, exec)
}
// CancelExecution marks an execution as cancelled.
func (t *ExecutionTracker) CancelExecution(ctx context.Context, executionID uuid.UUID) error {
exec, err := t.executions.GetExecution(ctx, executionID)
if err != nil {
return fmt.Errorf("get execution: %w", err)
}
now := time.Now()
durationMs := now.Sub(exec.StartedAt).Milliseconds()
exec.Status = store.ExecutionStatusCancelled
exec.CompletedAt = &now
exec.DurationMs = &durationMs
return t.executions.UpdateExecution(ctx, exec)
}
// LogWriter returns an io.Writer that appends logs to the store at the given level.
func (t *ExecutionTracker) LogWriter(workflowID uuid.UUID, executionID uuid.UUID, level store.LogLevel) io.Writer {
return &logWriter{
logs: t.logs,
workflowID: workflowID,
executionID: executionID,
level: level,
}
}
// logWriter implements io.Writer by appending execution logs.
type logWriter struct {
logs store.LogStore
workflowID uuid.UUID
executionID uuid.UUID
level store.LogLevel
}
func (w *logWriter) Write(p []byte) (n int, err error) {
execID := w.executionID
entry := &store.ExecutionLog{
WorkflowID: w.workflowID,
ExecutionID: &execID,
Level: w.level,
Message: string(p),
}
if err := w.logs.Append(context.Background(), entry); err != nil {
return 0, err
}
return len(p), nil
}