Skip to content

leiysky/stdexec-rs

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

9 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

stdexec-rs

A Rust port of C++ stdexec (P2300) — the Senders/Receivers model for structured concurrency.

This project was written entirely by Claude (Anthropic).

Overview

stdexec-rs implements the core abstractions from the C++ Senders/Receivers proposal:

  • Sender — A lazy, composable description of asynchronous work.
  • Receiver — Handles completion signals: value, error, or stopped.
  • OperationState — The result of connecting a sender to a receiver; calling start() initiates execution.
  • Scheduler — A handle to an execution context that can schedule work.

Senders are not executed until they are connected to a receiver and the resulting operation state is started. This makes them fundamentally different from futures: they are lazy, composable, and cancellation-aware by design.

Quick Start

use stdexec_rs::{just, sync_wait, SenderExt};

fn main() {
    let work = just(42)
        .then(|x| x * 2)
        .then(|x| format!("result = {x}"));

    let result = sync_wait(work).unwrap();
    assert_eq!(result, "result = 84");
}

With tokio:

use stdexec_rs::{just, sync_wait, transfer_just, when_all, SenderExt, TokioScheduler};

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    let sched = TokioScheduler::new(rt.handle().clone());

    // Parallel work on tokio threads
    let work = when_all(
        transfer_just(sched.clone(), 1).then(|x| x + 1),
        transfer_just(sched, 10).then(|x| x * 2),
    )
    .then(|(a, b)| a + b);

    let result = sync_wait(work).unwrap();
    assert_eq!(result, 22);
}

Sender Algorithms

Algorithm Description
just(v) Produce a value immediately
.then(f) Transform the value
.let_value(f) Chain a dynamically created sender
.let_error(f) Handle errors by returning a sender
.let_stopped(f) Handle stopped by returning a sender
.transfer(sch) Continue on another execution context
on(sch, s) Start a sender on a given context
when_all(s1, s2) Wait for both senders (with cancellation)
when_all!(a, b, c, ...) Variadic when_all producing flat tuples
.stopped_as_optional() Convert stopped to None
.stopped_as_error(e) Convert stopped to an error
.bulk(range, f) Bulk sequential execution
split(s) Share a sender result among multiple consumers
ensure_started(s) Eagerly start, deliver result later
.into_boxed() Type-erase a sender
transfer_just(sch, v) Convenience: just(v).transfer(sch)
schedule_from(sch, s) Run sender then transfer to scheduler

Async Combinators

These require an AsyncScheduler (e.g. TokioScheduler) to spawn futures:

Algorithm Description
from_future(sch, fut) Create a sender from a Future
from_result_future(sch, fut) Create a sender from a fallible Future
.async_then(sch, f) Async value transformation
.let_async_value(sch, f) Async dynamic sender creation
.let_async_error(sch, f) Async error recovery

Dynamic Graph

Build type-erased computation DAGs at runtime. Values are Arc<dyn Any + Send + Sync>, enabling heterogeneous node types and automatic fan-out via split.

API Description
DynGraph Runtime-assembled computation DAG
join_all(senders) N-way fan-in for type-erased senders
dyn_just(v) Source sender from a typed value
dyn_node(f) Create a node function from a closure
dyn_value(v) Wrap a typed value into DynValue
downcast::<T>(v) Recover a typed value from DynValue
use stdexec_rs::{DynGraph, dyn_just, dyn_node, dyn_value, downcast, sync_wait};

let mut g = DynGraph::new();
let a = g.add_source(dyn_just(10i32));
let b = g.add_node(&[a], dyn_node(|i| {
    let x: i32 = downcast(i[0].clone());
    dyn_value(x * 2) // 20
}));
let c = g.add_node(&[a], dyn_node(|i| {
    let x: i32 = downcast(i[0].clone());
    dyn_value(x + 5) // 15
}));
let d = g.add_node(&[b, c], dyn_node(|i| {
    let x: i32 = downcast(i[0].clone());
    let y: i32 = downcast(i[1].clone());
    dyn_value(x + y) // 35
}));

let result = sync_wait(g.build(d)).unwrap();
assert_eq!(downcast::<i32>(result), 35);

Consumers

Consumer Description
sync_wait(s) Block until the sender completes
start_detached(s) Fire and forget
into_future(s) Convert a sender into a Future

Schedulers

Scheduler Description
InlineScheduler Runs work on the current thread
TokioScheduler Dispatches work onto a tokio runtime

TokioScheduler also implements TimedScheduler (for schedule_after) and AsyncScheduler (for spawning futures).

Cancellation

StopSource / StopToken provide cooperative cancellation. when_all automatically creates a stop source so that an error in one branch can signal cancellation to the other.

Error Handling

Errors propagate through sender chains as Box<dyn Error + Send>. User closures in then, let_value, bulk, etc. are wrapped with catch_unwind, so panics are converted to errors rather than crashing worker threads.

use stdexec_rs::{just, sync_wait, transfer_just, SenderExt, TokioScheduler};

let work = transfer_just(sched, ())
    .then(|_| -> i32 {
        panic!("something went wrong");
    })
    .let_error(|err| {
        println!("caught: {err}");
        just(0) // recover with default
    });

let result = sync_wait(work).unwrap();
assert_eq!(result, 0);

Core Traits

pub trait Receiver: Send + 'static {
    type Value: Send + 'static;
    fn set_value(self, value: Self::Value);
    fn set_error(self, error: Box<dyn Error + Send>);
    fn set_stopped(self);
}

pub trait OperationState {
    fn start(&mut self);
}

pub trait Sender: Send + Sized + 'static {
    type Value: Send + 'static;
    type Operation<R: Receiver<Value = Self::Value>>: OperationState;
    fn connect<R: Receiver<Value = Self::Value>>(self, receiver: R) -> Self::Operation<R>;
}

pub trait Scheduler: Clone + Send + 'static {
    type Sender: Sender<Value = ()>;
    fn schedule(&self) -> Self::Sender;
}

Examples

cargo run --example basic              # Sync sender composition
cargo run --example tokio_runtime      # All features with TokioScheduler
cargo run --example pipeline           # ETL-style data processing
cargo run --example parallel_compute   # Monte Carlo Pi, matrix multiply, word count
cargo run --example error_recovery     # Fallback chains, retry patterns
cargo run --example timeout            # Delayed execution, periodic sampling
cargo run --example async_integration  # Async combinators and mixed pipelines
cargo run --example dynamic_graph     # Type-erased runtime computation DAGs

License

This project is licensed under the MIT License.

About

Rust port of C++ stdexec

Resources

License

Stars

Watchers

Forks

Packages

 
 
 

Contributors

Languages