crdt

package
v0.39.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 31, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// SyncStep1 is sent by a connecting client: "here is my state vector".
	SyncStep1 = "sync_step1"
	// SyncStep2 is the server response: "here are ops you are missing".
	SyncStep2 = "sync_step2"
	// SyncUpdate is an incremental update (ops broadcast).
	SyncUpdate = "sync_update"
)

SyncMessage types for the state-vector based sync protocol.

Variables

This section is empty.

Functions

This section is empty.

Types

type Document

type Document struct {
	// contains filtered or unexported fields
}

Document is a container for multiple named CRDT fields, representing a collaborative document.

func Decode

func Decode(data []byte, nodeID NodeID) (*Document, error)

Decode deserializes a document from bytes.

func NewDocument

func NewDocument(id string, nodeID NodeID) *Document

NewDocument creates a new Document with the given ID and owning nodeID.

func (*Document) Diff

func (d *Document) Diff(since StateVersion) []Operation

Diff returns operations representing changes since the given state version. Currently operates at text-field granularity using RGA state vectors.

func (*Document) Encode

func (d *Document) Encode() ([]byte, error)

Encode serializes the document to bytes using gob encoding.

func (*Document) GetCounter

func (d *Document) GetCounter(field string) *PNCounter

GetCounter returns the PNCounter field with the given name, creating it if needed.

func (*Document) GetMVRegister

func (d *Document) GetMVRegister(field string) *MVRegister

GetMVRegister returns the MVRegister field with the given name, creating it if needed.

func (*Document) GetRegister

func (d *Document) GetRegister(field string) *LWWRegister

GetRegister returns the LWWRegister field with the given name, creating it if needed.

func (*Document) GetSet

func (d *Document) GetSet(field string) *ORSet

GetSet returns the ORSet field with the given name, creating it if needed.

func (*Document) GetText

func (d *Document) GetText(field string) *RGA

GetText returns the RGA text field with the given name, creating it if needed.

func (*Document) ID

func (d *Document) ID() string

ID returns the document identifier.

func (*Document) Merge

func (d *Document) Merge(other *Document)

Merge merges a remote Document into this one.

func (*Document) Version

func (d *Document) Version() StateVersion

Version returns a computed state version that merges the document's own version with state vectors from all text fields (RGAs).

type DocumentSnapshot

type DocumentSnapshot struct {
	ID       string                         `json:"id"`
	Version  StateVersion                   `json:"version"`
	Texts    map[string]*textSnapshot       `json:"texts,omitempty"`
	Counters map[string]*counterSnapshot    `json:"counters,omitempty"`
	Sets     map[string]*setSnapshot        `json:"sets,omitempty"`
	Regs     map[string]*registerSnapshot   `json:"registers,omitempty"`
	MVRegs   map[string]*mvRegisterSnapshot `json:"mvRegisters,omitempty"`
}

DocumentSnapshot is the serializable state of a Document.

type FieldType

type FieldType uint8

FieldType identifies the CRDT type of a document field.

const (
	FieldTypeText       FieldType = iota // RGA
	FieldTypeCounter                     // PNCounter
	FieldTypeSet                         // ORSet
	FieldTypeRegister                    // LWWRegister
	FieldTypeMVRegister                  // MVRegister
)

type GCounter

type GCounter struct {
	// contains filtered or unexported fields
}

GCounter is a grow-only counter CRDT. Each node maintains its own count; the total is the sum of all counts.

func NewGCounter

func NewGCounter() *GCounter

NewGCounter returns a new empty GCounter.

func (*GCounter) Increment

func (g *GCounter) Increment(nodeID NodeID, delta uint64)

Increment adds delta to the count for nodeID.

func (*GCounter) Merge

func (g *GCounter) Merge(other *GCounter)

Merge merges a remote GCounter into this one by taking the max for each node.

func (*GCounter) State

func (g *GCounter) State() map[NodeID]uint64

State returns a copy of the internal state map.

func (*GCounter) Value

func (g *GCounter) Value() uint64

Value returns the total count across all nodes.

type LWWRegister

type LWWRegister struct {
	// contains filtered or unexported fields
}

LWWRegister is a register that resolves concurrent writes by timestamp. The most recent write (highest timestamp) wins.

func NewLWWRegister

func NewLWWRegister() *LWWRegister

NewLWWRegister returns a new empty LWWRegister.

func (*LWWRegister) Get

func (r *LWWRegister) Get() (any, Timestamp)

Get returns the current value and its timestamp.

func (*LWWRegister) Merge

func (r *LWWRegister) Merge(other *LWWRegister)

Merge merges a remote register into this one.

func (*LWWRegister) Set

func (r *LWWRegister) Set(value any, ts Timestamp)

Set updates the register value if the given timestamp is newer.

type MVEntry

type MVEntry struct {
	Value     any       `json:"value"`
	Timestamp Timestamp `json:"timestamp"`
}

MVEntry is a single versioned value in an MVRegister.

type MVRegister

type MVRegister struct {
	// contains filtered or unexported fields
}

MVRegister preserves all concurrent writes rather than picking one winner. It maintains a set of (value, timestamp) pairs. When a new value is set, it replaces all entries that are causally dominated.

func NewMVRegister

func NewMVRegister() *MVRegister

NewMVRegister returns a new empty MVRegister.

func (*MVRegister) Get

func (r *MVRegister) Get() []MVEntry

Get returns all concurrent values.

func (*MVRegister) Merge

func (r *MVRegister) Merge(other *MVRegister)

Merge merges a remote MVRegister. The result is the union of non-dominated entries.

func (*MVRegister) Set

func (r *MVRegister) Set(value any, ts Timestamp)

Set adds a new value, removing all entries whose timestamp is dominated by ts.

type NodeID

type NodeID = string

NodeID identifies a unique node/client in the distributed system.

type ORSet

type ORSet struct {
	// contains filtered or unexported fields
}

ORSet is an observed-remove set CRDT. Elements can be added and removed without conflicts. Concurrent add + remove: the add wins (add-wins semantics).

func NewORSet

func NewORSet(nodeID NodeID) *ORSet

NewORSet returns a new ORSet for the given node.

func (*ORSet) Add

func (s *ORSet) Add(key string, value any) string

Add adds an element to the set. Returns the generated unique tag.

func (*ORSet) Contains

func (s *ORSet) Contains(key string) bool

Contains checks if key is present in the set.

func (*ORSet) Elements

func (s *ORSet) Elements() map[string]any

Elements returns all keys currently in the set with one representative value each.

func (*ORSet) Merge

func (s *ORSet) Merge(other *ORSet)

Merge merges a remote ORSet into this one (union of tags).

func (*ORSet) RawState

func (s *ORSet) RawState() map[string]map[string]any

RawState returns a deep copy of internal tag state for serialization.

func (*ORSet) Remove

func (s *ORSet) Remove(key string)

Remove removes an element by key, removing all observed tags.

type ORSetElement

type ORSetElement struct {
	Value any
}

ORSetElement tracks the unique tags for an element. An element is in the set if it has at least one tag not in the tombstone set.

type OpType

type OpType uint8

OpType identifies an RGA operation kind.

const (
	OpInsert OpType = iota
	OpDelete
)

type Operation

type Operation struct {
	Field     string    `json:"field"`
	FieldType FieldType `json:"fieldType"`
	Data      []byte    `json:"data"`
}

Operation represents a serializable CRDT operation for sync.

type PNCounter

type PNCounter struct {
	// contains filtered or unexported fields
}

PNCounter is a counter that supports both increment and decrement by composing two GCounters.

func NewPNCounter

func NewPNCounter() *PNCounter

NewPNCounter returns a new PNCounter.

func (*PNCounter) Decrement

func (pn *PNCounter) Decrement(nodeID NodeID, delta uint64)

Decrement adds delta to the negative counter for nodeID.

func (*PNCounter) Increment

func (pn *PNCounter) Increment(nodeID NodeID, delta uint64)

Increment adds delta to the positive counter for nodeID.

func (*PNCounter) Merge

func (pn *PNCounter) Merge(other *PNCounter)

Merge merges a remote PNCounter into this one.

func (*PNCounter) Value

func (pn *PNCounter) Value() int64

Value returns positive - negative as a signed integer.

type RGA

type RGA struct {
	// contains filtered or unexported fields
}

RGA (Replicated Growable Array) is a CRDT for collaborative text editing. It maintains a linked list of character nodes with unique IDs that allow concurrent inserts to be ordered deterministically.

func NewRGA

func NewRGA(nodeID NodeID) *RGA

NewRGA creates a new RGA instance for the given node.

func (*RGA) ApplyOp

func (r *RGA) ApplyOp(op RGAOp) error

ApplyOp applies a remote operation. Returns an error if the operation references a parent that doesn't exist (operation should be retried later).

func (*RGA) Delete

func (r *RGA) Delete(position int) (RGAOp, error)

Delete deletes the character at the given visible position (0-based). Returns the generated operation for replication.

func (*RGA) Insert

func (r *RGA) Insert(position int, ch rune) RGAOp

Insert inserts a character after the given position (0-based index). Position -1 means insert at the beginning. Returns the generated operation for replication.

func (*RGA) InsertText

func (r *RGA) InsertText(position int, text string) []RGAOp

InsertText inserts a string starting at the given position. Returns the operations generated.

func (*RGA) Length

func (r *RGA) Length() int

Length returns the number of visible characters.

func (*RGA) Merge

func (r *RGA) Merge(other *RGA) error

Merge applies all operations from a remote RGA. This is a convenience method that replays operations.

func (*RGA) Operations

func (r *RGA) Operations() []RGAOp

Operations returns pending operations and clears the pending buffer.

func (*RGA) OpsSince

func (r *RGA) OpsSince(sv map[NodeID]uint64) []RGAOp

OpsSince returns all operations for nodes whose sequence is greater than the values in the provided state vector.

func (*RGA) StateVector

func (r *RGA) StateVector() map[NodeID]uint64

StateVector returns a map of nodeID -> max sequence seen, used for sync.

func (*RGA) ToString

func (r *RGA) ToString() string

ToString returns the current visible text content.

type RGAID

type RGAID struct {
	Seq    uint64 `json:"seq"`
	NodeID NodeID `json:"nodeId"`
}

RGAID uniquely identifies a character node in the RGA.

func (RGAID) After

func (id RGAID) After(other RGAID) bool

After reports whether id should be ordered after other (for tie-breaking).

func (RGAID) IsZero

func (id RGAID) IsZero() bool

IsZero reports whether the ID is the zero/sentinel value.

func (RGAID) String

func (id RGAID) String() string

String returns a human-readable ID.

type RGAOp

type RGAOp struct {
	Type      OpType    `json:"type"`
	ID        RGAID     `json:"id"`
	ParentID  RGAID     `json:"parentId"` // for insert: the ID of the node this is inserted after
	Char      rune      `json:"char"`
	Timestamp Timestamp `json:"timestamp"`
}

RGAOp represents a single RGA operation for replication.

type StateVersion

type StateVersion map[NodeID]uint64

StateVersion represents a document's version as a state vector. Maps nodeID -> max sequence number seen from that node.

func (StateVersion) Dominates

func (v StateVersion) Dominates(other StateVersion) bool

Dominates reports whether v causally dominates other (every entry in other is <= the corresponding entry in v).

func (StateVersion) Merge

func (v StateVersion) Merge(other StateVersion) StateVersion

Merge returns a new StateVersion taking the max of each entry.

type SyncBroadcastFunc

type SyncBroadcastFunc func(docID string, excludeClient string, msg []byte)

SyncBroadcastFunc is called when operations need to be broadcast to other clients.

type SyncManager

type SyncManager struct {
	// contains filtered or unexported fields
}

SyncManager handles CRDT document synchronization across clients.

func NewSyncManager

func NewSyncManager(broadcast SyncBroadcastFunc) *SyncManager

NewSyncManager creates a new SyncManager. The broadcast function is called whenever operations should be sent to other clients.

func (*SyncManager) BroadcastOps

func (sm *SyncManager) BroadcastOps(docID string, ops []Operation)

BroadcastOps broadcasts operations to all clients connected to a document.

func (*SyncManager) Documents

func (sm *SyncManager) Documents() []string

Documents returns a list of all registered document IDs.

func (*SyncManager) GetDocument

func (sm *SyncManager) GetDocument(id string) *Document

GetDocument returns a registered document by ID.

func (*SyncManager) GetOrCreateDocument

func (sm *SyncManager) GetOrCreateDocument(id string, nodeID NodeID) *Document

GetOrCreateDocument returns an existing document or creates a new one.

func (*SyncManager) HandleSync

func (sm *SyncManager) HandleSync(clientID string, raw []byte) ([]byte, error)

HandleSync processes an incoming sync message from a client and returns a response message (if any). This implements the state-vector sync protocol:

  1. Client sends SyncStep1 with its state vector
  2. Server responds with SyncStep2 containing ops the client is missing
  3. Server also sends the server's state vector so the client can respond with SyncStep2
  4. Incremental updates are broadcast as SyncUpdate

func (*SyncManager) RegisterDocument

func (sm *SyncManager) RegisterDocument(id string, doc *Document)

RegisterDocument adds a document to the sync manager.

func (*SyncManager) UnregisterDocument

func (sm *SyncManager) UnregisterDocument(id string)

UnregisterDocument removes a document from the sync manager.

type SyncMessage

type SyncMessage struct {
	Type        string       `json:"type"`
	DocID       string       `json:"docId"`
	ClientID    string       `json:"clientId,omitempty"`
	StateVector StateVersion `json:"stateVector,omitempty"`
	Ops         []Operation  `json:"ops,omitempty"`
}

SyncMessage is the wire format for CRDT sync messages.

type Timestamp

type Timestamp struct {
	Time   int64  `json:"time"`
	NodeID NodeID `json:"nodeId"`
}

Timestamp is a Lamport-style logical clock for ordering operations.

func (Timestamp) After

func (t Timestamp) After(other Timestamp) bool

After reports whether t is causally after other. Ties are broken by NodeID lexicographic order.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL