
Crate crossfire


§Crossfire

High-performance lockless spsc/mpsc/mpmc channels, with algorithms derived from crossbeam and improved.

It supports async contexts and bridges the gap between async and blocking contexts.

For the concept, please refer to the wiki.

§Version history

  • v1.0: Used in production since 2022.12.

  • v2.0: [2025.6] Refactored the codebase and API by removing generic types from the ChannelShared type, which made it easier to code with.

  • v2.1: [2025.9] Removed the dependency on crossbeam-channel and reimplemented on top of a modified version of crossbeam-queue, which brings 2x performance improvements for both async and blocking contexts.

  • v3.0: [2026.1] Refactored the API back to a generic flavor interface and added select. Dedicated optimizations: Bounded SPSC +70%, MPSC +30%, one-size +20%. Eliminated the enum dispatch cost, which improved async performance by another 33%. Check out compat for migration from v2.x.

§Test status

Refer to the README page for known issues on specific platforms and runtimes.

§Performance

Being a lockless channel, crossfire outperforms other async-capable channels. Thanks to a lighter notification mechanism, it is also faster than the original crossbeam-channel in most blocking-context cases. Benchmark data is posted on the wiki.

Also, being a lockless channel, the algorithm relies on spinning and yielding. Spinning works well on multi-core systems but is unfriendly to single-core systems (such as virtual machines). So we provide a function, detect_backoff_cfg(), to detect the running platform. Calling it in the initialization section of your code can give a 2x performance boost on a VPS.
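The spin-versus-yield trade-off described above can be sketched with the standard library alone. This is not crossfire's implementation: BackoffSketch, pick_backoff, detect_backoff_sketch, backoff_step, and the limit of 6 spin rounds are hypothetical names and values chosen for illustration. Only std::thread::available_parallelism, std::hint::spin_loop, and std::thread::yield_now are real std APIs.

```rust
use std::hint::spin_loop;
use std::num::NonZeroUsize;
use std::thread;

/// Hypothetical backoff policy for illustration; not a crossfire type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BackoffSketch {
    /// Number of exponential spin rounds before falling back to yielding.
    pub spin_rounds: u32,
}

/// Pick a policy from a core count. Spinning only pays off when another
/// core can make progress while this one spins.
pub fn pick_backoff(cores: usize) -> BackoffSketch {
    if cores <= 1 {
        BackoffSketch { spin_rounds: 0 }
    } else {
        BackoffSketch { spin_rounds: 6 } // illustrative value, an assumption
    }
}

/// Detect the core count of the current machine, assuming 1 if unknown.
pub fn detect_backoff_sketch() -> BackoffSketch {
    let cores = thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1);
    pick_backoff(cores)
}

/// One backoff step: spin 2^step times while under the limit, else yield.
pub fn backoff_step(cfg: BackoffSketch, step: u32) {
    if step < cfg.spin_rounds {
        for _ in 0..(1u32 << step) {
            spin_loop();
        }
    } else {
        thread::yield_now();
    }
}
```

On a single-core machine this policy never spins and goes straight to yielding, which is the kind of adjustment the text attributes to platform detection.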

The benchmark is written in the criterion framework. You can run the benchmark by:

make bench crossfire
make bench crossfire_select

§APIs

§Concurrency Modules

  • spsc, mpsc, mpmc. Each has a different underlying implementation optimized for its concurrency model. The SP or SC interface is only for non-concurrent operation. It is more memory-efficient in waker registration and reduces the cost of atomic operations in the lockless algorithm.

  • oneshot has its own sender/receiver types because using Tx / Rx would be too heavy.

  • select:

    • Select<'a>: a crossbeam-channel-style type-erased API that borrows receiver addresses and selects with a “token”.
    • Multiplex: a multiplexed stream that owns multiple receivers and selects among channels of the same flavor type that carry the same message type.
  • waitgroup: a high-performance WaitGroup that allows a custom threshold.

§Flavors

The following lockless queues are exposed in the flavor module, and each one has a type alias in spsc/mpsc/mpmc:

  • List (which uses crossbeam SegQueue)
  • Array (an enum that wraps crossbeam ArrayQueue, or a One if initialized with size<=1)
    • For a bounded channel, size 0 is not supported yet (it is rewritten as size 1).
    • The implementations for spsc & mpsc are simplified from the mpmc version.
  • One (derived from the ArrayQueue algorithm, but with better performance in the size=1 scenario, because it has two slots that allow the reader and writer to work concurrently)
  • Null (see the doc crate::null), a channel for cancellation purposes that only wakes up on closing.

NOTE: Although the names Array and List are the same across the spsc/mpsc/mpmc modules, they are different type aliases local to their parent modules. We suggest distinguishing them by namespace when importing.

§Channel builder function

Aside from the bounded_* and unbounded_* functions, which specify the sender / receiver types, each module has build() and new() functions, which can be applied to any channel flavor and any async/blocking combination.

§Types

Context    Sender (Producer)            Receiver (Consumer)
           Single       Multiple        Single       Multiple
Blocking   BlockingTxTrait              BlockingRxTrait
           Tx           MTx             Rx           MRx
Async      AsyncTxTrait                 AsyncRxTrait
           AsyncTx      MAsyncTx        AsyncRx      MAsyncRx

Safety: For the SP / SC version, AsyncTx, AsyncRx, Tx, and Rx are neither Clone nor Sync. They can be moved to other threads, but send/recv cannot be used while they are held in an Arc. (Refer to the compile_fail examples in the type documentation.)

The benefit of the SP / SC API is completely lockless waker registration, which gives a performance boost in exchange for these restrictions.
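The Safety note above describes handles that are Send but neither Clone nor Sync. The following std-only sketch shows how a Rust type ends up with that combination and what it allows. SingleHandle is a hypothetical stand-in, not a crossfire type, and crossfire's own mechanism may differ.

```rust
use std::cell::Cell;
use std::thread;

/// Hypothetical stand-in for a single-producer handle.
/// `Cell<usize>` is `Send` but not `Sync`, so the struct inherits both
/// properties. It is not cloneable because `Clone` is not derived.
pub struct SingleHandle {
    sent: Cell<usize>,
}

impl SingleHandle {
    pub fn new() -> Self {
        SingleHandle { sent: Cell::new(0) }
    }

    /// Pretend to send a message; only counts calls in this sketch.
    pub fn send(&self, _msg: usize) {
        self.sent.set(self.sent.get() + 1);
    }

    pub fn sent(&self) -> usize {
        self.sent.get()
    }
}

/// Moving the handle into another thread is fine because it is `Send`.
pub fn move_to_thread() -> usize {
    let handle = SingleHandle::new();
    thread::spawn(move || {
        for i in 0..3 {
            handle.send(i);
        }
        handle.sent()
    })
    .join()
    .expect("worker thread panicked")
}

// Sharing it does not compile: `Arc<SingleHandle>` is not `Send`
// because `SingleHandle` is not `Sync`.
//
// let shared = std::sync::Arc::new(SingleHandle::new());
// let other = shared.clone();
// thread::spawn(move || other.send(1)); // error[E0277]
```

The compiler therefore rejects concurrent use of one handle from two threads, which is what lets a single-producer or single-consumer path skip synchronization.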

The sender/receiver can use the From trait to convert between blocking and async context counterparts (refer to the example below)

§Error types

Error types are the same as crossbeam-channel:

TrySendError, SendError, SendTimeoutError, TryRecvError, RecvError, RecvTimeoutError
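In crossbeam-channel, TrySendError has Full and Disconnected variants that hand the unsent message back to the caller. The standard library's std::sync::mpsc::TrySendError has the same shape, so the sketch below uses std as a stand-in to stay self-contained. It assumes, as the text above states, that the crossfire types follow the same layout. Attempt, classify, and demo are hypothetical names for illustration.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Outcome of a non-blocking send, with the message returned on failure.
#[derive(Debug, PartialEq)]
pub enum Attempt {
    Sent,
    Full(u32),
    Disconnected(u32),
}

/// Map a try_send result onto `Attempt`, recovering the message from the error.
pub fn classify(res: Result<(), TrySendError<u32>>) -> Attempt {
    match res {
        Ok(()) => Attempt::Sent,
        Err(TrySendError::Full(msg)) => Attempt::Full(msg),
        Err(TrySendError::Disconnected(msg)) => Attempt::Disconnected(msg),
    }
}

/// Exercise all three outcomes on a bounded channel of capacity 1.
pub fn demo() -> [Attempt; 3] {
    let (tx, rx) = sync_channel::<u32>(1);
    let first = classify(tx.try_send(1)); // buffer has room
    let second = classify(tx.try_send(2)); // buffer is full
    let _ = rx.try_recv(); // drain the buffer
    drop(rx); // close the receiving side
    let third = classify(tx.try_send(3)); // no receiver left
    [first, second, third]
}
```

Because the failed message comes back inside the error, the caller can retry or reroute it instead of losing it.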

§Async compatibility

Crossfire is runtime-agnostic and has been tested on tokio-1.x and async-std-1.x.

The following scenarios are considered:

  • The AsyncTx::send() and AsyncRx::recv() operations are cancellation-safe in an async context. You can safely use the select! macro and timeout() function in tokio/futures in combination with recv(). On cancellation, SendFuture and RecvFuture will trigger drop(), which cleans up the state of the waker and makes sure there is no memory leak or deadlock. However, you cannot know the true result of a SendFuture, since it is dropped upon cancellation. Thus, we suggest using AsyncTx::send_timeout() instead.

  • When the “tokio” or “async_std” feature is enabled, we also provide two additional functions: AsyncTx::send_timeout() and AsyncRx::recv_timeout().

  • The waker footprint:

In a multi-producer and multi-consumer scenario, there is a small memory overhead from passing along a Weak reference to wakers. Because we aim to be lockless, when sending/receiving futures are canceled (for example by tokio::time::timeout()), an immediate cleanup is triggered if the try-lock succeeds; otherwise cleanup is done lazily. (This is not an issue, because weak wakers are consumed by actual message sends and receives.) In an idle-select scenario, such as a notification for close, the waker is reused as much as possible if poll() returns pending.

  • Hand-written futures:

The future objects created by AsyncTx::send(), AsyncTx::send_timeout(), AsyncRx::recv(), and AsyncRx::recv_timeout() are Sized. You don’t need to put them in a Box.

If you would like to use the poll functions directly for complex behavior, you can call AsyncSink::poll_send() or AsyncStream::poll_item() with a Context.
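The weak-waker cleanup described under “The waker footprint” can be illustrated with a small std-only sketch. It is not crossfire's data structure: Waiter and Registry are hypothetical, an AtomicBool stands in for a real Waker, and a Mutex is used for brevity even though crossfire's registration is described as lockless. The sketch only shows the two cleanup paths: eager cleanup when try_lock succeeds, and lazy cleanup when a later wake skips dead entries.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, Weak};

/// Hypothetical waiter record; a real channel would hold a `Waker` here.
pub struct Waiter {
    woken: AtomicBool,
}

impl Waiter {
    pub fn is_woken(&self) -> bool {
        self.woken.load(Ordering::Acquire)
    }
}

/// Hypothetical registry that stores only weak references to waiters.
#[derive(Default)]
pub struct Registry {
    queue: Mutex<VecDeque<Weak<Waiter>>>,
}

impl Registry {
    /// Register a waiter; the caller (the future) keeps the only strong reference.
    pub fn register(&self) -> Arc<Waiter> {
        let waiter = Arc::new(Waiter { woken: AtomicBool::new(false) });
        self.queue.lock().unwrap().push_back(Arc::downgrade(&waiter));
        waiter
    }

    /// Called when a future is cancelled: drop the strong reference, then
    /// clean up eagerly only if the lock happens to be free.
    pub fn cancel(&self, waiter: Arc<Waiter>) {
        drop(waiter);
        if let Ok(mut q) = self.queue.try_lock() {
            q.retain(|w| w.strong_count() > 0);
        }
        // If try_lock failed, the dead entry stays until wake_one skips it.
    }

    /// Wake the first live waiter, discarding dead entries along the way.
    pub fn wake_one(&self) -> bool {
        let mut q = self.queue.lock().unwrap();
        while let Some(weak) = q.pop_front() {
            if let Some(w) = weak.upgrade() {
                w.woken.store(true, Ordering::Release);
                return true;
            }
        }
        false
    }

    pub fn len(&self) -> usize {
        self.queue.lock().unwrap().len()
    }
}
```

A cancelled future costs at most one stale Weak entry, and that entry is removed either on cancellation or by the next send or receive that walks the queue.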

§Usage

Cargo.toml:

[dependencies]
crossfire = "3.0"

§Feature flags

  • compat: Enables the compat module, which has the same API namespace structure as v2.x.

  • tokio: Enables send_timeout and recv_timeout with the tokio sleep function. (Conflicts with the async_std feature.)

  • async_std: Enables send_timeout and recv_timeout with the async-std sleep function. (Conflicts with the tokio feature.)

  • trace_log: Development mode. Enables internal logging during tests or benchmarks, to debug deadlock issues.

§Example

Blocking and async senders and receivers mixed together:


extern crate crossfire;
use crossfire::*;
#[macro_use]
extern crate tokio;
use tokio::time::{sleep, interval, Duration};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpmc::bounded_async::<usize>(100);
    let mut recv_counter = 0;
    let mut co_tx = Vec::new();
    let mut co_rx = Vec::new();
    const ROUND: usize = 1000;

    let _tx: MTx<mpmc::Array<usize>> = tx.clone().into_blocking();
    co_tx.push(tokio::task::spawn_blocking(move || {
        for i in 0..ROUND {
            _tx.send(i).expect("send ok");
        }
    }));
    co_tx.push(tokio::spawn(async move {
        for i in 0..ROUND {
            tx.send(i).await.expect("send ok");
        }
    }));
    let _rx: MRx<mpmc::Array<usize>> = rx.clone().into_blocking();
    co_rx.push(tokio::task::spawn_blocking(move || {
        let mut count: usize = 0;
        'A: loop {
            match _rx.recv() {
                Ok(_i) => {
                    count += 1;
                }
                Err(_) => break 'A,
            }
        }
        count
    }));
    co_rx.push(tokio::spawn(async move {
        let mut count: usize = 0;
        'A: loop {
            match rx.recv().await {
                Ok(_i) => {
                    count += 1;
                }
                Err(_) => break 'A,
            }
        }
        count
    }));
    for th in co_tx {
        let _ = th.await.unwrap();
    }
    for th in co_rx {
        recv_counter += th.await.unwrap();
    }
    assert_eq!(recv_counter, ROUND * 2);
}

Modules§

compat
Compatibility layer for the v2.0 API
flavor
lockless queue implementation and channel flavor traits
mpmc
Multiple producers, multiple consumers.
mpsc
Multiple producers, single consumer.
null
A null flavor type used to notify a thread/future to close
oneshot
OneShot channel that supports both threads and async
select
Selection between channels
sink
spsc
Single producer, single consumer.
stream
waitgroup
A WaitGroup implementation that allows a custom threshold (>=0) and works in blocking & async contexts.

Macros§

tokio_task_id
logging macro for development under tokio
trace_log
logging macro for development

Structs§

AsyncRx
A single consumer (receiver) that works in an async context.
AsyncTx
A single producer (sender) that works in an async context.
ChannelShared
MAsyncRx
A multi-consumer (receiver) that works in an async context.
MAsyncTx
A multi-producer (sender) that works in an async context.
MRx
A multi-consumer (receiver) that works in a blocking context.
MTx
A multi-producer (sender) that works in a blocking context.
ReadyTimeoutError
An error returned from the ready_timeout method.
RecvError
An error returned from the recv method.
RecvFuture
A fixed-sized future object constructed by AsyncRx::recv()
RecvTimeoutFuture
A fixed-sized future object constructed by AsyncRx::recv_timeout()
Rx
A single consumer (receiver) that works in a blocking context.
SelectTimeoutError
An error returned from the select_timeout method.
SendError
An error returned from the send method.
SendFuture
A fixed-sized future object constructed by AsyncTx::send()
SendTimeoutFuture
A fixed-sized future object constructed by AsyncTx::send_timeout()
TryReadyError
An error returned from the try_ready method.
Tx
A single producer (sender) that works in a blocking context.

Enums§

RecvTimeoutError
An error returned from the recv_timeout method.
SendTimeoutError
An error returned from the send_timeout method.
TryRecvError
An error returned from the try_recv method.
TrySendError
An error returned from the try_send method.

Traits§

AsyncRxTrait
For writing generic code with MAsyncRx & AsyncRx
AsyncTxTrait
For writing generic code with MAsyncTx & AsyncTx
BlockingRxTrait
For writing generic code with MRx & Rx
BlockingTxTrait
For writing generic code with MTx & Tx
NotCloneable
ReceiverType
type limiter for channel builder
SenderType
type limiter for channel builder

Functions§

detect_backoff_cfg
Detects the number of CPUs and automatically sets the backoff config.