33 stable releases (3 major)

| Version | Release date |
|---|---|
| 3.1.1 (new) | Feb 14, 2026 |
| 3.0.3 | Jan 30, 2026 |
| 2.1.10 | Jan 10, 2026 |
| 2.1.9 | Dec 31, 2025 |
| 0.1.3 | Jun 29, 2020 |
Crossfire
High-performance lockless spsc/mpsc/mpmc channels. The algorithm is derived from crossbeam, with improvements.
It supports async contexts and bridges the gap between async and blocking contexts.
For the concept, please refer to the wiki.
Version history
- v1.0: Used in production since 2022.12.
- v2.0: [2025.6] Refactored the codebase and API by removing generic types from the ChannelShared type, which made it easier to code with.
- v2.1: [2025.9] Removed the dependency on crossbeam-channel and reimplemented the channels on a modified version of crossbeam-queue, bringing 2x performance improvements in both async and blocking contexts.
- v3.0: [2026.1] Refactored the API back to a generic flavor interface and added select. Dedicated optimizations: bounded SPSC +70%, MPSC +30%, one-size +20%. Eliminated the enum dispatch cost, improving async performance by another 33%. Check out compat for migration from v2.x.
Performance
Being a lockless channel, crossfire outperforms other async-capable channels. Thanks to a lighter notification mechanism, in a blocking context it is in most cases even faster than the original crossbeam-channel.
More benchmark data is posted on the wiki.
Also, being a lockless channel, the algorithm relies on spinning and yielding. Spinning is good on
multi-core systems, but not friendly to single-core systems (like virtual machines).
So we provide a function detect_backoff_cfg() to detect the running platform.
Calling it in the initialization section of your code can yield a 2x performance boost on
a VPS.
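To illustrate why the core count matters, here is a minimal std-only sketch of a spin-then-yield retry loop whose spin budget is chosen from the detected parallelism. It is not the implementation behind detect_backoff_cfg(); `SpinBudget`, `budget_for_cores`, `detect_budget`, `retry_with_backoff`, and the limit of 64 are all hypothetical names and values chosen for the example.

```rust
use std::thread;

/// Hypothetical spin budget; NOT crossfire's actual configuration type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SpinBudget {
    /// Number of busy-spin iterations allowed before yielding to the scheduler.
    pub spin_limit: u32,
}

/// Pick a spin budget from a core count: never spin on a single core,
/// because the peer we are waiting for cannot run while we spin.
pub fn budget_for_cores(cores: usize) -> SpinBudget {
    if cores <= 1 {
        SpinBudget { spin_limit: 0 }
    } else {
        SpinBudget { spin_limit: 64 } // arbitrary illustrative value
    }
}

/// Detect the core count, falling back to 1 if it cannot be determined.
pub fn detect_budget() -> SpinBudget {
    let cores = thread::available_parallelism().map(|n| n.get()).unwrap_or(1);
    budget_for_cores(cores)
}

/// Retry `try_op` until it succeeds: spin first, then yield on every retry.
pub fn retry_with_backoff<T>(budget: SpinBudget, mut try_op: impl FnMut() -> Option<T>) -> T {
    let mut spins = 0;
    loop {
        if let Some(value) = try_op() {
            return value;
        }
        if spins < budget.spin_limit {
            spins += 1;
            std::hint::spin_loop();
        } else {
            thread::yield_now();
        }
    }
}

fn main() {
    let budget = detect_budget();
    let mut attempts = 0;
    // A stand-in operation that succeeds on the third attempt.
    let result = retry_with_backoff(budget, || {
        attempts += 1;
        if attempts >= 3 { Some(attempts) } else { None }
    });
    println!("budget = {:?}, succeeded after {} attempts", budget, result);
}
```

With a budget of zero the loop yields immediately, which is the behaviour one would want on a single-core VPS.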
The benchmark is written with the criterion framework. You can run the benchmark with:
```shell
make bench crossfire
make bench crossfire_select
```
APIs
Concurrency Modules
- spsc, mpsc, mpmc. Each has a different underlying implementation optimized for its concurrency model. The SP or SC interface is only for non-concurrent operation. It is more memory-efficient in waker registration, and the lockless algorithm needs fewer atomic operations.
- oneshot has its own special sender/receiver types, because using Tx / Rx would be too heavy.
- select:
  - Select<'a>: a crossbeam-channel style type-erased API, which borrows the receiver address and selects with a "token".
  - Multiplex: a multiplexed stream that owns multiple receivers and selects across channels of the same flavor carrying the same message type.
- waitgroup: a high-performance WaitGroup that allows a custom threshold.
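To make the "custom threshold" idea concrete, below is a minimal sketch of a wait group whose waiter unblocks once the number of outstanding tasks drops to a chosen threshold. It uses a Mutex and Condvar from std for brevity, so it is not lockless and says nothing about how crossfire's waitgroup is built; `ThresholdWaitGroup`, `add`, `done`, `wait_to`, and `outstanding` are hypothetical names.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical lock-based wait group; NOT crossfire's waitgroup API.
#[derive(Clone)]
pub struct ThresholdWaitGroup {
    inner: Arc<(Mutex<usize>, Condvar)>,
}

impl ThresholdWaitGroup {
    pub fn new() -> Self {
        Self { inner: Arc::new((Mutex::new(0), Condvar::new())) }
    }

    /// Register `n` more outstanding tasks.
    pub fn add(&self, n: usize) {
        *self.inner.0.lock().unwrap() += n;
    }

    /// Mark one task as finished and wake any waiters.
    pub fn done(&self) {
        let mut count = self.inner.0.lock().unwrap();
        *count -= 1;
        self.inner.1.notify_all();
    }

    /// Block until at most `threshold` tasks are still outstanding.
    pub fn wait_to(&self, threshold: usize) {
        let guard = self.inner.0.lock().unwrap();
        let _guard = self
            .inner
            .1
            .wait_while(guard, |count| *count > threshold)
            .unwrap();
    }

    /// Current number of outstanding tasks.
    pub fn outstanding(&self) -> usize {
        *self.inner.0.lock().unwrap()
    }
}

fn main() {
    let wg = ThresholdWaitGroup::new();
    wg.add(4);
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let wg = wg.clone();
            thread::spawn(move || wg.done())
        })
        .collect();
    // Resume as soon as no more than two tasks remain, e.g. to refill a pipeline.
    wg.wait_to(2);
    println!("tasks still outstanding: {}", wg.outstanding());
    // A threshold of zero is the classic "wait for everything" behaviour.
    wg.wait_to(0);
    for h in handles {
        h.join().unwrap();
    }
    println!("all tasks finished");
}
```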
Flavors
The following lockless queues are exposed in the flavor module, and each one has a type alias in spsc/mpsc/mpmc:
- List (which uses crossbeam SegQueue)
- Array (which is an enum that wraps crossbeam ArrayQueue, and a One if initialized with size<=1)
  - For a bounded channel, a size of 0 is not supported yet (it is rewritten as size 1).
  - The implementation for spsc & mpsc is simplified from the mpmc version.
- One (which derives from the ArrayQueue algorithm, but has better performance in the size=1 scenario, because it has two slots to allow the reader and the writer to work concurrently)
- Null (see the doc null), for cancellation-purpose channels, which only wake up on closing.
NOTE:
Although the names Array and List are the same across the spsc/mpsc/mpmc modules,
they are different type aliases local to their parent module. We suggest distinguishing them by
namespace when importing them for use.
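The namespace advice can be illustrated without the crate itself. In the sketch below, the `spsc` and `mpmc` modules are local stand-ins (backed by `VecDeque` and `Vec`, which have nothing to do with the real flavors); the only point is that an alias with the same name resolves to a different type depending on the module path, so importing the module rather than the alias keeps call sites unambiguous.

```rust
// Stand-in modules mimicking the layout described above. In real code these
// would come from the crate (for example `use crossfire::{spsc, mpmc};`).
mod spsc {
    /// Stand-in alias, NOT the real `spsc::Array`.
    pub type Array<T> = std::collections::VecDeque<T>;
}

mod mpmc {
    /// Stand-in alias, NOT the real `mpmc::Array`.
    pub type Array<T> = Vec<T>;
}

/// Name of the concrete type behind `spsc::Array<u32>`.
pub fn spsc_array_name() -> &'static str {
    std::any::type_name::<spsc::Array<u32>>()
}

/// Name of the concrete type behind `mpmc::Array<u32>`.
pub fn mpmc_array_name() -> &'static str {
    std::any::type_name::<mpmc::Array<u32>>()
}

fn main() {
    // Same alias name, different types: the module path disambiguates them.
    println!("spsc::Array<u32> = {}", spsc_array_name());
    println!("mpmc::Array<u32> = {}", mpmc_array_name());
}
```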
Channel builder function
Aside from the bounded_* and unbounded_* functions, which specify the sender / receiver type,
each module has build() and new() functions, which can be applied to any channel flavor and any async/blocking combination.
Types
| Context | Sender (Producer), Single | Sender (Producer), Multiple | Receiver (Consumer), Single | Receiver (Consumer), Multiple |
|---|---|---|---|---|
| Blocking (trait) | BlockingTxTrait | BlockingTxTrait | BlockingRxTrait | BlockingRxTrait |
| Blocking (type) | Tx | MTx | Rx | MRx |
| Async (trait) | AsyncTxTrait | AsyncTxTrait | AsyncRxTrait | AsyncRxTrait |
| Async (type) | AsyncTx | MAsyncTx | AsyncRx | MAsyncRx |
Safety: For the SP / SC version, AsyncTx, AsyncRx, Tx, and Rx are not Clone and not Sync.
They can be moved to other threads, but calling send/recv on them while they are wrapped in an Arc is not allowed. (Refer to the compile_fail
examples in the type document).
In exchange for these restrictions, the SP / SC API offers completely lockless waker registration, which gives a performance boost.
The sender/receiver can use the From trait to convert between blocking and async context
counterparts (refer to the example below).
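The "movable but not shareable" property described above corresponds to a type that is Send but not Sync. The following std-only sketch shows one common way to get that property with a marker field; `SingleTx` is a hypothetical stand-in that merely counts calls, and it makes no claim about how crossfire's Tx or AsyncTx are actually defined.

```rust
use std::cell::Cell;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical single-producer handle: `Send`, but neither `Sync` nor `Clone`.
pub struct SingleTx {
    sent: Arc<AtomicUsize>,
    // `Cell<()>` is `Send` but not `Sync`, so this marker removes `Sync` only.
    _not_sync: PhantomData<Cell<()>>,
}

impl SingleTx {
    pub fn new(sent: Arc<AtomicUsize>) -> Self {
        Self { sent, _not_sync: PhantomData }
    }

    /// Stand-in for a real send: it only counts the call.
    pub fn send(&self, _msg: usize) {
        self.sent.fetch_add(1, Ordering::Relaxed);
    }
}

/// Compile-time check that a value is `Send`.
fn assert_send<T: Send>(_: &T) {}

/// Move the handle into another thread, send `n` messages, return the count.
pub fn run_on_other_thread(n: usize) -> usize {
    let counter = Arc::new(AtomicUsize::new(0));
    let tx = SingleTx::new(Arc::clone(&counter));
    assert_send(&tx);
    // Moving the handle compiles because it is `Send`.
    thread::spawn(move || {
        for i in 0..n {
            tx.send(i);
        }
    })
    .join()
    .unwrap();
    // Sharing it would NOT compile, because `Arc<SingleTx>` is not `Send`:
    //     let shared = Arc::new(tx);
    //     thread::spawn(move || shared.send(0));
    counter.load(Ordering::Relaxed)
}

fn main() {
    println!("messages sent from the other thread: {}", run_on_other_thread(5));
}
```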
Error types
Error types are the same as crossbeam-channel:
TrySendError, SendError, SendTimeoutError, TryRecvError, RecvError, RecvTimeoutError
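As a dependency-free illustration of how these error types are typically handled, the sketch below uses the standard library's `std::sync::mpsc::sync_channel`, whose `TrySendError` and `TryRecvError` share their names and variants (`Full`, `Disconnected`, `Empty`) with crossbeam-channel. That the same `match` arms carry over unchanged to crossfire is an assumption based on the statement above; `demo` is a hypothetical helper.

```rust
use std::sync::mpsc::{sync_channel, TryRecvError, TrySendError};

/// Walk through the non-blocking error cases and record what happened.
pub fn demo() -> Vec<&'static str> {
    let mut log = Vec::new();
    let (tx, rx) = sync_channel::<u32>(1);

    // The first message fits into the one-slot buffer.
    match tx.try_send(1) {
        Ok(()) => log.push("sent"),
        Err(_) => log.push("unexpected"),
    }

    // The buffer is now full; the error hands the rejected message back.
    match tx.try_send(2) {
        Err(TrySendError::Full(msg)) => {
            assert_eq!(msg, 2);
            log.push("full");
        }
        Err(TrySendError::Disconnected(_)) => log.push("disconnected"),
        Ok(()) => log.push("unexpected"),
    }

    // Drain the buffered message, then observe an empty channel.
    assert_eq!(rx.try_recv(), Ok(1));
    match rx.try_recv() {
        Err(TryRecvError::Empty) => log.push("empty"),
        Err(TryRecvError::Disconnected) => log.push("disconnected"),
        Ok(_) => log.push("unexpected"),
    }

    // Once every sender is dropped, the receiver sees a disconnect.
    drop(tx);
    match rx.try_recv() {
        Err(TryRecvError::Disconnected) => log.push("disconnected"),
        Err(TryRecvError::Empty) => log.push("empty"),
        Ok(_) => log.push("unexpected"),
    }
    log
}

fn main() {
    println!("{:?}", demo());
}
```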
Async compatibility
Tested on tokio-1.x and async-std-1.x, crossfire is runtime-agnostic.
The following scenarios are considered:
- The AsyncTx::send() and AsyncRx::recv() operations are cancellation-safe in an async context. You can safely use the select! macro and timeout() function in tokio/futures in combination with recv(). On cancellation, SendFuture and RecvFuture will trigger drop(), which cleans up the state of the waker, making sure there is no memory leak or deadlock. But you cannot know the true result from SendFuture, since it's dropped upon cancellation. Thus, we suggest using AsyncTx::send_timeout() instead.
- When the "tokio" or "async_std" feature is enabled, we also provide two additional functions:
  - AsyncTx::send_timeout(), which will return the message that failed to be sent in SendTimeoutError. We guarantee the result is atomic. Alternatively, you can use AsyncTx::send_with_timer().
  - AsyncRx::recv_timeout(), for which we also guarantee the result is atomic. Alternatively, you can use AsyncRx::recv_with_timer().
- The waker footprint:
  In a multi-producer and multi-consumer scenario, there's a small memory overhead to pass along a Weak
  reference of wakers.
  Because we aim to be lockless, when the sending/receiving futures are canceled (like tokio::time::timeout()),
  an immediate cleanup is triggered if the try-lock is successful; otherwise cleanup happens lazily.
  (This won't be an issue because weak wakers will be consumed by actual message send and recv).
  In an idle-select scenario, like a notification for close, the waker will be reused as much as possible
  if poll() returns pending.
- Hand-written futures:
  The future objects created by AsyncTx::send(), AsyncTx::send_timeout(), AsyncRx::recv(), and
  AsyncRx::recv_timeout() are Sized. You don't need to put them in a Box.
  If you would like to use the poll functions directly for complex behavior, you can call
  AsyncSink::poll_send() or AsyncStream::poll_item() with a Context.
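The cancellation behaviour and manual polling described in this list can be sketched with std alone. The example below defines a hypothetical `RecvFuture` that registers its waker on the first poll and deregisters it in `Drop`, then polls it by hand with a `Context`. It guards its state with a Mutex for brevity, so it is not lockless and does not reflect crossfire's internals; `Shared`, `RecvFuture`, `NoopWake`, `send`, and `demo` are all invented for this illustration, and AsyncSink::poll_send() / AsyncStream::poll_item() are not used.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical channel state: one optional message plus registered wakers.
#[derive(Default)]
pub struct Shared {
    msg: Option<u32>,
    waiters: Vec<(usize, Waker)>,
    next_id: usize,
}

/// Hypothetical receive future. It is `Sized`, so it can live on the stack.
pub struct RecvFuture {
    shared: Arc<Mutex<Shared>>,
    id: Option<usize>,
}

impl RecvFuture {
    pub fn new(shared: &Arc<Mutex<Shared>>) -> Self {
        Self { shared: Arc::clone(shared), id: None }
    }
}

impl Future for RecvFuture {
    type Output = u32;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let shared = Arc::clone(&self.shared);
        let mut state = shared.lock().unwrap();
        if let Some(msg) = state.msg.take() {
            // Completed: remove our registration, if any.
            if let Some(id) = self.id.take() {
                state.waiters.retain(|w| w.0 != id);
            }
            return Poll::Ready(msg);
        }
        let existing = self.id;
        match existing {
            // Already registered: refresh the stored waker.
            Some(id) => {
                for slot in state.waiters.iter_mut() {
                    if slot.0 == id {
                        slot.1 = cx.waker().clone();
                    }
                }
            }
            // First poll: register a waker under a fresh id.
            None => {
                let id = state.next_id;
                state.next_id += 1;
                state.waiters.push((id, cx.waker().clone()));
                self.id = Some(id);
            }
        }
        Poll::Pending
    }
}

impl Drop for RecvFuture {
    /// Cancellation path: dropping a pending future deregisters its waker.
    fn drop(&mut self) {
        if let Some(id) = self.id.take() {
            self.shared.lock().unwrap().waiters.retain(|w| w.0 != id);
        }
    }
}

/// Store a message and wake every registered waiter.
pub fn send(shared: &Arc<Mutex<Shared>>, msg: u32) {
    let mut state = shared.lock().unwrap();
    state.msg = Some(msg);
    for (_, waker) in state.waiters.iter() {
        waker.wake_by_ref();
    }
}

/// A waker that does nothing, enough for polling by hand.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Returns (waiters after first poll, waiters after cancel, final poll result).
pub fn demo() -> (usize, usize, Poll<u32>) {
    let shared = Arc::new(Mutex::new(Shared::default()));
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // Poll once, then cancel by dropping the future.
    let mut cancelled = RecvFuture::new(&shared);
    assert!(Pin::new(&mut cancelled).poll(&mut cx).is_pending());
    let registered = shared.lock().unwrap().waiters.len();
    drop(cancelled);
    let after_cancel = shared.lock().unwrap().waiters.len();

    // A fresh future completes once a message is available.
    send(&shared, 7);
    let mut fut = RecvFuture::new(&shared);
    let result = Pin::new(&mut fut).poll(&mut cx);
    (registered, after_cancel, result)
}

fn main() {
    println!("{:?}", demo());
}
```

The point of the `Drop` implementation is that a timed-out or deselected future leaves no stale waker behind, which is the property the first bullet relies on.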
Usage
Cargo.toml:
```toml
[dependencies]
crossfire = "3.0"
```
Feature flags
- compat: Enables the compat module, which has the same API namespace and structs as v2.x.
- tokio: Enables send_timeout() and recv_timeout() with the tokio sleep function. (Conflicts with the async_std feature.)
- async_std: Enables send_timeout() and recv_timeout() with the async-std sleep function. (Conflicts with the tokio feature.)
- trace_log: Development mode, which enables internal logging while testing or benchmarking, to debug deadlock issues.
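For instance, a project that runs on tokio and wants the timeout functions might declare its dependencies roughly as follows. The feature name "tokio" comes from the list above; the tokio version and its "full" feature set are assumptions made only so the fragment is complete.

```toml
[dependencies]
# Enables the tokio-backed send_timeout() / recv_timeout().
# Do not enable "async_std" at the same time, since the two features conflict.
crossfire = { version = "3.0", features = ["tokio"] }
# Assumed runtime setup for the application itself.
tokio = { version = "1", features = ["full"] }
```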
Example
blocking / async sender receiver mixed together
```rust
use crossfire::*;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpmc::bounded_async::<usize>(100);
    let mut recv_counter = 0;
    let mut co_tx = Vec::new();
    let mut co_rx = Vec::new();
    const ROUND: usize = 1000;

    // Blocking sender, converted from a clone of the async sender.
    let blocking_tx: MTx<mpmc::Array<usize>> = tx.clone().into_blocking();
    co_tx.push(tokio::task::spawn_blocking(move || {
        for i in 0..ROUND {
            blocking_tx.send(i).expect("send ok");
        }
    }));
    // Async sender running as a tokio task.
    co_tx.push(tokio::spawn(async move {
        for i in 0..ROUND {
            tx.send(i).await.expect("send ok");
        }
    }));

    // Blocking receiver, converted from a clone of the async receiver.
    let blocking_rx: MRx<mpmc::Array<usize>> = rx.clone().into_blocking();
    co_rx.push(tokio::task::spawn_blocking(move || {
        let mut count: usize = 0;
        // recv() returns Err once all senders are dropped and the channel is drained.
        while let Ok(_i) = blocking_rx.recv() {
            count += 1;
        }
        count
    }));
    // Async receiver running as a tokio task.
    co_rx.push(tokio::spawn(async move {
        let mut count: usize = 0;
        while let Ok(_i) = rx.recv().await {
            count += 1;
        }
        count
    }));

    for th in co_tx {
        let _ = th.await.unwrap();
    }
    for th in co_rx {
        recv_counter += th.await.unwrap();
    }
    // Every message from both senders was received exactly once.
    assert_eq!(recv_counter, ROUND * 2);
}
```
Test status
NOTE: Because we have pushed the speed to a level no one has reached before, crossfire can put considerable pressure on the async runtime. Some hidden bugs (especially in atomic ops on platforms with weaker memory ordering) might surface.
The tests are placed in the test-suite directory; run them with:
```shell
make test
```
| arch | runtime | workflow | status |
|---|---|---|---|
| x86_64 | threaded | cron_master_threaded_x86 | STABLE |
| x86_64 | tokio 1.47.1 | cron_master_tokio_x86 | STABLE |
| x86_64 | async-std | cron_master_async_std_x86 | STABLE |
| x86_64 | smol | cron_master_smol-x86 | STABLE |
| x86_64 | compio | cron_master_compio-x86 | verifying |
| arm | threaded | cron_master_threaded_arm | STABLE |
| arm | tokio >= 1.48 (tokio PR #7622) | cron_master_tokio_arm | SHOULD UPGRADE tokio to 1.48; STABLE |
| arm | async-std | cron_master_async_std_arm | STABLE |
| arm | smol | cron_master_smol_arm | STABLE |
| arm | compio | cron_master_compio_arm | verifying |
| miri (emulation) | threaded | miri_tokio, miri_tokio_cur | STABLE |
| miri (emulation) | tokio | miri_tokio, miri_tokio_cur | STABLE |
| miri (emulation) | async-std | - | (timerfd_create) not supported by miri |
| miri (emulation) | smol | - | (timerfd_create) not supported by miri |
Debugging deadlock issue
Debug locally:
Use --features trace_log to run the bench or test until it hangs, then press ctrl+c or send SIGINT. The latest log will be dumped to /tmp/crossfire_ring.log (refer to tests/common.rs _setup_log()).
Debug with GitHub workflow: https://github.com/frostyplanet/crossfire-rs/issues/37