-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipeline_step_db_sync_partitions.go
More file actions
73 lines (61 loc) · 2.3 KB
/
pipeline_step_db_sync_partitions.go
File metadata and controls
73 lines (61 loc) · 2.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package module
import (
"context"
"fmt"
"github.com/CrisisTextLine/modular"
)
// DBSyncPartitionsStep synchronizes partitions from a source table (e.g., tenants)
// for all tables managed by a database.partitioned module. This enables automatic
// partition creation when new tenants are onboarded.
type DBSyncPartitionsStep struct {
name string
database string
partitionKey string // optional: target a specific partition config by key
app modular.Application
}
// NewDBSyncPartitionsStepFactory returns a StepFactory for DBSyncPartitionsStep.
func NewDBSyncPartitionsStepFactory() StepFactory {
return func(name string, config map[string]any, app modular.Application) (PipelineStep, error) {
database, _ := config["database"].(string)
if database == "" {
return nil, fmt.Errorf("db_sync_partitions step %q: 'database' is required", name)
}
partitionKey, _ := config["partitionKey"].(string)
return &DBSyncPartitionsStep{
name: name,
database: database,
partitionKey: partitionKey,
app: app,
}, nil
}
}
func (s *DBSyncPartitionsStep) Name() string { return s.name }
func (s *DBSyncPartitionsStep) Execute(ctx context.Context, _ *PipelineContext) (*StepResult, error) {
if s.app == nil {
return nil, fmt.Errorf("db_sync_partitions step %q: no application context", s.name)
}
svc, ok := s.app.SvcRegistry()[s.database]
if !ok {
return nil, fmt.Errorf("db_sync_partitions step %q: database service %q not found", s.name, s.database)
}
mgr, ok := svc.(PartitionManager)
if !ok {
return nil, fmt.Errorf("db_sync_partitions step %q: service %q does not implement PartitionManager (use database.partitioned)", s.name, s.database)
}
if s.partitionKey != "" {
multiMgr, ok := svc.(MultiPartitionManager)
if !ok {
return nil, fmt.Errorf("db_sync_partitions step %q: service %q does not implement MultiPartitionManager (required when partitionKey is set)", s.name, s.database)
}
if err := multiMgr.SyncPartitionsForKey(ctx, s.partitionKey); err != nil {
return nil, fmt.Errorf("db_sync_partitions step %q: %w", s.name, err)
}
} else {
if err := mgr.SyncPartitionsFromSource(ctx); err != nil {
return nil, fmt.Errorf("db_sync_partitions step %q: %w", s.name, err)
}
}
return &StepResult{Output: map[string]any{
"synced": true,
}}, nil
}