#async-concurrency #p2300

stdexec-rs

A Rust port of C++ stdexec (P2300) — Senders/Receivers for structured concurrency

1 unstable release

Uses new Rust 2024

0.1.0 Feb 8, 2026

#606 in Asynchronous

MIT license

77KB
2K SLoC

stdexec-rs

A Rust port of C++ stdexec (P2300) — the Senders/Receivers model for structured concurrency.

This project was written entirely by Claude (Anthropic).

Overview

stdexec-rs implements the core abstractions from the C++ Senders/Receivers proposal:

  • Sender — A lazy, composable description of asynchronous work.
  • Receiver — Handles completion signals: value, error, or stopped.
  • OperationState — The result of connecting a sender to a receiver; calling start() initiates execution.
  • Scheduler — A handle to an execution context that can schedule work.

Senders are not executed until they are connected to a receiver and the resulting operation state is started. This makes them fundamentally different from futures: they are lazy, composable, and cancellation-aware by design.

Quick Start

use stdexec_rs::{just, sync_wait, SenderExt};

fn main() {
    let work = just(42)
        .then(|x| x * 2)
        .then(|x| format!("result = {x}"));

    let result = sync_wait(work).unwrap();
    assert_eq!(result, "result = 84");
}

With tokio:

use stdexec_rs::{just, sync_wait, transfer_just, when_all, SenderExt, TokioScheduler};

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    let sched = TokioScheduler::new(rt.handle().clone());

    // Parallel work on tokio threads
    let work = when_all(
        transfer_just(sched.clone(), 1).then(|x| x + 1),
        transfer_just(sched, 10).then(|x| x * 2),
    )
    .then(|(a, b)| a + b);

    let result = sync_wait(work).unwrap();
    assert_eq!(result, 22);
}

Sender Algorithms

Algorithm Description
just(v) Produce a value immediately
.then(f) Transform the value
.let_value(f) Chain a dynamically created sender
.let_error(f) Handle errors by returning a sender
.let_stopped(f) Handle stopped by returning a sender
.transfer(sch) Continue on another execution context
on(sch, s) Start a sender on a given context
when_all(s1, s2) Wait for both senders (with cancellation)
when_all!(a, b, c, ...) Variadic when_all producing flat tuples
.stopped_as_optional() Convert stopped to None
.stopped_as_error(e) Convert stopped to an error
.bulk(range, f) Bulk sequential execution
split(s) Share a sender result among multiple consumers
ensure_started(s) Eagerly start, deliver result later
.into_boxed() Type-erase a sender
transfer_just(sch, v) Convenience: just(v).transfer(sch)
schedule_from(sch, s) Run sender then transfer to scheduler

Async Combinators

These require an AsyncScheduler (e.g. TokioScheduler) to spawn futures:

Algorithm Description
from_future(sch, fut) Create a sender from a Future
from_result_future(sch, fut) Create a sender from a fallible Future
.async_then(sch, f) Async value transformation
.let_async_value(sch, f) Async dynamic sender creation
.let_async_error(sch, f) Async error recovery

Consumers

Consumer Description
sync_wait(s) Block until the sender completes
start_detached(s) Fire and forget
into_future(s) Convert a sender into a Future

Schedulers

Scheduler Description
InlineScheduler Runs work on the current thread
TokioScheduler Dispatches work onto a tokio runtime

TokioScheduler also implements TimedScheduler (for schedule_after) and AsyncScheduler (for spawning futures).

Cancellation

StopSource / StopToken provide cooperative cancellation. when_all automatically creates a stop source so that an error in one branch can signal cancellation to the other.

Error Handling

Errors propagate through sender chains as Box<dyn Error + Send>. User closures in then, let_value, bulk, etc. are wrapped with catch_unwind, so panics are converted to errors rather than crashing worker threads.

use stdexec_rs::{just, sync_wait, transfer_just, SenderExt, TokioScheduler};

let work = transfer_just(sched, ())
    .then(|_| -> i32 {
        panic!("something went wrong");
    })
    .let_error(|err| {
        println!("caught: {err}");
        just(0) // recover with default
    });

let result = sync_wait(work).unwrap();
assert_eq!(result, 0);

Core Traits

pub trait Receiver: Send + 'static {
    type Value: Send + 'static;
    fn set_value(self, value: Self::Value);
    fn set_error(self, error: Box<dyn Error + Send>);
    fn set_stopped(self);
}

pub trait OperationState {
    fn start(&mut self);
}

pub trait Sender: Send + Sized + 'static {
    type Value: Send + 'static;
    type Operation<R: Receiver<Value = Self::Value>>: OperationState;
    fn connect<R: Receiver<Value = Self::Value>>(self, receiver: R) -> Self::Operation<R>;
}

pub trait Scheduler: Clone + Send + 'static {
    type Sender: Sender<Value = ()>;
    fn schedule(&self) -> Self::Sender;
}

Examples

cargo run --example basic              # Sync sender composition
cargo run --example tokio_runtime      # All features with TokioScheduler
cargo run --example pipeline           # ETL-style data processing
cargo run --example parallel_compute   # Monte Carlo Pi, matrix multiply, word count
cargo run --example error_recovery     # Fallback chains, retry patterns
cargo run --example timeout            # Delayed execution, periodic sampling
cargo run --example async_integration  # Async combinators and mixed pipelines

License

This project is licensed under the MIT License.

Dependencies

~1.9–3MB
~47K SLoC