A high performance and convenient thread safe queue that can concurrently grow and shrink with push, extend, pop and pull capabilities.

orxfun/orx-concurrent-queue

orx-concurrent-queue

A high performance and convenient thread safe queue that can concurrently grow and shrink with push, extend, pop and pull capabilities.

Examples

The following example demonstrates basic usage of ConcurrentQueue within a synchronous program. Note that the push, extend, pop and pull methods take a shared reference &self, which makes the queue convenient to share between threads in a concurrent program.

use orx_concurrent_queue::ConcurrentQueue;

let queue = ConcurrentQueue::new();

queue.push(0); // [0]
queue.push(1); // [0, 1]

let x = queue.pop(); // [1]
assert_eq!(x, Some(0));

queue.extend(2..7); // [1, 2, 3, 4, 5, 6]

let x: Vec<_> = queue.pull(4).unwrap().collect(); // [5, 6]
assert_eq!(x, vec![1, 2, 3, 4]);

assert_eq!(queue.len(), 2);

let vec = queue.into_inner();
assert_eq!(vec, vec![5, 6]);
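
For contrast, the sketch below shows what sharing a queue between threads looks like when the mutating methods need &mut self. It uses only the standard library (a VecDeque behind a Mutex), not this crate, and the helper name fill_with_mutex is hypothetical. It is meant only to illustrate why methods taking &self are convenient, and says nothing about how ConcurrentQueue is implemented.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Pushes `0..per_thread` from each of `num_threads` scoped threads into a
/// mutex-guarded `VecDeque` and returns the final contents.
fn fill_with_mutex(num_threads: usize, per_thread: usize) -> VecDeque<usize> {
    let queue = Mutex::new(VecDeque::new());
    std::thread::scope(|s| {
        for _ in 0..num_threads {
            s.spawn(|| {
                for i in 0..per_thread {
                    // `VecDeque::push_back` needs `&mut self`,
                    // so every call has to take the lock first
                    queue.lock().unwrap().push_back(i);
                }
            });
        }
    });
    // all threads have joined; unwrap the mutex to get the queue back
    queue.into_inner().unwrap()
}

fn main() {
    let filled = fill_with_mutex(4, 100);
    assert_eq!(filled.len(), 400);
}
```

With a queue whose methods take &self, the explicit lock around each call is not needed at the call site.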

The following example demonstrates the main purpose of the concurrent queue: pushing to and popping from the queue simultaneously. This enables parallel programs in which multiple threads handle tasks while new tasks are created and added to the queue dynamically.

In this example, the queue is pre-populated with three tasks. Each task may create child tasks, which are added to the back of the queue to be popped later and may in turn create further tasks.

use orx_concurrent_queue::ConcurrentQueue;
use std::sync::atomic::{AtomicUsize, Ordering};

struct Task {
    micros: usize,
}

impl Task {
    fn perform(&self) {
        use std::{thread::sleep, time::Duration};
        sleep(Duration::from_micros(self.micros as u64));
    }
    fn child_tasks(&self) -> impl ExactSizeIterator<Item = Task> {
        let range = match self.micros < 5 {
            true => 0..0,
            false => 0..self.micros,
        };
        range.rev().take(5).map(|micros| Self { micros })
    }
}

let queue = ConcurrentQueue::new();

// pre-populate with 3 tasks
for micros in [10, 15, 10] {
    queue.push(Task { micros });
}

// task counter, seeded with the number of pre-populated tasks (3)
// and incremented once for every task that is popped and performed
let num_performed_tasks = AtomicUsize::new(queue.len());

let num_threads = 8;
std::thread::scope(|s| {
    for _ in 0..num_threads {
        s.spawn(|| {
            // keep popping a task from front of the queue
            // as long as the queue is not empty
            while let Some(task) = queue.pop() {
                // create children tasks, add to back
                queue.extend(task.child_tasks());

                // perform the popped task
                task.perform();

                _ = num_performed_tasks.fetch_add(1, Ordering::Relaxed);
            }
        });
    }
});

assert_eq!(num_performed_tasks.load(Ordering::Relaxed), 5046);
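
The value 5046 in the final assertion can be checked with a sequential sketch. The code below uses only the standard library (a VecDeque as the worklist) rather than this crate, and the helper names children and count_tasks are hypothetical. It applies the same rule as Task::child_tasks: a task with micros < 5 has no children, otherwise it has the five children micros - 1 down to micros - 5.

```rust
use std::collections::VecDeque;

/// Durations of the child tasks of a task with the given `micros`,
/// mirroring the rule of `Task::child_tasks` in the example above.
fn children(micros: usize) -> impl Iterator<Item = usize> {
    let range = if micros < 5 { 0..0 } else { 0..micros };
    range.rev().take(5)
}

/// Sequentially drains a worklist seeded with `initial` and
/// returns how many tasks were popped in total.
fn count_tasks(initial: &[usize]) -> usize {
    let mut worklist: VecDeque<usize> = initial.iter().copied().collect();
    let mut popped = 0;
    while let Some(micros) = worklist.pop_front() {
        // children go to the back, as in the concurrent example
        worklist.extend(children(micros));
        popped += 1;
    }
    popped
}

fn main() {
    let initial = [10, 15, 10];
    let popped = count_tasks(&initial);
    assert_eq!(popped, 5043);
    // the counter in the example starts at queue.len() == 3
    assert_eq!(initial.len() + popped, 5046);
}
```

The three initial tasks and their descendants amount to 5043 popped tasks (161 for each task with micros = 10 and 4721 for the task with micros = 15). Because the counter in the example is seeded with queue.len(), which is 3, it ends at 5046.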

Contributing

Contributions are welcome! If you notice an error, have a question or think something could be improved, please open an issue or create a PR.

License

Dual-licensed under Apache 2.0 or MIT.
