Skip to content

Semaphore locking library, can be used to limit goroutine dynamically

Notifications You must be signed in to change notification settings

kokizzu/semlock

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

10 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

SemLock

a simple semaphore lock, difference between built-in sync/semaphore: the limit can be updated dynamically, difference with worker pool libraries: this package can be used to increase or decrease number of goroutine running at the same time in after initialization.

the use case for this library to dynamically limit the number of concurrent tasks adaptively

eg. when database overload we want to decrease number of query hitting database for example based on latency or when CPU/RAM/Bandwidth overloaded, we may want to decrease the number of worker and increase it back when no longer overloaded.

How it works

  • allowed is number of available locks, can be manipulated by calling IncAllowed() or DecAllowed()
  • active is number of locks that releaseed to the worker, can be manipulated by calling BlockUntilAllowed() (active+1) or ReleaseActive() (active-1)
  • min (default 1) and max is the minimum and maximum threshold for allowed, used to make sure it wont overflow (too many worker, eg. when allowed increased too much) or underflow (no more worker can be activated, eg. when allowed < 1).

the SemaphoreLock struct will ensure min <= allowed <= max, and active <= allowed

if active >= allowed (rate limit exceeded), BlockUntilAllowed() (acquire lock) will block indefinitely until locks available (active < allowed)

lock can be released (decreasing number of active) by calling ReleaseActive()

number of available locks (number of allowed) can be increased or decreased by calling IncAllowed() or DecAllowed()

Example: MinSemaphoreLock, L=lock acquired/active, A=available lock/allowed

   min=1 max=3 available=1
   [A] [ ] [ ]

   thread1: BlockUntilAllowed() // will pass
   [L] [ ] [ ]

   thread2: BlockUntilAllowed() // will block
   [L] [ ] [ ]

   thread3: IncAllowed()
   [L] [A] [ ]
   thread2 continued
   [L] [L] [ ]

   thread3: IncAllowed()
   [L] [L] [A]

   thread1: ReleaseActive()
   [L] [A] [A]

   thread2: releaseActive()
   [A] [A] [A]

Usage Example

package main

import "github.com/kokizzu/semlock"

func _() {
    // MinSemaphoreLock will start with allowed=1
    // MaxSemaphoreLock will start with allowed=max
    // maximum 10 concurrent tasks, 100ms delay before trying to acquire lock again
    s := semlock.NewMinSemaphoreLock(10, 100 * time.Millisecond)
    
    for range 10 { // maximum 10 worker
        go func() {
            for item := range someChannel {
                // block until acquire lock (active+1)
                s.BlockUntilAllowed() // will block if active >= allowed
                
                expensiveQueryOrCalculation(item)
                
                // release lock (active-1)
                s.ReleaseActive() 
            }
        }
    }
    
    for {
        select {
            case <- cpuOverloaded, <- databaseOverloadedSignal:
                s.DecAllowed() // reduce number of goroutine allowed to progress
            case <- cpuLessHalfIdle, <- databaseLessHalfIdleSignal:
                s.IncAllowed() // increase number of allowed goroutine progressing
            case <- ctx.Done():
                close(someChannel)
                return
        }
    }
}

TODO

  • replace WaitDelay with channel so doesn't have to do polling, but if we do that, max limit can be no longer modified after initialization unless we use infinite channel.

About

Semaphore locking library, can be used to limit goroutine dynamically

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages