6 releases

Uses new Rust 2024

new 0.2.1 Feb 1, 2026
0.2.0 Jan 24, 2026
0.1.2 Jan 24, 2026
0.0.1 Jan 19, 2026

#2806 in Game dev


Used in 4 crates (3 directly)

MIT license

215KB
5.5K SLoC

observable_connectable


A connectable observable maintains an internal connector subject and subscribes to the source only when connect is called, so that multiple subscribers share a single upstream connection.
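
To make the mechanism concrete, here is a minimal sketch in plain std Rust. It does not use the rx_core API: MiniConnectable, its closure-based source, and run_demo are hypothetical names invented for illustration. It only shows the two properties described above, namely that subscribing does not touch the source and that one connect call feeds every subscriber.

```rust
use std::cell::{Cell, RefCell};
use std::rc::Rc;

/// Hypothetical minimal connectable (NOT the rx_core type).
struct MiniConnectable {
    // The "source": invoked once per connect, emits values through the callback.
    source: Box<dyn Fn(&mut dyn FnMut(u32))>,
    // The "connector": downstream subscribers that share one connection.
    subscribers: Vec<Box<dyn FnMut(u32)>>,
}

impl MiniConnectable {
    fn new(source: impl Fn(&mut dyn FnMut(u32)) + 'static) -> Self {
        Self { source: Box::new(source), subscribers: Vec::new() }
    }

    /// Registers a subscriber on the connector; the source is not touched.
    fn subscribe(&mut self, observer: impl FnMut(u32) + 'static) {
        self.subscribers.push(Box::new(observer));
    }

    /// Subscribes the source once and fans each value out to every subscriber.
    fn connect(&mut self) {
        let subscribers = &mut self.subscribers;
        let mut fan_out = |value: u32| {
            for subscriber in subscribers.iter_mut() {
                (*subscriber)(value);
            }
        };
        (self.source)(&mut fan_out);
    }
}

/// Returns how often the source was subscribed and what the observers saw.
fn run_demo() -> (u32, Vec<String>) {
    let source_subscriptions = Rc::new(Cell::new(0u32));
    let log: Rc<RefCell<Vec<String>>> = Rc::default();

    let counter = Rc::clone(&source_subscriptions);
    let mut connectable = MiniConnectable::new(move |emit| {
        counter.set(counter.get() + 1); // count upstream subscriptions
        emit(1);
        emit(2);
    });

    for name in ["a", "b"] {
        let log = Rc::clone(&log);
        connectable.subscribe(move |v| {
            log.borrow_mut().push(format!("{name} got {v}"));
        });
    }

    // Subscribing alone has not subscribed the source.
    assert_eq!(source_subscriptions.get(), 0);

    connectable.connect();
    let entries = log.borrow().clone();
    (source_subscriptions.get(), entries)
}

fn main() {
    let (subscriptions, entries) = run_demo();
    println!("source subscribed {subscriptions} time(s)");
    for entry in entries {
        println!("{entry}");
    }
}
```

The sketch leaves out everything the real type adds on top, such as disconnecting, reference counting, and resetting the connector.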

See Also

  • ShareOperator - Multicast a source through a connector so downstream subscribers share one upstream subscription. The connector can be any subject.

Example

cargo run -p rx_core --example observable_connectable_example

let mut source = PublishSubject::<usize>::default();
let mut connectable = ConnectableObservable::new(
  source.clone().finalize(|| println!("disconnected...")),
  ConnectableOptions {
    // The connector replays the latest value to late subscribers.
    connector_provider: ProvideWithDefault::<ReplaySubject<1, _>>::default(),
    disconnect_when_ref_count_zero: true,
    reset_connector_on_disconnect: false,
    reset_connector_on_complete: false,
    reset_connector_on_error: false,
  },
);
// Subscribing alone does not subscribe the source, so 0 is never delivered.
let mut _subscription_0 = connectable.subscribe(PrintObserver::new("connectable_observable 0"));
source.next(0);

println!("connect!");
let _connection = connectable.connect();
source.next(1);
connectable.disconnect();

// The connector is not reset on disconnect, so this late subscriber receives the replayed 1.
let mut _subscription_1 = connectable.subscribe(PrintObserver::new("connectable_observable 1"));
println!("connect again!");
connectable.connect();
source.next(2);

println!("end");

Output:

connect!
connectable_observable 0 - next: 1
disconnected...
connectable_observable 1 - next: 1
connect again!
connectable_observable 1 - next: 2
connectable_observable 0 - next: 2
end
connectable_observable 1 - unsubscribed
disconnected...
connectable_observable 0 - unsubscribed
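
In the output above, the second subscriber receives next: 1 even though it subscribed after the disconnect. That is consistent with the connector being a ReplaySubject<1, _> that is not reset on disconnect. The following sketch illustrates the replay idea in plain std Rust. MiniReplay and replay_demo are hypothetical names, not rx_core types, and the delivery order among observers here is not meant to match the library's.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Hypothetical replay connector that remembers the last `N` values
/// (NOT the rx_core ReplaySubject).
struct MiniReplay<const N: usize> {
    buffer: VecDeque<u32>,
    subscribers: Vec<Box<dyn FnMut(u32)>>,
}

impl<const N: usize> MiniReplay<N> {
    fn new() -> Self {
        Self { buffer: VecDeque::new(), subscribers: Vec::new() }
    }

    /// Forwards a value to current subscribers and buffers it for later ones.
    fn next(&mut self, value: u32) {
        if N > 0 {
            if self.buffer.len() == N {
                self.buffer.pop_front(); // drop the oldest buffered value
            }
            self.buffer.push_back(value);
        }
        for subscriber in self.subscribers.iter_mut() {
            (*subscriber)(value);
        }
    }

    /// Replays the buffered values to the new subscriber, then registers it.
    fn subscribe(&mut self, mut observer: impl FnMut(u32) + 'static) {
        for &value in &self.buffer {
            observer(value);
        }
        self.subscribers.push(Box::new(observer));
    }
}

/// Returns the lines each observer would have printed.
fn replay_demo() -> Vec<String> {
    let log: Rc<RefCell<Vec<String>>> = Rc::default();
    let mut connector = MiniReplay::<1>::new();

    let log0 = Rc::clone(&log);
    connector.subscribe(move |v| {
        log0.borrow_mut().push(format!("observer 0 - next: {v}"));
    });
    connector.next(1);

    // Late subscriber: it immediately receives the buffered 1.
    let log1 = Rc::clone(&log);
    connector.subscribe(move |v| {
        log1.borrow_mut().push(format!("observer 1 - next: {v}"));
    });
    connector.next(2);

    let entries = log.borrow().clone();
    entries
}

fn main() {
    for line in replay_demo() {
        println!("{line}");
    }
}
```

With a buffer size of 0 the sketch behaves like a plain publish-style connector: a late subscriber sees only values emitted after it subscribed.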

Dependencies

~275–750KB
~18K SLoC