-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase_partitioned.go
More file actions
601 lines (534 loc) · 22.8 KB
/
database_partitioned.go
File metadata and controls
601 lines (534 loc) · 22.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
package module
import (
"context"
"database/sql"
"fmt"
"regexp"
"strings"
"sync"
"time"
"github.com/GoCodeAlone/modular"
)
// validPartitionValue matches the partition values accepted by
// ensurePartitionForConfig: one or more ASCII letters, digits, underscores,
// dots, or hyphens. The pattern is anchored at both ends, so quotes,
// whitespace, and the empty string are all rejected. The check is applied to
// the value for both LIST and RANGE partitions before it is interpolated
// into DDL.
var validPartitionValue = regexp.MustCompile(`^[a-zA-Z0-9_.\-]+$`)
// Partition strategies that this module can generate DDL for. Any other
// PartitionType value is rejected with an error when a partition is created.
const (
// PartitionTypeList selects LIST partitioning (FOR VALUES IN ...). It is the
// default applied when no partition type is configured.
PartitionTypeList = "list"
// PartitionTypeRange selects RANGE partitioning (FOR VALUES FROM ... TO ...).
PartitionTypeRange = "range"
)
// PartitionKeyProvider is optionally implemented by database modules that
// support partitioning. It embeds DBProvider, so every implementation is also
// usable wherever a plain DBProvider is expected. Steps can use PartitionKey()
// to determine the column name for automatic tenant scoping, and
// PartitionTableName() to resolve tenant-specific partition table names at
// query time.
type PartitionKeyProvider interface {
DBProvider
// PartitionKey returns the name of the column the tables are partitioned by.
PartitionKey() string
// PartitionTableName resolves the partition table name for a given parent
// table and tenant value, using the configured partitionNameFormat.
// Returns the parent table name unchanged when no format is configured.
PartitionTableName(parentTable, tenantValue string) string
}
// PartitionManager is optionally implemented by database modules that support
// runtime creation of partitions. It extends PartitionKeyProvider with the
// operations that actually create partitions.
type PartitionManager interface {
PartitionKeyProvider
// EnsurePartition creates the partition for tenantValue. The call is
// idempotent: if the partition already exists it succeeds without error.
EnsurePartition(ctx context.Context, tenantValue string) error
// SyncPartitionsFromSource queries the configured sourceTable for all
// distinct tenant values and ensures that partitions exist for each one.
// No-ops if sourceTable is not configured.
SyncPartitionsFromSource(ctx context.Context) error
}
// MultiPartitionManager extends PartitionManager for databases that can have
// more than one partition key configuration (e.g. tenant-partitioned tables
// AND api-version-partitioned tables in the same database). It is implemented
// by PartitionedDatabase; the additional methods are primarily meaningful when
// multiple partition configs are configured. The methods inherited from
// PartitionManager keep working and, in PartitionedDatabase, EnsurePartition
// operates on the first configured partition group.
type MultiPartitionManager interface {
PartitionManager
// PartitionConfigs returns all configured partition groups.
PartitionConfigs() []PartitionConfig
// EnsurePartitionForKey creates partitions for the specified partition key
// and value on all tables that belong to that partition config. Returns an
// error if no config with that partitionKey is registered.
EnsurePartitionForKey(ctx context.Context, partitionKey, value string) error
// SyncPartitionsForKey syncs partitions for the specified partition key's
// configured source table. No-ops if no sourceTable is configured for that
// key. Returns an error if no config with that partitionKey is registered.
SyncPartitionsForKey(ctx context.Context, partitionKey string) error
}
// PartitionConfig holds per-partition-key configuration within a
// database.partitioned module. Multiple PartitionConfig entries allow a single
// module to manage tables that are partitioned by different columns or with
// different partition types. Lookups by key return the first entry whose
// PartitionKey matches, so keys should be unique across entries.
type PartitionConfig struct {
// PartitionKey is the column name used for partitioning (e.g. tenant_id).
// It is validated as an identifier before any partition is created.
PartitionKey string `json:"partitionKey" yaml:"partitionKey"`
// Tables lists the parent tables that are partitioned by this key. One
// partition is created per table for every partition value.
Tables []string `json:"tables" yaml:"tables"`
// PartitionType is "list" or "range". An empty value is normalized to
// "list" when the module is constructed.
PartitionType string `json:"partitionType" yaml:"partitionType"`
// PartitionNameFormat is a template for generating partition table names.
// Supports {table} and {tenant} placeholders. Default: "{table}_{tenant}".
// Hyphens and dots in the tenant value are replaced with underscores before
// it is substituted for {tenant}.
PartitionNameFormat string `json:"partitionNameFormat" yaml:"partitionNameFormat"`
// SourceTable is the table queried by SyncPartitionsFromSource for this key.
// When empty, syncing this config is a no-op.
SourceTable string `json:"sourceTable" yaml:"sourceTable"`
// SourceColumn overrides the column queried in SourceTable. Defaults to PartitionKey.
SourceColumn string `json:"sourceColumn" yaml:"sourceColumn"`
}
// PartitionedDatabaseConfig holds configuration for the database.partitioned module.
//
// Single-partition mode (backward-compatible): set PartitionKey, Tables, and
// optionally PartitionType, PartitionNameFormat, SourceTable, SourceColumn at
// the top level.
//
// Multi-partition mode: set Partitions to a list of PartitionConfig entries.
// Each entry is an independent partition group with its own key, tables, type,
// naming format and optional source. The top-level single-partition fields are
// ignored when Partitions is non-empty.
type PartitionedDatabaseConfig struct {
// Driver is the database driver name. Partition DDL is only issued when it
// is "pgx", "pgx/v5" or "postgres"; other drivers cause partition creation
// to fail with an error.
Driver string `json:"driver" yaml:"driver"`
// DSN, MaxOpenConns and MaxIdleConns are passed through unchanged to the
// wrapped base database.
DSN string `json:"dsn" yaml:"dsn"`
MaxOpenConns int `json:"maxOpenConns" yaml:"maxOpenConns"`
MaxIdleConns int `json:"maxIdleConns" yaml:"maxIdleConns"`
// ── Single-partition fields (used when Partitions is empty) ──────────────
// PartitionKey is the column name used for partitioning (e.g. tenant_id).
PartitionKey string `json:"partitionKey" yaml:"partitionKey"`
// Tables lists the parent tables partitioned by PartitionKey.
Tables []string `json:"tables" yaml:"tables"`
// PartitionType is "list" (default) or "range".
// LIST partitions are created with FOR VALUES IN ('value').
// RANGE partitions are created with FOR VALUES FROM ('value') TO (bound),
// where the upper bound is derived from the same value so that each
// partition is intended to hold a single value.
PartitionType string `json:"partitionType" yaml:"partitionType"`
// PartitionNameFormat is a template for generating partition table names.
// Supports {table} and {tenant} placeholders.
// Default: "{table}_{tenant}" (e.g. forms_org_alpha).
PartitionNameFormat string `json:"partitionNameFormat" yaml:"partitionNameFormat"`
// SourceTable is the table that contains all tenant IDs.
// When set, SyncPartitionsFromSource queries this table for all distinct
// non-NULL values in the source column and ensures partitions exist.
// Example: "tenants" — will query
// "SELECT DISTINCT tenant_id FROM tenants WHERE tenant_id IS NOT NULL".
SourceTable string `json:"sourceTable" yaml:"sourceTable"`
// SourceColumn overrides the column queried in sourceTable.
// Defaults to PartitionKey if empty.
SourceColumn string `json:"sourceColumn" yaml:"sourceColumn"`
// ── Lifecycle sync settings ───────────────────────────────────────────────
// AutoSync controls whether SyncPartitionsFromSource is called automatically
// during Start(). Defaults to true when any sourceTable is configured.
// Set to false to disable automatic sync on startup. Setting it to true
// has no effect when no sourceTable is configured.
AutoSync *bool `json:"autoSync" yaml:"autoSync"`
// SyncInterval is a duration string (e.g. "60s", "5m") for periodic
// re-sync of partitions from the source table. When set, a background
// goroutine calls SyncPartitionsFromSource at this interval after Start().
// Requires at least one sourceTable to be configured; otherwise the value
// is ignored. A value that parses to zero or a negative duration disables
// periodic sync, and a value that does not parse fails Start(). Example: "60s".
SyncInterval string `json:"syncInterval" yaml:"syncInterval"`
// ── Multi-partition mode ─────────────────────────────────────────────────
// Partitions lists independent partition key configurations. When non-empty,
// the single-partition fields above are ignored.
Partitions []PartitionConfig `json:"partitions" yaml:"partitions"`
}
// PartitionedDatabase wraps WorkflowDatabase and adds PostgreSQL partition
// management. It satisfies DBProvider, DBDriverProvider, PartitionKeyProvider,
// PartitionManager, and MultiPartitionManager.
//
// The struct contains a mutex and a WaitGroup and must therefore be used
// through a pointer; NewPartitionedDatabase returns one.
type PartitionedDatabase struct {
// name is the module name and the name the service is registered under.
name string
// config is the configuration exactly as passed to NewPartitionedDatabase.
config PartitionedDatabaseConfig
partitions []PartitionConfig // normalized; always len >= 1 after construction
// base is the wrapped database that owns the actual connection.
base *WorkflowDatabase
// mu serializes the execution of partition DDL statements.
mu sync.RWMutex
// logger is set in Init; it may be nil if Init has not run, and the
// periodic sync goroutine checks for that before logging.
logger modular.Logger
// periodic sync state
// syncStop is created by Start when periodic sync is enabled and closed by
// Stop to signal the goroutine to exit; it is nil when no goroutine runs.
syncStop chan struct{}
// syncWg lets Stop wait for the periodic sync goroutine to finish.
syncWg sync.WaitGroup
}
// normalizePartitionConfig returns a copy of p with unset optional fields
// filled in: an empty PartitionType becomes PartitionTypeList and an empty
// PartitionNameFormat becomes "{table}_{tenant}". Every other field is
// returned as given.
func normalizePartitionConfig(p PartitionConfig) PartitionConfig {
	const defaultNameFormat = "{table}_{tenant}"

	normalized := p
	if normalized.PartitionType == "" {
		normalized.PartitionType = PartitionTypeList
	}
	if normalized.PartitionNameFormat == "" {
		normalized.PartitionNameFormat = defaultNameFormat
	}
	return normalized
}
// NewPartitionedDatabase builds a PartitionedDatabase module named name.
//
// In multi-partition mode (cfg.Partitions non-empty) every entry is kept in
// order with defaults applied. Otherwise the top-level PartitionKey / Tables /
// PartitionType / PartitionNameFormat / SourceTable / SourceColumn fields are
// folded into a single partition group for backward compatibility. Either
// way the resulting module has at least one partition group.
//
// The wrapped base database is created under the name name+"._base" and
// receives the driver, DSN and connection-pool limits from cfg. No connection
// is opened here; that happens in Start.
func NewPartitionedDatabase(name string, cfg PartitionedDatabaseConfig) *PartitionedDatabase {
	var groups []PartitionConfig
	if len(cfg.Partitions) == 0 {
		// Single-partition mode: synthesize one group from the top-level fields.
		legacy := PartitionConfig{
			PartitionKey:        cfg.PartitionKey,
			Tables:              cfg.Tables,
			PartitionType:       cfg.PartitionType,
			PartitionNameFormat: cfg.PartitionNameFormat,
			SourceTable:         cfg.SourceTable,
			SourceColumn:        cfg.SourceColumn,
		}
		groups = []PartitionConfig{normalizePartitionConfig(legacy)}
	} else {
		groups = make([]PartitionConfig, len(cfg.Partitions))
		for i := range cfg.Partitions {
			groups[i] = normalizePartitionConfig(cfg.Partitions[i])
		}
	}

	baseCfg := DatabaseConfig{
		Driver:       cfg.Driver,
		DSN:          cfg.DSN,
		MaxOpenConns: cfg.MaxOpenConns,
		MaxIdleConns: cfg.MaxIdleConns,
	}

	return &PartitionedDatabase{
		name:       name,
		config:     cfg,
		partitions: groups,
		base:       NewWorkflowDatabase(name+"._base", baseCfg),
	}
}
// Name reports the module name given to NewPartitionedDatabase; the same
// name is used when the module registers itself as a service.
func (p *PartitionedDatabase) Name() string {
	return p.name
}
// Init stores the application's logger (used later to report periodic sync
// failures) and registers this module as a service under its own name. Any
// registration error is returned unchanged.
func (p *PartitionedDatabase) Init(app modular.Application) error {
	p.logger = app.Logger()

	if err := app.RegisterService(p.name, p); err != nil {
		return err
	}
	return nil
}
// ProvidesServices advertises the single service this module offers: the
// PartitionedDatabase itself, published under the module name.
func (p *PartitionedDatabase) ProvidesServices() []modular.ServiceProvider {
	self := modular.ServiceProvider{
		Name:        p.name,
		Description: "Partitioned Database: " + p.name,
		Instance:    p,
	}
	return []modular.ServiceProvider{self}
}
// RequiresServices reports that this module depends on no other services;
// it always returns a nil slice.
func (p *PartitionedDatabase) RequiresServices() []modular.ServiceDependency {
	var none []modular.ServiceDependency
	return none
}
// Start opens the underlying database connection and, depending on
// configuration, synchronizes partitions from the configured source tables.
//
// The steps run in this order:
//  1. The base database is started; its error is returned unchanged.
//  2. If syncInterval is set and at least one partition config has a
//     sourceTable, the interval is parsed up front so that a malformed value
//     fails startup before any partition DDL has been issued.
//  3. If autoSync is enabled (the default whenever a sourceTable is
//     configured), SyncPartitionsFromSource runs once.
//  4. If the parsed interval is positive, a background goroutine re-runs the
//     sync at that interval until Stop is called or ctx is cancelled.
//
// On any failure after step 1 the base database is stopped again so the
// connection is not leaked. syncInterval is ignored when no sourceTable is
// configured, and a zero or negative interval disables periodic sync.
func (p *PartitionedDatabase) Start(ctx context.Context) error {
	if err := p.base.Start(ctx); err != nil {
		return err
	}

	// Determine whether any partition config has a sourceTable; without one
	// there is nothing to sync, either at startup or periodically.
	hasSourceTable := false
	for _, cfg := range p.partitions {
		if cfg.SourceTable != "" {
			hasSourceTable = true
			break
		}
	}

	// Validate syncInterval before doing any sync work: a configuration typo
	// should abort startup before partitions are created, not after.
	var interval time.Duration
	if p.config.SyncInterval != "" && hasSourceTable {
		parsed, err := time.ParseDuration(p.config.SyncInterval)
		if err != nil {
			// Best-effort cleanup; the parse error is the one worth reporting.
			_ = p.base.Stop(ctx)
			return fmt.Errorf("partitioned database %q: invalid syncInterval %q: %w", p.name, p.config.SyncInterval, err)
		}
		interval = parsed
	}

	// Auto-sync on startup: default true when a sourceTable is configured,
	// overridable through the explicit AutoSync setting.
	autoSync := hasSourceTable
	if p.config.AutoSync != nil {
		autoSync = *p.config.AutoSync
	}
	if autoSync && hasSourceTable {
		if err := p.SyncPartitionsFromSource(ctx); err != nil {
			// Best-effort cleanup; the sync error is the one worth reporting.
			_ = p.base.Stop(ctx)
			return fmt.Errorf("partitioned database %q: auto-sync on startup failed: %w", p.name, err)
		}
	}

	// Periodic sync is disabled when no interval was configured or when it
	// parsed to a non-positive duration.
	if interval <= 0 {
		return nil
	}

	if p.base.DB() == nil {
		// No database connection is available; starting the goroutine would
		// produce repeated error logs with no useful work.
		_ = p.base.Stop(ctx)
		return fmt.Errorf("partitioned database %q: syncInterval requires an open database connection (is DSN configured?)", p.name)
	}

	p.syncStop = make(chan struct{})
	p.syncWg.Add(1)
	go p.runPeriodicSync(ctx, interval)
	return nil
}
// runPeriodicSync is the body of the background goroutine started by Start.
// It calls SyncPartitionsFromSource once per tick of a ticker with the given
// interval and exits when the syncStop channel is closed or ctx is cancelled.
// Sync failures do not stop the loop; they are reported through the module
// logger when one is set and otherwise dropped. The goroutine signals
// syncWg on exit so that Stop can wait for it.
func (p *PartitionedDatabase) runPeriodicSync(ctx context.Context, interval time.Duration) {
	defer p.syncWg.Done()

	tick := time.NewTicker(interval)
	defer tick.Stop()

	for {
		// Block until there is either a reason to exit or a tick to act on.
		select {
		case <-ctx.Done():
			return
		case <-p.syncStop:
			return
		case <-tick.C:
		}

		err := p.SyncPartitionsFromSource(ctx)
		if err == nil || p.logger == nil {
			continue
		}
		p.logger.Error("partitioned database periodic sync failed",
			"module", p.name, "error", err)
	}
}
// Stop shuts the module down. If a periodic sync goroutine is running it is
// signalled to exit by closing syncStop, and Stop waits for it to finish
// before clearing the channel. The wrapped base database is then stopped and
// its result returned.
func (p *PartitionedDatabase) Stop(ctx context.Context) error {
	if stop := p.syncStop; stop != nil {
		close(stop)
		p.syncWg.Wait()
		// Clearing the field makes a later Stop skip the close above.
		p.syncStop = nil
	}
	return p.base.Stop(ctx)
}
// DB exposes the *sql.DB held by the wrapped base database (satisfies
// DBProvider). The result may be nil; other methods of this type treat a
// nil handle as "no open connection".
func (p *PartitionedDatabase) DB() *sql.DB {
	handle := p.base.DB()
	return handle
}
// DriverName reports the driver name from the module configuration
// (satisfies DBDriverProvider).
func (p *PartitionedDatabase) DriverName() string {
	driver := p.config.Driver
	return driver
}
// PartitionKey reports the partitioning column of the primary (first)
// partition config (satisfies PartitionKeyProvider). It returns the empty
// string when no partition config exists.
func (p *PartitionedDatabase) PartitionKey() string {
	if len(p.partitions) == 0 {
		return ""
	}
	primary := &p.partitions[0]
	return primary.PartitionKey
}
// PartitionType reports the partition type ("list" or "range") of the
// primary (first) partition config, falling back to PartitionTypeList when
// no partition config exists.
func (p *PartitionedDatabase) PartitionType() string {
	if len(p.partitions) == 0 {
		return PartitionTypeList
	}
	primary := &p.partitions[0]
	return primary.PartitionType
}
// PartitionNameFormat reports the partition name template of the primary
// (first) partition config, falling back to "{table}_{tenant}" when no
// partition config exists.
func (p *PartitionedDatabase) PartitionNameFormat() string {
	if len(p.partitions) == 0 {
		return "{table}_{tenant}"
	}
	primary := &p.partitions[0]
	return primary.PartitionNameFormat
}
// PartitionTableName computes the name of the partition of parentTable that
// holds tenantValue, using the name template of the primary (first)
// partition config (satisfies PartitionKeyProvider). When no partition
// config exists, parentTable is returned unchanged. The inputs are not
// validated here.
func (p *PartitionedDatabase) PartitionTableName(parentTable, tenantValue string) string {
	if len(p.partitions) > 0 {
		format := p.partitions[0].PartitionNameFormat
		return applyPartitionNameFormat(format, parentTable, tenantValue)
	}
	return parentTable
}
// Tables lists the parent tables of the primary (first) partition config.
// The returned slice is a fresh copy, so callers may modify it freely. It is
// nil only when no partition config exists.
func (p *PartitionedDatabase) Tables() []string {
	if len(p.partitions) == 0 {
		return nil
	}
	src := p.partitions[0].Tables
	// Start from a non-nil empty slice so the result is never nil here.
	return append(make([]string, 0, len(src)), src...)
}
// PartitionConfigs lists every configured partition group in configuration
// order (satisfies MultiPartitionManager). Both the outer slice and each
// group's Tables slice are copies, so callers cannot mutate the module's
// internal state through the result. A nil Tables slice stays nil.
func (p *PartitionedDatabase) PartitionConfigs() []PartitionConfig {
	out := make([]PartitionConfig, 0, len(p.partitions))
	for i := range p.partitions {
		entry := p.partitions[i]
		if src := entry.Tables; src != nil {
			entry.Tables = make([]string, len(src))
			copy(entry.Tables, src)
		}
		out = append(out, entry)
	}
	return out
}
// EnsurePartition makes sure a partition for tenantValue exists on every
// table of the primary (first) partition config (satisfies PartitionManager).
// The generated DDL uses CREATE TABLE IF NOT EXISTS, so repeating the call
// for an existing partition succeeds.
//
// LIST partitions are created FOR VALUES IN ('value'); RANGE partitions are
// created FOR VALUES FROM ('value') TO ('value\x00').
//
// Only PostgreSQL drivers (pgx, pgx/v5, postgres) are supported. The tenant
// value and all table/column names are validated before being placed into
// SQL. An error is returned when no partition config exists.
func (p *PartitionedDatabase) EnsurePartition(ctx context.Context, tenantValue string) error {
	if len(p.partitions) == 0 {
		return fmt.Errorf("partitioned database %q: no partition config defined", p.name)
	}
	primary := p.partitions[0]
	return p.ensurePartitionForConfig(ctx, primary, tenantValue)
}
// EnsurePartitionForKey makes sure a partition for value exists on every
// table of the partition config whose PartitionKey equals partitionKey
// (satisfies MultiPartitionManager). It fails when no such config is
// registered; otherwise it behaves like EnsurePartition for that config.
func (p *PartitionedDatabase) EnsurePartitionForKey(ctx context.Context, partitionKey, value string) error {
	if cfg, found := p.partitionConfigByKey(partitionKey); found {
		return p.ensurePartitionForConfig(ctx, cfg, value)
	}
	return fmt.Errorf("partitioned database %q: no partition config found for key %q", p.name, partitionKey)
}
// ensurePartitionForConfig is the shared implementation for EnsurePartition
// and EnsurePartitionForKey. It creates the partition for tenantValue on
// every table listed in cfg.
//
// The work is done in two phases so that a configuration error cannot leave
// partitions created for only some of the tables:
//
//  1. Validate the tenant value, driver, partition key, and — for every
//     table — the table name, the computed partition name, and the partition
//     type, building the DDL statement for each table.
//  2. Execute the statements one after another while holding p.mu.
//
// Errors from phase 1 are returned before any DDL runs. An execution error
// in phase 2 stops at the failing table; statements already executed are not
// rolled back. A config with no tables results in no DDL and a nil error.
func (p *PartitionedDatabase) ensurePartitionForConfig(ctx context.Context, cfg PartitionConfig, tenantValue string) error {
	if !validPartitionValue.MatchString(tenantValue) {
		return fmt.Errorf("partitioned database %q: invalid tenant value %q (must match [a-zA-Z0-9_.\\-]+)", p.name, tenantValue)
	}
	if !isSupportedPartitionDriver(p.config.Driver) {
		return fmt.Errorf("partitioned database %q: driver %q does not support partitioning (use pgx, pgx/v5, or postgres)", p.name, p.config.Driver)
	}
	if err := validateIdentifier(cfg.PartitionKey); err != nil {
		return fmt.Errorf("partitioned database %q: invalid partition_key: %w", p.name, err)
	}

	db := p.base.DB()
	if db == nil {
		return fmt.Errorf("partitioned database %q: database connection is nil", p.name)
	}

	// tenantValue has already matched validPartitionValue, so it cannot
	// contain a single quote; stripping quotes anyway is defense in depth in
	// case the pattern is ever relaxed. The value is the same for every
	// table, so it is computed once.
	safeValue := strings.ReplaceAll(tenantValue, "'", "")

	// Phase 1: validate everything and build the statements.
	type partitionStmt struct {
		partition string // computed partition table name
		table     string // parent table
		ddl       string // statement to execute
	}
	stmts := make([]partitionStmt, 0, len(cfg.Tables))
	for _, table := range cfg.Tables {
		if err := validateIdentifier(table); err != nil {
			return fmt.Errorf("partitioned database %q: invalid table name: %w", p.name, err)
		}

		partitionName := applyPartitionNameFormat(cfg.PartitionNameFormat, table, tenantValue)
		// The partition name is derived from a user-supplied template, so it
		// must be checked as an identifier in its own right.
		if err := validateIdentifier(partitionName); err != nil {
			return fmt.Errorf("partitioned database %q: invalid partition name %q: %w", p.name, partitionName, err)
		}

		var ddl string
		switch cfg.PartitionType {
		case PartitionTypeList:
			ddl = fmt.Sprintf(
				"CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES IN ('%s')",
				partitionName, table, safeValue,
			)
		case PartitionTypeRange:
			// RANGE partition intended to hold a single value: from the
			// tenant value (inclusive) to the same value with a suffix
			// appended (exclusive). The suffix reaches the server as the four
			// characters backslash, 'x', '0', '0', because the Go literal
			// below contains an escaped backslash rather than a NUL byte.
			// NOTE(review): how the server orders that bound relative to
			// other tenant values depends on its string-literal and collation
			// settings — confirm it cannot overlap a neighbouring value.
			ddl = fmt.Sprintf(
				"CREATE TABLE IF NOT EXISTS %s PARTITION OF %s FOR VALUES FROM ('%s') TO ('%s\\x00')",
				partitionName, table, safeValue, safeValue,
			)
		default:
			return fmt.Errorf("partitioned database %q: unsupported partition type %q (use %q or %q)",
				p.name, cfg.PartitionType, PartitionTypeList, PartitionTypeRange)
		}

		stmts = append(stmts, partitionStmt{partition: partitionName, table: table, ddl: ddl})
	}

	// Phase 2: execute. The lock serializes partition DDL issued through
	// this module.
	p.mu.Lock()
	defer p.mu.Unlock()

	for _, s := range stmts {
		if _, err := db.ExecContext(ctx, s.ddl); err != nil {
			return fmt.Errorf("partitioned database %q: failed to create partition %q for table %q: %w",
				p.name, s.partition, s.table, err)
		}
	}
	return nil
}
// SyncPartitionsFromSource walks every partition config in configuration
// order and, for each one that has a sourceTable, ensures a partition exists
// for every distinct value found there (satisfies PartitionManager). Configs
// without a sourceTable are skipped, so the call is a no-op when none is
// configured anywhere.
//
// The first error aborts the walk and is returned; later configs are not
// processed in that case.
func (p *PartitionedDatabase) SyncPartitionsFromSource(ctx context.Context) error {
	for i := range p.partitions {
		err := p.syncPartitionConfigFromSource(ctx, p.partitions[i])
		if err != nil {
			return err
		}
	}
	return nil
}
// SyncPartitionsForKey runs the source-table sync for the single partition
// config whose PartitionKey equals partitionKey (satisfies
// MultiPartitionManager). It fails when no such config is registered and is
// a no-op when that config has no sourceTable.
func (p *PartitionedDatabase) SyncPartitionsForKey(ctx context.Context, partitionKey string) error {
	if cfg, found := p.partitionConfigByKey(partitionKey); found {
		return p.syncPartitionConfigFromSource(ctx, cfg)
	}
	return fmt.Errorf("partitioned database %q: no partition config found for key %q", p.name, partitionKey)
}
// syncPartitionConfigFromSource is the shared implementation for
// SyncPartitionsFromSource and SyncPartitionsForKey. For one partition
// config it reads every distinct non-NULL value of the source column from
// the source table and ensures a partition exists for each.
//
// It returns nil immediately when cfg has no SourceTable. The source column
// is cfg.SourceColumn, or cfg.PartitionKey when that is empty. Both the
// table and the column are validated as identifiers before being placed
// into the query. All values are read into memory before any partition is
// created, and the first failure (query, scan, iteration, or partition
// creation) aborts the sync.
func (p *PartitionedDatabase) syncPartitionConfigFromSource(ctx context.Context, cfg PartitionConfig) error {
	sourceTable := cfg.SourceTable
	if sourceTable == "" {
		return nil
	}
	if err := validateIdentifier(sourceTable); err != nil {
		return fmt.Errorf("partitioned database %q: invalid source table: %w", p.name, err)
	}

	column := cfg.PartitionKey
	if cfg.SourceColumn != "" {
		column = cfg.SourceColumn
	}
	if err := validateIdentifier(column); err != nil {
		return fmt.Errorf("partitioned database %q: invalid source column: %w", p.name, err)
	}

	conn := p.base.DB()
	if conn == nil {
		return fmt.Errorf("partitioned database %q: database connection is nil", p.name)
	}

	// Identifiers cannot be bound as query parameters; column and
	// sourceTable were both checked by validateIdentifier above.
	stmt := fmt.Sprintf("SELECT DISTINCT %s FROM %s WHERE %s IS NOT NULL", //nolint:gosec // G201: identifiers validated above
		column, sourceTable, column)
	rows, err := conn.QueryContext(ctx, stmt)
	if err != nil {
		return fmt.Errorf("partitioned database %q: failed to query source table %q: %w",
			p.name, sourceTable, err)
	}
	defer rows.Close()

	// Collect first, create afterwards, so the result set is fully consumed
	// before any DDL is issued.
	var tenants []string
	for rows.Next() {
		var tenant string
		if scanErr := rows.Scan(&tenant); scanErr != nil {
			return fmt.Errorf("partitioned database %q: failed to scan partition value: %w", p.name, scanErr)
		}
		tenants = append(tenants, tenant)
	}
	if iterErr := rows.Err(); iterErr != nil {
		return fmt.Errorf("partitioned database %q: row iteration error: %w", p.name, iterErr)
	}

	for i := range tenants {
		if ensureErr := p.ensurePartitionForConfig(ctx, cfg, tenants[i]); ensureErr != nil {
			return ensureErr
		}
	}
	return nil
}
// partitionConfigByKey looks up the first partition config whose
// PartitionKey equals partitionKey. The boolean result reports whether one
// was found; when it is false the returned config is the zero value.
func (p *PartitionedDatabase) partitionConfigByKey(partitionKey string) (PartitionConfig, bool) {
	for i := range p.partitions {
		if p.partitions[i].PartitionKey == partitionKey {
			return p.partitions[i], true
		}
	}
	var none PartitionConfig
	return none, false
}
// isSupportedPartitionDriver reports whether driver is one of the driver
// names this module issues partition DDL for: "pgx", "pgx/v5" or "postgres".
// The comparison is exact and case-sensitive.
func isSupportedPartitionDriver(driver string) bool {
	return driver == "pgx" || driver == "pgx/v5" || driver == "postgres"
}
// sanitizePartitionSuffix turns a tenant value into text suitable for use
// inside a partition table name by replacing every hyphen and every dot with
// an underscore. All other characters are left as they are, so distinct
// values such as "a-b" and "a.b" produce the same suffix.
func sanitizePartitionSuffix(tenantValue string) string {
	noHyphens := strings.ReplaceAll(tenantValue, "-", "_")
	return strings.ReplaceAll(noHyphens, ".", "_")
}
// applyPartitionNameFormat expands a partition name template. Every
// "{table}" in format is replaced with parentTable, and then every
// "{tenant}" is replaced with the tenant value after it has been passed
// through sanitizePartitionSuffix. Because the table substitution happens
// first, a "{tenant}" placeholder appearing inside parentTable would be
// expanded as well. The result is not validated here.
func applyPartitionNameFormat(format, parentTable, tenantValue string) string {
	withTable := strings.ReplaceAll(format, "{table}", parentTable)
	return strings.ReplaceAll(withTable, "{tenant}", sanitizePartitionSuffix(tenantValue))
}