Skip to content

Commit 090bbb9

Browse files
committed
chore: wip
1 parent 660c1a5 commit 090bbb9

File tree

7 files changed

+464
-16
lines changed

7 files changed

+464
-16
lines changed

examples/distributed-locks.ts

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
import { Queue, DistributedLock } from '../src'
2+
3+
interface TaskData {
4+
id: string
5+
duration: number
6+
requiresExclusiveAccess: boolean
7+
}
8+
9+
// Create shared resources that multiple workers might access
10+
const sharedCounter = {
11+
value: 0,
12+
increment() {
13+
// Simulate a non-atomic operation that could cause race conditions
14+
const current = this.value
15+
// Simulate some processing time where race conditions could occur
16+
setTimeout(() => {}, 10)
17+
this.value = current + 1
18+
return this.value
19+
}
20+
}
21+
22+
async function main() {
23+
console.log('🔒 Distributed Locks Example')
24+
25+
// Create a task queue with distributed lock support enabled
26+
const taskQueue = new Queue<TaskData>('tasks', {
27+
verbose: true,
28+
logLevel: 'info',
29+
distributedLock: true // Enable distributed locks
30+
})
31+
32+
// Get the lock instance for manual locking operations
33+
const lock = taskQueue.getLock()
34+
35+
if (!lock) {
36+
console.error('Distributed lock not available!')
37+
return
38+
}
39+
40+
console.log('✅ Queue created with distributed lock support')
41+
42+
// Add some tasks, some requiring exclusive access
43+
const tasks = [
44+
{ id: 'task1', duration: 500, requiresExclusiveAccess: true },
45+
{ id: 'task2', duration: 300, requiresExclusiveAccess: false },
46+
{ id: 'task3', duration: 200, requiresExclusiveAccess: true },
47+
{ id: 'task4', duration: 400, requiresExclusiveAccess: false },
48+
{ id: 'task5', duration: 100, requiresExclusiveAccess: true },
49+
]
50+
51+
console.log('📝 Adding tasks...')
52+
53+
// Add all tasks to the queue
54+
for (const task of tasks) {
55+
await taskQueue.add(task, { jobId: task.id })
56+
console.log(` - Added "${task.id}" (requires lock: ${task.requiresExclusiveAccess})`)
57+
}
58+
59+
console.log('\n📊 Current job counts:')
60+
const counts = await taskQueue.getJobCounts()
61+
console.log(counts)
62+
63+
// Process tasks with multiple concurrent workers
64+
console.log('\n🔄 Processing tasks with distributed locks:')
65+
66+
// Start multiple workers to simulate a distributed system
67+
taskQueue.process(3, async (job) => {
68+
const { id, duration, requiresExclusiveAccess } = job.data
69+
70+
console.log(`⏳ Worker starting "${id}" (requires lock: ${requiresExclusiveAccess})`)
71+
72+
let result
73+
74+
if (requiresExclusiveAccess) {
75+
// For tasks requiring exclusive access, we manually acquire a lock on a shared resource
76+
const resourceName = 'shared-resource'
77+
console.log(`🔒 Acquiring lock for "${resourceName}" for task "${id}"`)
78+
79+
// Try to acquire the lock
80+
const token = await lock.acquire(resourceName, {
81+
duration: duration * 2, // Lock timeout
82+
retries: 3, // Number of retries if lock not available
83+
retryDelay: 100 // Delay between retries
84+
})
85+
86+
if (!token) {
87+
console.error(`❌ Failed to acquire lock for "${id}"`)
88+
throw new Error('Failed to acquire lock')
89+
}
90+
91+
try {
92+
console.log(`🔓 Lock acquired for "${id}" - processing with exclusive access`)
93+
94+
// Simulate processing with exclusive access to shared resource
95+
const beforeValue = sharedCounter.value
96+
await new Promise(resolve => setTimeout(resolve, duration))
97+
const afterValue = sharedCounter.increment()
98+
99+
result = {
100+
success: true,
101+
processedAt: new Date(),
102+
counterBefore: beforeValue,
103+
counterAfter: afterValue
104+
}
105+
106+
console.log(`✅ Task "${id}" completed with exclusive access, counter: ${afterValue}`)
107+
} finally {
108+
// Always release the lock when done
109+
await lock.release(resourceName, token)
110+
console.log(`🔓 Lock released for "${id}"`)
111+
}
112+
} else {
113+
// Non-exclusive tasks can run without manual locking
114+
// They're still protected from concurrent processing of the same job
115+
// by the built-in distributed job lock
116+
console.log(`⚙️ Processing "${id}" without exclusive resource access`)
117+
118+
// Simulate processing
119+
await new Promise(resolve => setTimeout(resolve, duration))
120+
121+
result = {
122+
success: true,
123+
processedAt: new Date(),
124+
noLockRequired: true
125+
}
126+
127+
console.log(`✅ Task "${id}" completed (no exclusive access needed)`)
128+
}
129+
130+
return result
131+
})
132+
133+
// Wait for all jobs to complete
134+
await new Promise(resolve => setTimeout(resolve, 5000))
135+
136+
// Check final counter value
137+
console.log(`\n📊 Final shared counter value: ${sharedCounter.value}`)
138+
139+
// Close the queue
140+
await taskQueue.close()
141+
console.log('\n👋 All tasks completed, queue closed')
142+
}
143+
144+
main().catch(console.error)

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@
6666
"example:advanced": "bun examples/advanced.ts",
6767
"example:advanced-features": "bun examples/advanced-features.ts",
6868
"example:priority-queue": "bun examples/priority-queue.ts",
69-
"example:key-rate-limiting": "bun examples/key-rate-limiting.ts"
69+
"example:key-rate-limiting": "bun examples/key-rate-limiting.ts",
70+
"example:distributed-locks": "bun examples/distributed-locks.ts"
7071
},
7172
"devDependencies": {
7273
"@stacksjs/docs": "^0.70.23",

src/distributed-lock.ts

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
import type { RedisClient } from 'bun'
2+
import { generateId } from './utils'
3+
import { createLogger } from './logger'
4+
5+
export interface LockOptions {
6+
/**
7+
* Lock duration in milliseconds (defaults to 30000 ms / 30 seconds)
8+
*/
9+
duration?: number
10+
11+
/**
12+
* Whether to automatically extend the lock as it approaches expiration (defaults to true)
13+
*/
14+
autoExtend?: boolean
15+
16+
/**
17+
* How often to extend the lock (defaults to 2/3 of the duration)
18+
*/
19+
extendInterval?: number
20+
21+
/**
22+
* Number of retries to acquire the lock if it's already locked (defaults to 0)
23+
*/
24+
retries?: number
25+
26+
/**
27+
* Delay between retries in milliseconds (defaults to 100ms)
28+
*/
29+
retryDelay?: number
30+
}
31+
32+
/**
33+
* Distributed lock implementation using Redis
34+
*/
35+
export class DistributedLock {
36+
private redisClient: RedisClient
37+
private prefix: string
38+
private readonly logger = createLogger('lock')
39+
40+
constructor(redisClient: RedisClient, prefix = 'lock') {
41+
this.redisClient = redisClient
42+
this.prefix = prefix
43+
}
44+
45+
/**
46+
* Acquire a lock
47+
* @param resource The resource to lock
48+
* @param options Lock options
49+
* @returns Lock token if successfully acquired, null otherwise
50+
*/
51+
async acquire(resource: string, options: LockOptions = {}): Promise<string | null> {
52+
const lockKey = this.getLockKey(resource)
53+
const token = generateId()
54+
const duration = options.duration || 30000 // Default 30 seconds
55+
const retries = options.retries || 0
56+
const retryDelay = options.retryDelay || 100
57+
58+
// Try to acquire the lock
59+
for (let attempt = 0; attempt <= retries; attempt++) {
60+
// Use SET NX (only set if key doesn't exist) with expiration
61+
const result = await this.redisClient.send('SET', [
62+
lockKey,
63+
token,
64+
'NX', // Only set if key doesn't exist
65+
'PX', // Set expiry in milliseconds
66+
duration.toString(),
67+
])
68+
69+
if (result === 'OK') {
70+
this.logger.debug(`Acquired lock ${resource} with token ${token}`)
71+
72+
// Set up auto-extension if enabled
73+
if (options.autoExtend !== false) {
74+
const extendInterval = options.extendInterval || Math.floor(duration * 2 / 3)
75+
this.setupAutoExtend(resource, token, duration, extendInterval)
76+
}
77+
78+
return token
79+
}
80+
81+
// If not successful and we have retries left
82+
if (attempt < retries) {
83+
await new Promise(resolve => setTimeout(resolve, retryDelay))
84+
}
85+
}
86+
87+
this.logger.debug(`Failed to acquire lock ${resource} after ${retries + 1} attempts`)
88+
return null
89+
}
90+
91+
/**
92+
* Release a lock
93+
* @param resource The resource to unlock
94+
* @param token The lock token for validation
95+
* @returns True if successfully released, false otherwise
96+
*/
97+
async release(resource: string, token: string): Promise<boolean> {
98+
const lockKey = this.getLockKey(resource)
99+
100+
// We need to implement the Lua script logic with normal Redis commands
101+
// since we can't use eval directly
102+
103+
// First, check if we own the lock
104+
const currentToken = await this.redisClient.get(lockKey)
105+
106+
if (currentToken === token) {
107+
// We own the lock, so delete it
108+
await this.redisClient.del(lockKey)
109+
this.logger.debug(`Released lock ${resource} with token ${token}`)
110+
return true
111+
} else {
112+
this.logger.debug(`Failed to release lock ${resource} with token ${token}`)
113+
return false
114+
}
115+
}
116+
117+
/**
118+
* Check if a lock exists without acquiring it
119+
* @param resource The resource to check
120+
* @returns True if locked, false otherwise
121+
*/
122+
async isLocked(resource: string): Promise<boolean> {
123+
const lockKey = this.getLockKey(resource)
124+
const result = await this.redisClient.exists(lockKey)
125+
// Different Redis clients may return different types
126+
return result ? true : false
127+
}
128+
129+
/**
130+
* Extend a lock's duration
131+
* @param resource The resource to extend
132+
* @param token The lock token for validation
133+
* @param duration New duration in milliseconds
134+
* @returns True if successfully extended, false otherwise
135+
*/
136+
async extend(resource: string, token: string, duration: number): Promise<boolean> {
137+
const lockKey = this.getLockKey(resource)
138+
139+
// Implement Lua script logic with standard Redis commands
140+
const currentToken = await this.redisClient.get(lockKey)
141+
142+
if (currentToken === token) {
143+
// We own the lock, so extend it
144+
const result = await this.redisClient.send('PEXPIRE', [lockKey, duration.toString()])
145+
146+
if (result === 1 || result === true) {
147+
this.logger.debug(`Extended lock ${resource} with token ${token} for ${duration}ms`)
148+
return true
149+
}
150+
}
151+
152+
this.logger.debug(`Failed to extend lock ${resource} with token ${token}`)
153+
return false
154+
}
155+
156+
/**
157+
* Set up automatic lock extension
158+
*/
159+
private setupAutoExtend(resource: string, token: string, duration: number, interval: number): void {
160+
const autoExtendId = setInterval(async () => {
161+
try {
162+
const extended = await this.extend(resource, token, duration)
163+
164+
if (!extended) {
165+
// Lock no longer exists or we don't own it anymore
166+
clearInterval(autoExtendId)
167+
this.logger.debug(`Stopped auto-extension for lock ${resource}`)
168+
}
169+
} catch (error) {
170+
this.logger.error(`Error extending lock ${resource}: ${(error as Error).message}`)
171+
clearInterval(autoExtendId)
172+
}
173+
}, interval)
174+
175+
// Ensure we clean up the interval when Node exits
176+
const resourceKey = `${resource}:${token}`
177+
if (typeof process !== 'undefined') {
178+
const extendTimers = DistributedLock.autoExtendTimers
179+
extendTimers.set(resourceKey, autoExtendId)
180+
181+
// Cleanup on first timer
182+
if (extendTimers.size === 1) {
183+
process.once('exit', () => {
184+
for (const timer of extendTimers.values()) {
185+
clearInterval(timer)
186+
}
187+
extendTimers.clear()
188+
})
189+
}
190+
}
191+
}
192+
193+
/**
194+
* Get the lock key with prefix
195+
*/
196+
private getLockKey(resource: string): string {
197+
return `${this.prefix}:${resource}`
198+
}
199+
200+
/**
201+
* Store interval timers for cleanup
202+
*/
203+
private static autoExtendTimers = new Map<string, NodeJS.Timeout>()
204+
205+
/**
206+
* Execute a function with a lock
207+
* @param resource The resource to lock
208+
* @param fn The function to execute while holding the lock
209+
* @param options Lock options
210+
* @returns The result of the function
211+
* @throws Error if the lock cannot be acquired
212+
*/
213+
async withLock<T>(resource: string, fn: () => Promise<T>, options: LockOptions = {}): Promise<T> {
214+
const token = await this.acquire(resource, options)
215+
216+
if (!token) {
217+
throw new Error(`Failed to acquire lock for resource ${resource}`)
218+
}
219+
220+
try {
221+
return await fn()
222+
} finally {
223+
await this.release(resource, token)
224+
}
225+
}
226+
}

src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
export { BatchProcessor } from './batch'
22
export { type CleanupOptions, CleanupService } from './cleanup'
33
export * from './config'
4+
export { DistributedLock, type LockOptions } from './distributed-lock'
45
export { JobEvents } from './events'
56
export { QueueGroup } from './group'
67
export { Job } from './job'

0 commit comments

Comments
 (0)