eskit/sqlitestore/global_reader.go
Ash 3610007deb
Some checks failed
CI / test-nats (push) Failing after 9s
CI / test-codec-otelkit (push) Successful in 15s
CI / test-integration (push) Failing after 7s
CI / test-sqlitestore (push) Failing after 39s
CI / benchmarks (push) Failing after 4s
CI / test-pgstore (push) Failing after 1m1s
CI / test-core (push) Failing after 2m16s
fix: add stream_type to Delete/Tombstone/Snapshot/AdvisoryLock + sanitize NOTIFY channel — Closes #240, Closes #241
BREAKING CHANGE: All identity-keyed interfaces now use (streamType, streamID)
composite key instead of streamID alone.

Interface changes:
- LockRegistry.Acquire(ctx, streamType, streamID)
- LockRegistry.TryAcquire(streamType, streamID)
- EventStoreWithDeletion.Delete(ctx, streamType, streamID)
- EventStoreWithDeletion.IsTombstoned(ctx, streamType, streamID)
- Snapshot[S] struct now has StreamType field
- SnapshotStore.LoadSnapshot(ctx, streamType, streamID)

All backends updated: memorystore, sqlitestore, pgstore, natsstore, natslock.
Advisory lock hash now includes streamType.
Tombstone and snapshot tables keyed by (stream_type, stream_id).

NOTIFY channel name validated against ^[a-zA-Z_][a-zA-Z0-9_]*$ (TigerStyle).

56 files changed, all tests pass with -race.
2026-03-10 19:29:17 +00:00

206 lines
6.4 KiB
Go

package sqlitestore
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log/slog"
"math"
"reflect"
"time"
"git.nullsoft.is/ash/eskit"
"git.nullsoft.is/ash/eskit/codec"
)
// maxGlobalReadLimit is the upper bound on the number of rows a single
// ReadFrom call will request; larger caller-supplied limits are clamped
// down to this value.
const maxGlobalReadLimit = 10000
// GlobalEvent is a single stored event as seen from the store-wide log,
// carrying its global sequence number alongside the per-stream identity.
// Compatible with subscription.GlobalEvent but decoupled to avoid import cycles.
type GlobalEvent[E any] struct {
// GlobalSequence is the store-wide monotonic position
// (read from the events.id column).
GlobalSequence uint64
// StreamType identifies the type/category of the stream; together with
// StreamID it identifies the stream this event belongs to.
StreamType string
// StreamID identifies which stream (within StreamType) this event belongs to.
StreamID string
// EventType is the string name of the event (e.g., "OrderCreated").
EventType string
// Version is the per-stream version number.
Version int
// Data is the decoded domain event payload.
Data E
// Timestamp is when the event was stored. It is parsed from an RFC 3339
// string and is the zero time if that string could not be parsed.
Timestamp time.Time
}
// UnknownEventBehavior selects what the global reader does when it meets
// an event type that is not registered in the EventRegistry.
type UnknownEventBehavior int
const (
// SkipUnknownEvents logs a warning and skips events with unregistered types.
// It is the zero value and therefore the default, providing resilience
// during zero-downtime deployments.
SkipUnknownEvents UnknownEventBehavior = iota
// FailOnUnknownEvents makes the read return an error as soon as an
// unregistered event type is encountered.
FailOnUnknownEvents
)
// SQLiteGlobalReader implements subscription.GlobalReader for SQLite.
// It reads events by global sequence (the events.id column).
// Construct it with NewSQLiteGlobalReader, which rejects a nil db.
type SQLiteGlobalReader[E any] struct {
// db is the handle every query runs against.
db *sql.DB
// registry is optional; when nil, payloads are decoded directly into E.
registry *eskit.EventRegistry
// codecRegistry is optional; when nil, codec.DefaultRegistry is used to
// resolve non-JSON codecs.
codecRegistry *codec.Registry
// unknownBehavior controls handling of unregistered event types; the zero
// value is SkipUnknownEvents.
unknownBehavior UnknownEventBehavior
// logger is optional; when nil, slog.Default() is used.
logger *slog.Logger
}
// NewSQLiteGlobalReader builds a global reader on top of db and applies the
// given options in the order they were passed.
//
// It panics if db is nil, since a reader without a database handle can never
// serve a read.
func NewSQLiteGlobalReader[E any](db *sql.DB, opts ...GlobalReaderOption[E]) *SQLiteGlobalReader[E] {
	if db == nil {
		panic("sqlitestore: NewSQLiteGlobalReader: db must not be nil")
	}

	reader := new(SQLiteGlobalReader[E])
	reader.db = db
	for i := range opts {
		opts[i](reader)
	}
	return reader
}
// GlobalReaderOption configures a SQLiteGlobalReader. Options are functions
// that mutate the reader; NewSQLiteGlobalReader applies them in order.
type GlobalReaderOption[E any] func(*SQLiteGlobalReader[E])
// WithGlobalReaderRegistry installs the event type registry the reader uses
// to instantiate a concrete value per event type, enabling heterogeneous
// global reads.
func WithGlobalReaderRegistry[E any](reg *eskit.EventRegistry) GlobalReaderOption[E] {
	apply := func(reader *SQLiteGlobalReader[E]) {
		reader.registry = reg
	}
	return apply
}
// WithGlobalReaderCodecRegistry installs the codec registry consulted when a
// stored event was written with a codec other than JSON.
func WithGlobalReaderCodecRegistry[E any](reg *codec.Registry) GlobalReaderOption[E] {
	apply := func(reader *SQLiteGlobalReader[E]) {
		reader.codecRegistry = reg
	}
	return apply
}
// WithUnknownEventBehavior chooses what the reader does with event types that
// are missing from the registry. Without this option the reader uses
// SkipUnknownEvents, which keeps reads working during rolling deployments.
func WithUnknownEventBehavior[E any](b UnknownEventBehavior) GlobalReaderOption[E] {
	apply := func(reader *SQLiteGlobalReader[E]) {
		reader.unknownBehavior = b
	}
	return apply
}
// WithGlobalReaderLogger installs the logger the global reader writes its
// warnings to.
func WithGlobalReaderLogger[E any](l *slog.Logger) GlobalReaderOption[E] {
	apply := func(reader *SQLiteGlobalReader[E]) {
		reader.logger = l
	}
	return apply
}
// ReadFrom returns events whose global sequence is >= fromSequence, ordered by
// global sequence, reading at most limit rows.
//
// A limit <= 0 falls back to 100 and a limit above maxGlobalReadLimit is
// clamped. A fromSequence above math.MaxInt64 yields (nil, nil) because no
// SQLite INTEGER id can be that large.
//
// Payloads are decoded with encoding/json unless the row names another codec,
// which is then looked up in the configured codec registry
// (codec.DefaultRegistry when none was set). When an event registry is
// configured and the row has a non-empty event type, the payload is decoded
// into a fresh instance obtained from the registry and converted to E;
// otherwise it is decoded straight into E.
//
// Events whose type is not registered are skipped with a warning unless the
// reader was configured with FailOnUnknownEvents. Skipped rows still count
// against the SQL LIMIT, so the result may contain fewer than limit events
// (or none) even though later events exist; callers must not treat a short
// page as proof that the end of the log was reached.
//
// A stored timestamp that is not valid RFC 3339 leaves Timestamp at the zero
// time and is reported as a warning instead of failing the whole read.
func (r *SQLiteGlobalReader[E]) ReadFrom(ctx context.Context, fromSequence uint64, limit int) ([]GlobalEvent[E], error) {
	if fromSequence > math.MaxInt64 {
		return nil, nil // no events can exist past SQLite INTEGER max
	}
	if limit <= 0 {
		limit = 100
	}
	if limit > maxGlobalReadLimit {
		limit = maxGlobalReadLimit
	}

	// Resolve the optional collaborators once rather than once per row.
	logger := r.logger
	if logger == nil {
		logger = slog.Default()
	}
	codecs := r.codecRegistry
	if codecs == nil {
		codecs = codec.DefaultRegistry
	}

	rows, err := r.db.QueryContext(ctx,
		"SELECT id, stream_type, stream_id, version, event_type, codec, event_data, timestamp FROM events WHERE id >= ? ORDER BY id LIMIT ?",
		fromSequence, limit,
	)
	if err != nil {
		return nil, fmt.Errorf("sqlitestore: global read: %w", err)
	}
	defer rows.Close()

	var events []GlobalEvent[E]
	for rows.Next() {
		var e GlobalEvent[E]
		var codecName, ts string
		var payload []byte
		if err := rows.Scan(&e.GlobalSequence, &e.StreamType, &e.StreamID, &e.Version, &e.EventType, &codecName, &payload, &ts); err != nil {
			return nil, fmt.Errorf("sqlitestore: scan global event: %w", err)
		}

		// A bad timestamp must not block the log, but it must not vanish
		// silently either: keep the zero time and surface the problem.
		stored, err := time.Parse(time.RFC3339Nano, ts)
		if err != nil {
			logger.Warn("unparseable event timestamp",
				"timestamp", ts,
				"global_sequence", e.GlobalSequence,
				"stream_type", e.StreamType,
				"stream_id", e.StreamID,
				"error", err,
			)
		}
		e.Timestamp = stored

		// JSON is the default wire format; an empty codec column means JSON.
		unmarshal := json.Unmarshal
		if codecName != "" && codecName != "json" {
			c, err := codecs.Get(codecName)
			if err != nil {
				return nil, fmt.Errorf("sqlitestore: unknown codec %q for global event id=%d: %w", codecName, e.GlobalSequence, err)
			}
			unmarshal = c.Unmarshal
		}

		// Without a registry (or an event type) decode straight into E.
		if r.registry == nil || e.EventType == "" {
			if err := unmarshal(payload, &e.Data); err != nil {
				return nil, fmt.Errorf("sqlitestore: unmarshal global event: %w", err)
			}
			events = append(events, e)
			continue
		}

		instance, err := r.registry.New(e.EventType)
		if err != nil {
			if errors.Is(err, eskit.ErrEventTypeNotRegistered) && r.unknownBehavior != FailOnUnknownEvents {
				logger.Warn("skipping unknown event type",
					"event_type", e.EventType,
					"global_sequence", e.GlobalSequence,
					"stream_type", e.StreamType,
					"stream_id", e.StreamID,
				)
				continue
			}
			return nil, fmt.Errorf("sqlitestore: %w", err)
		}
		if err := unmarshal(payload, instance); err != nil {
			return nil, fmt.Errorf("sqlitestore: unmarshal global event: %w", err)
		}

		typed, ok := instance.(E)
		if !ok {
			// Try dereferencing pointer (registry returns *T, E may be T).
			if rv := reflect.ValueOf(instance); rv.Kind() == reflect.Ptr && !rv.IsNil() {
				typed, ok = rv.Elem().Interface().(E)
			}
		}
		if !ok {
			return nil, fmt.Errorf("sqlitestore: cannot convert %T to target type", instance)
		}
		e.Data = typed
		events = append(events, e)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("sqlitestore: global read rows: %w", err)
	}
	return events, nil
}
// LatestSequence reports the largest global sequence currently stored.
// It returns 0 when the events table is empty (MAX(id) is NULL).
func (r *SQLiteGlobalReader[E]) LatestSequence(ctx context.Context) (uint64, error) {
	var maxID sql.NullInt64
	row := r.db.QueryRowContext(ctx, "SELECT MAX(id) FROM events")
	if err := row.Scan(&maxID); err != nil {
		return 0, fmt.Errorf("sqlitestore: latest sequence: %w", err)
	}
	if maxID.Valid {
		return uint64(maxID.Int64), nil
	}
	return 0, nil
}