6 releases
Uses new Rust 2024
| Version | Released |
|---|---|
| 0.2.1 (new) | Feb 1, 2026 |
| 0.2.0 | Jan 24, 2026 |
| 0.1.2 | Jan 24, 2026 |
| 0.0.1 | Jan 19, 2026 |
#218 in Game dev
Used in rx_bevy
475KB, 9K SLoC
rx_core
A runtime agnostic implementation of Reactive Extensions for Rust!
[!IMPORTANT] Currently this crate does not provide an async executor! It was primarily developed to be used in the Bevy game engine, through rx_bevy. However, I do want to add additional executors in the future.
Documentation
- To learn more about this crate, visit https://alexaegis.github.io/rx_bevy/
- To learn more about Rx in general, visit the ReactiveX Website!
What makes it different?
- Runtime agnostic implementation.
- Heavy use of GATs to avoid dynamic dispatch and function calls wherever possible, enabling inlining and optimizations by the compiler.
- Deadlock free execution.
You could even create a subject that subscribes to itself and sends events on every single value it observes, creating a fractal of subscriptions even on a single thread. But please don't.
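To illustrate the point about GATs, here is a minimal sketch of how a generic associated type lets an operator name the concrete subscriber type it produces for any destination, so a whole chain stays statically dispatched. The `Observer`, `Operator`, `Map`, `MapSubscriber`, and `Collect` items below are hypothetical stand-ins written for this example; they are not the definitions found in rx_core_common.

```rust
use std::marker::PhantomData;

// Hypothetical traits for illustration only, NOT the rx_core_common definitions.
trait Observer {
    type In;
    fn next(&mut self, value: Self::In);
}

/// An operator wraps any destination in a concrete subscriber type.
/// The generic associated type makes that type depend on the destination,
/// so no `Box<dyn Observer>` is needed anywhere in the chain.
trait Operator {
    type In;
    type Out;
    type Subscriber<D: Observer<In = Self::Out>>: Observer<In = Self::In>;

    fn wrap<D: Observer<In = Self::Out>>(self, destination: D) -> Self::Subscriber<D>;
}

struct Map<F, A, B> {
    f: F,
    _types: PhantomData<fn(A) -> B>,
}

impl<F, A, B> Map<F, A, B>
where
    F: FnMut(A) -> B,
{
    fn new(f: F) -> Self {
        Map { f, _types: PhantomData }
    }
}

struct MapSubscriber<F, A, B, D> {
    f: F,
    destination: D,
    _types: PhantomData<fn(A) -> B>,
}

impl<F, A, B, D> Observer for MapSubscriber<F, A, B, D>
where
    F: FnMut(A) -> B,
    D: Observer<In = B>,
{
    type In = A;

    fn next(&mut self, value: A) {
        let mapped = (self.f)(value);
        self.destination.next(mapped);
    }
}

impl<F, A, B> Operator for Map<F, A, B>
where
    F: FnMut(A) -> B,
{
    type In = A;
    type Out = B;
    type Subscriber<D: Observer<In = B>> = MapSubscriber<F, A, B, D>;

    fn wrap<D: Observer<In = B>>(self, destination: D) -> Self::Subscriber<D> {
        MapSubscriber { f: self.f, destination, _types: PhantomData }
    }
}

/// A destination that simply stores everything it receives.
struct Collect<T> {
    values: Vec<T>,
}

impl<T> Observer for Collect<T> {
    type In = T;

    fn next(&mut self, value: T) {
        self.values.push(value);
    }
}

/// Doubles each input, then labels it. The chain is built from the inside out.
fn doubled_then_labelled(input: &[i32]) -> Vec<String> {
    let double = Map::new(|n: i32| n * 2);
    let label = Map::new(|n: i32| format!("#{n}"));
    let mut chain = double.wrap(label.wrap(Collect { values: Vec::new() }));
    for &n in input {
        chain.next(n);
    }
    chain.destination.destination.values
}
```

Calling `doubled_then_labelled(&[1, 2, 3])` pushes each value through both closures without a single trait object, because the type of `chain` is fully known at compile time.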
Contents
rx_core is an extensible framework. The rx_core_common crate provides the
common types and traits used by all other crates.
It defines what an Observable, Observer, Subscription, Subject, Operator, Subscriber, and Scheduler are, how Operators (and ComposableOperators) are piped together, and how Subscriptions and Subscribers avoid deadlocks in single-threaded situations by deferring notifications.
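The idea of deferring notifications can be sketched with the standard library alone. The `DeferringSubject` below is a hypothetical, heavily simplified type written for this example (it only handles `u32` next values and is not how rx_core implements its subjects). A re-entrant `next` call made from inside a subscriber is queued rather than delivered immediately, so a subscriber can feed values back into the subject it listens to without a double borrow or a deadlock.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

type Callback = Box<dyn FnMut(u32)>;

#[derive(Default)]
struct Inner {
    subscribers: Vec<Callback>,
    queue: VecDeque<u32>,
    draining: bool,
}

// Hypothetical single-threaded subject, for illustration only.
#[derive(Clone, Default)]
struct DeferringSubject {
    inner: Rc<RefCell<Inner>>,
}

impl DeferringSubject {
    fn subscribe(&self, callback: impl FnMut(u32) + 'static) {
        self.inner.borrow_mut().subscribers.push(Box::new(callback));
    }

    fn next(&self, value: u32) {
        {
            let mut inner = self.inner.borrow_mut();
            inner.queue.push_back(value);
            if inner.draining {
                // Re-entrant call: the outer `next` will deliver this value later.
                return;
            }
            inner.draining = true;
        }
        loop {
            let mut inner = self.inner.borrow_mut();
            let Some(value) = inner.queue.pop_front() else {
                inner.draining = false;
                return;
            };
            // Move the callbacks out so no borrow is held while they run.
            let mut subscribers = std::mem::take(&mut inner.subscribers);
            drop(inner);

            for callback in subscribers.iter_mut() {
                callback(value);
            }

            // Put the callbacks back, keeping any that were added meanwhile.
            let mut inner = self.inner.borrow_mut();
            let added = std::mem::replace(&mut inner.subscribers, subscribers);
            inner.subscribers.extend(added);
        }
    }
}

/// One subscriber records values, another emits `v + 1` back into the subject.
fn feedback_demo() -> Vec<u32> {
    let subject = DeferringSubject::default();
    let seen = Rc::new(RefCell::new(Vec::new()));

    let log = Rc::clone(&seen);
    subject.subscribe(move |v| log.borrow_mut().push(v));

    // Note: this closure keeps the subject alive (an Rc cycle). A real
    // implementation would hand out a subscription to tear this down.
    let feedback = subject.clone();
    subject.subscribe(move |v| {
        if v < 3 {
            feedback.next(v + 1);
        }
    });

    subject.next(1);
    let result = seen.borrow().clone();
    result
}
```

In `feedback_demo`, the single external call `subject.next(1)` results in the values 1, 2 and 3 being observed in order, each delivered only after the previous delivery has finished.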
Observables
Observables define a stream of emissions that is instantiated upon subscription.
- Creation:
- CreateObservable - Define your own function that will interact with the subscriber!
- DeferredObservable - Subscribes to an observable returned by a function.
- Immediate Observables:
- JustObservable - Immediately emits a single value!
- EmptyObservable - Immediately completes!
- ThrowObservable - Immediately errors!
- ClosedObservable - Immediately unsubscribes!
- Miscellaneous Observables:
- NeverObservable - Never emits, never unsubscribes! Only once dropped! Warning: you need to handle subscriptions made to this yourself!
- Combination (Multi-Signal):
- CombineChangesObservable - Subscribes to two different observables and emits the latest of both values when either of them emits. It denotes which one has changed, and it emits even when one of them hasn't emitted yet.
- CombineLatestObservable - Subscribes to two observables and emits the latest of both values when either of them emits. It only starts emitting once both have emitted at least once.
- ZipObservable - Subscribes to two different observables and emits both values once both of them have emitted, pairing up emissions in the order they happened.
- JoinObservable - Subscribes to two different observables and emits the latest of both values once both of them have completed!
- Combination (Single-Signal):
- MergeObservable - Combine many observables of the same output type into a single observable, subscribing to all of them at once!
- ConcatObservable - Combine many observables of the same output type into a single observable, subscribing to them one-by-one in order!
- Timing:
- TimerObservable - Emit a () once the timer elapses!
- IntervalObservable - Emit a sequence of usize values every time the Duration of the interval rolls over.
- Iterators:
- IteratorObservable - Emits the values of an iterator immediately when subscribed to.
- IteratorOnTickObservable - Emits the values of an iterator once per every tick of the scheduler.
- Connectable:
- ConnectableObservable - Maintains an internal connector subject that subscribes to a source observable only when the connect function is called on it. Subscribers of the ConnectableObservable subscribe to this internal connector.
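The difference between the CombineLatest and Zip pairing rules described above is easiest to see on a concrete interleaving. The sketch below does not use rx_core at all: it models two sources as a single slice of hypothetical `Event` values in arrival order and applies each rule with plain standard-library code.

```rust
use std::collections::VecDeque;

// Hypothetical model of two interleaved sources, for illustration only.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Event {
    Left(u32),
    Right(char),
}

/// Emits the latest pair whenever either side emits, once both have emitted.
fn combine_latest(events: &[Event]) -> Vec<(u32, char)> {
    let mut left = None;
    let mut right = None;
    let mut out = Vec::new();
    for &event in events {
        match event {
            Event::Left(v) => left = Some(v),
            Event::Right(v) => right = Some(v),
        }
        if let (Some(l), Some(r)) = (left, right) {
            out.push((l, r));
        }
    }
    out
}

/// Pairs the n-th left value with the n-th right value.
fn zip_pairs(events: &[Event]) -> Vec<(u32, char)> {
    let mut lefts = VecDeque::new();
    let mut rights = VecDeque::new();
    let mut out = Vec::new();
    for &event in events {
        match event {
            Event::Left(v) => lefts.push_back(v),
            Event::Right(v) => rights.push_back(v),
        }
        if let (Some(l), Some(r)) = (lefts.front().copied(), rights.front().copied()) {
            lefts.pop_front();
            rights.pop_front();
            out.push((l, r));
        }
    }
    out
}
```

For the arrival order `Left(1), Left(2), Right('a'), Left(3), Right('b')`, the combine-latest rule yields `(2, 'a'), (3, 'a'), (3, 'b')`, while the zip rule yields `(1, 'a'), (2, 'b')` and leaves `3` waiting for a partner.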
Observers
Observers are the destinations of subscriptions! They are the last stations of a signal.
- PrintObserver - A simple observer that prints all signals to the console using println!.
- FnObserver - A custom observer that uses user-supplied functions to handle signals. All signal handlers must be defined up-front.
- DynFnObserver - A custom observer that uses user-supplied functions to handle signals. Not all signal handlers have to be defined, but it will panic if it observes an error without an error handler defined.
- NoopObserver - Ignores all signals. Will panic in debug mode if it observes an error.
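As a rough picture of the "optional handlers" behaviour described for DynFnObserver, here is a small standalone sketch. `OptionalHandlers` is a hypothetical type invented for this example, fixed to `i32` values and `String` errors; the real observer's constructor and method names may differ.

```rust
// Hypothetical observer with optional handlers, for illustration only.
#[derive(Default)]
struct OptionalHandlers<'a> {
    on_next: Option<Box<dyn FnMut(i32) + 'a>>,
    on_error: Option<Box<dyn FnMut(String) + 'a>>,
    on_complete: Option<Box<dyn FnMut() + 'a>>,
}

impl<'a> OptionalHandlers<'a> {
    fn next(&mut self, value: i32) {
        if let Some(handler) = self.on_next.as_mut() {
            handler(value);
        }
    }

    fn error(&mut self, error: String) {
        match self.on_error.as_mut() {
            Some(handler) => handler(error),
            // Mirrors the documented behaviour: an unhandled error panics.
            None => panic!("unhandled error: {error}"),
        }
    }

    fn complete(&mut self) {
        if let Some(handler) = self.on_complete.as_mut() {
            handler();
        }
    }
}

/// Only `on_next` and `on_complete` are supplied; `on_error` stays `None`.
fn sum_until_complete(values: &[i32]) -> (i32, bool) {
    let mut sum = 0;
    let mut completed = false;
    {
        let mut observer = OptionalHandlers {
            on_next: Some(Box::new(|v: i32| sum += v)),
            on_complete: Some(Box::new(|| completed = true)),
            ..Default::default()
        };
        for &v in values {
            observer.next(v);
        }
        observer.complete();
    }
    (sum, completed)
}
```

An observer where every handler is a required generic parameter (the FnObserver style) trades this flexibility for static dispatch and no possibility of a missing error handler.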
Subjects
Subjects are both Observers and Observables at the same time. Subjects multicast the signals they observe across all subscribers.
- PublishSubject - Observed signals are forwarded to all active subscribers. It does not replay values to late subscribers, but terminal state (complete/error) is always replayed! Other subjects are built on top of this.
- BehaviorSubject - Always holds a value that is replayed to late subscribers.
- ReplaySubject - Buffers the last N values and replays them to late subscribers.
- AsyncSubject - Reduces observed values into one and emits it to active subscribers once completed. Once completed, it also replays the result to late subscribers.
- ProvenanceSubject - A BehaviorSubject that also stores an additional value that can be used for filtering. Useful to track the origin of a value as some subscribers may only be interested in certain origins while some are interested in all values regardless of origin.
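The replay behaviour that separates these subjects can be sketched in a few lines. `ReplayBuffer` below is a hypothetical toy written for this example: subscribers are just shared `Vec`s, and terminal signals are left out entirely. It is not the ReplaySubject implementation from this crate.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

// Hypothetical toy subject, for illustration only.
struct ReplayBuffer<T: Clone> {
    capacity: usize,
    buffer: VecDeque<T>,
    subscribers: Vec<Rc<RefCell<Vec<T>>>>,
}

impl<T: Clone> ReplayBuffer<T> {
    fn new(capacity: usize) -> Self {
        ReplayBuffer {
            capacity,
            buffer: VecDeque::new(),
            subscribers: Vec::new(),
        }
    }

    /// Multicast the value to every subscriber, then remember it for replay.
    fn next(&mut self, value: T) {
        for sink in &self.subscribers {
            sink.borrow_mut().push(value.clone());
        }
        self.buffer.push_back(value);
        if self.buffer.len() > self.capacity {
            self.buffer.pop_front();
        }
    }

    /// A new subscriber starts out with the buffered values already delivered.
    fn subscribe(&mut self) -> Rc<RefCell<Vec<T>>> {
        let replayed: Vec<T> = self.buffer.iter().cloned().collect();
        let sink = Rc::new(RefCell::new(replayed));
        self.subscribers.push(Rc::clone(&sink));
        sink
    }
}

/// Returns what an early and a late subscriber have seen.
fn replay_demo() -> (Vec<u32>, Vec<u32>) {
    let mut subject = ReplayBuffer::new(2);
    let early = subject.subscribe();
    subject.next(1);
    subject.next(2);
    subject.next(3);
    let late = subject.subscribe();
    subject.next(4);

    let early_seen = early.borrow().clone();
    let late_seen = late.borrow().clone();
    (early_seen, late_seen)
}
```

With a capacity of 2, the early subscriber sees every value while the late one starts with the two most recent values. In this toy model, a capacity of 0 replays nothing to late subscribers, which is the no-replay behaviour described for PublishSubject.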
Operators
Operators take an observable as input and return a new observable as output, enhancing the original observable with new behavior.
- Mapping:
- MapOperator - Transform each value with a mapping function.
- MapIntoOperator - Map each value using Into.
- MapErrorOperator - Transform error values into another error value.
- MapNeverOperator - Re-type Never next/error channels into concrete types, as they are always unreachable!().
- MaterializeOperator - Turn next/error/complete into notification values, rendering terminal signals ineffective.
- DematerializeOperator - Convert notifications back into real signals.
- EnumerateOperator - Attach a running index to each emission.
- PairwiseOperator - Emit the previous and current values together.
- Filtering Operators (Multi-Signal):
- FilterOperator - Keep values that satisfy a predicate.
- FilterMapOperator - Map values to an Option and keep only the Some values.
- TakeOperator - Emit only the first n values, then complete.
- SkipOperator - Drop the first n values.
- LiftOptionOperator - Filter out None and forward Some values.
- Filtering Operators (Single-Signal):
- FirstOperator - Emit the very first value, then complete.
- FindOperator - Emit the first value matching a predicate, then complete.
- FindIndexOperator - Emit the index of the first matching value, then complete.
- ElementAtOperator - Emit the value at the given index, then complete.
- IsEmptyOperator - Emit a single boolean indicating whether the source emitted anything before it completed.
- Higher-Order (Flatten Observable Observables):
- ConcatAllOperator - Subscribes to all upstream observables one at a time in order.
- MergeAllOperator - Subscribes to all upstream observables and merges their emissions concurrently.
- SwitchAllOperator - Subscribes to the latest upstream observable, unsubscribing from previous ones.
- ExhaustAllOperator - Subscribe to the upstream observables only if there is no active subscription.
- Higher-Order (Mapper):
- ConcatMapOperator - Maps upstream signals into an observable, then subscribes to them one at a time in order.
- MergeMapOperator - Maps upstream signals into an observable, then subscribes to them and merges their emissions concurrently.
- SwitchMapOperator - Maps upstream signals into an observable, then subscribes to the latest one, unsubscribing previous ones.
- ExhaustMapOperator - Maps upstream signals into an observable, then subscribes to them only if there is no active subscription.
- Combination:
- WithLatestFromOperator - Combine each source emission with the latest value from another observable.
- Buffering:
- BufferCountOperator - Collect values into fixed-size buffers before emitting them.
- Multicasting:
- ShareOperator - Multicast a source through a connector so downstream subscribers share one upstream subscription. The connector can be any subject.
- Accumulator (Multi-Signal):
- ScanOperator - Accumulate state and emit every intermediate result.
- Accumulator (Single-Signal):
- CountOperator - Count values emitted by upstream.
- ReduceOperator - Fold values and emit only the final accumulator on completion.
- Side-Effects:
- TapOperator - Mirror values into another observer while letting them pass through.
- TapNextOperator - Run a callback for each next without touching errors or completion.
- OnNextOperator - Invoke a callback for each value that can also decide whether to forward it.
- OnSubscribeOperator - Run a callback when a subscription is established.
- FinalizeOperator - Execute cleanup when the observable finishes or unsubscribes.
- Producing:
- StartWithOperator - Emit a value first when subscribing to the source.
- EndWithOperator - Emit a value on completion.
- Error Handling:
- CatchOperator - On error, switch to a recovery observable.
- RetryOperator - Resubscribe on error up to the configured retry count.
- IntoResultOperator - Capture next/error signals as Result values.
- LiftResultOperator - Split Result values into next and error signals.
- ErrorBoundaryOperator - Enforce Never as the error type to guard pipelines at compile time.
- Timing Operators:
- AdsrOperator - Convert trigger signals into an ADSR envelope driven by the scheduler.
- DebounceTimeOperator - Emit the most recent value after a period of silence.
- DelayOperator - Shift emissions forward in time using the scheduler.
- FallbackWhenSilentOperator - Emit a fallback value on ticks where the source stayed silent.
- ObserveOnOperator - Re-emit upstream signals with the provided scheduler.
- SubscribeOnOperator - Schedule upstream subscription on the provided scheduler.
- ThrottleTimeOperator - Limit the frequency of downstream emissions.
- Composite Operators:
- CompositeOperator - Build reusable operator chains without needing a source observable!
- IdentityOperator - A no-op operator, used mainly as the entry point of a CompositeOperator.
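For readers coming from iterators, a few of the operators above have close pull-based cousins in the standard library. The sketch below uses only `std` iterator and slice methods as an analogy for Scan, Reduce and Pairwise. It does not call any rx_core API, and the push-based operators may differ in details such as seeding.

```rust
/// Like a scan: emits every intermediate running total.
fn scan_sum(values: &[i32]) -> Vec<i32> {
    values
        .iter()
        .scan(0, |total, &v| {
            *total += v;
            Some(*total)
        })
        .collect()
}

/// Like a reduce: only the final accumulator survives.
fn reduce_sum(values: &[i32]) -> i32 {
    values.iter().fold(0, |total, &v| total + v)
}

/// Like pairwise: each value together with the one before it.
fn pairwise(values: &[i32]) -> Vec<(i32, i32)> {
    values.windows(2).map(|pair| (pair[0], pair[1])).collect()
}
```

For the input `[1, 2, 3, 4]`, the scan analogy produces four running totals, the reduce analogy produces a single total, and the pairwise analogy produces three overlapping pairs. The single-signal versus multi-signal grouping in the list above follows the same split.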
Macros
For every primitive, there is a derive macro available to ease implementation.
They mostly implement traits defining associated types like Out and
OutError. They may also provide default, trivial implementations where
applicable.
See the individual macros for more information:
- RxExecutor - Derive macro for Executors.
- RxObservable - Derive macro for Observables.
- RxObserver - Derive macro for RxObservers.
- RxOperator - Derive macro for Operators.
- RxScheduler - Derive macro for Schedulers.
- RxSubject - Derive macro for Subjects.
- RxSubscriber - Derive macro for Subscribers.
- RxSubscription - Derive macro for Subscriptions.
Testing
The rx_core_testing crate provides utilities to test your Observables and
Operators.
- MockExecutor & Scheduler - Control the passage of time manually.
- MockObserver & NotificationCollector - Collect all observed notifications and perform assertions over them.
- TestHarness - Perform more complex assertions to ensure proper behavior.
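To give a feel for what "controlling the passage of time manually" means, here is a standalone sketch of a manually advanced clock. `ManualClock` and its methods are hypothetical names made up for this example; the actual MockExecutor and Scheduler in rx_core_testing will have their own API.

```rust
use std::time::Duration;

// Hypothetical manually driven clock, for illustration only.
#[derive(Default)]
struct ManualClock {
    now: Duration,
    timers: Vec<(Duration, &'static str)>,
    fired: Vec<&'static str>,
}

impl ManualClock {
    /// Register a labelled timer that is due `delay` after the current time.
    fn schedule(&mut self, delay: Duration, label: &'static str) {
        self.timers.push((self.now + delay, label));
    }

    /// Move time forward and fire every timer that became due, earliest first.
    fn advance(&mut self, by: Duration) {
        self.now += by;
        let now = self.now;
        let (mut due, pending): (Vec<_>, Vec<_>) = std::mem::take(&mut self.timers)
            .into_iter()
            .partition(|(deadline, _)| *deadline <= now);
        due.sort_by_key(|(deadline, _)| *deadline);
        self.fired.extend(due.into_iter().map(|(_, label)| label));
        self.timers = pending;
    }
}

/// Returns the fired labels after the first and after the second advance.
fn timers_fire_in_order() -> (Vec<&'static str>, Vec<&'static str>) {
    let mut clock = ManualClock::default();
    clock.schedule(Duration::from_millis(300), "slow");
    clock.schedule(Duration::from_millis(100), "fast");

    clock.advance(Duration::from_millis(150));
    let after_first = clock.fired.clone();

    clock.advance(Duration::from_millis(150));
    let after_second = clock.fired.clone();

    (after_first, after_second)
}
```

Because nothing here reads the wall clock, a test built this way is deterministic: the "fast" timer fires during the first advance and the "slow" one only once the second advance reaches its deadline.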
For Maintainers
See contributing.md
Dependencies
~0.3–6MB
~120K SLoC
