refactor(subscribe): add shared websocket runtime #201
- add reusable websocket subscription runtime with reconnect and stop handling
- reuse auth URL/header/signer decoration for websocket handshakes
- integrate daemon subscription jobs with internal websocket transport hint
- add auth and daemon tests for websocket frame handling and reconnects
Closes #195
Review for PR #201: refactor(subscribe): add shared websocket runtime
Summary
This PR introduces a reusable WebSocket subscription runtime (
Overall Assessment: ✅ LGTM with minor suggestions
The implementation is well-structured, properly abstracted, and includes good test coverage. The code demonstrates careful attention to error handling, reconnection semantics, and auth reuse.
Key Findings
✅ Strengths
📝 Minor Suggestions
1. Error context propagation (src/subscription_websocket.rs:751)
The
// Current
let (stream, response) = connect_async(request)
    .await
    .map_err(|err| WebSocketRunError::Retry(anyhow!(err)))?;
// Suggested: differentiate DNS vs TLS vs handshake failures
let (stream, response) = connect_async(request)
    .await
    .map_err(|err| {
        // DNS/refused errors should retry fast; TLS/auth errors might be fatal
        WebSocketRunError::Retry(anyhow!(err)
            .context("WebSocket connection failed"))
    })?;
Severity: nit - Current implementation is acceptable.
2. Panic safety in HeaderValue construction (src/subscription_websocket.rs:736-737)
The code uses
// Current
let header_name = HeaderName::from_bytes(name.as_bytes())
    .map_err(|err| WebSocketRunError::Fatal(anyhow!(err)))?;
let header_value = HeaderValue::from_str(&value)
    .map_err(|err| WebSocketRunError::Fatal(anyhow!(err)))?;
// Suggested: Include the problematic header name/value in error context
let header_name = HeaderName::from_bytes(name.as_bytes())
    .map_err(|err| WebSocketRunError::Fatal(anyhow!(err)
        .context(format!("Invalid header name: {}", name))))?;
let header_value = HeaderValue::from_str(&value)
    .map_err(|err| WebSocketRunError::Fatal(anyhow!(err)
        .context(format!("Invalid header value for {}: {}", name, value))))?;
Severity: nit - Debugging would be slightly easier with more context.
3. Test assertion ordering (src/daemon.rs:445-462)
The reconnect test has some timing-dependent assertions that could be more robust:
// Current: waits for value 2, then checks reconnect event
assert!(wait_for_file_contains(&sink_path, r#""value":2"#, StdDuration::from_secs(5)).await,
    "websocket sink did not receive second event");
// Suggested: Check for reconnect event first (it happens before the second data frame)
assert!(wait_for_file_contains(
    &sink_path,
    r#""event_kind":"reconnect""#,
    StdDuration::from_secs(5)
).await, "websocket sink did not record reconnect");
assert!(wait_for_file_contains(&sink_path, r#""value":2"#, StdDuration::from_secs(5)).await,
    "websocket sink did not receive second event after reconnect");
Severity: nit - Current test works but the ordering reflects the actual event sequence better.
4. Unused enum variant (src/subscription_websocket.rs:577-583)
The
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[allow(dead_code)]
pub enum WebSocketHandlerAction {
    Continue,
    Reconnect, // <-- unused in public API
    Stop,
}
Suggestion: Either:
Severity: nit - No functional impact.
No Issues Found
Testing Coverage
The PR includes good test coverage:
Note: One CI check ("Test (stable)") is still running, but the test results from the diff show comprehensive local testing.
Checklist Items from PR Description
Based on the PR checklist:
Recommendations
Conclusion
This is a well-crafted PR that successfully abstracts WebSocket subscription logic into a reusable runtime module. The auth refactoring is clean, the integration is minimal and focused, and the test coverage is comprehensive. The suggestions above are minor and optional.
Recommendation: ✅ Approve after CI passes (waiting for "Test (stable)" to complete)
Review generated by Holon Code Review Bot
Pull request overview
Adds first-class WebSocket support to the subscription runtime by introducing a dedicated WebSocket session loop (with reconnect/backoff) and wiring it into the daemon’s subscription manager via an explicit transport hint, while reusing the existing auth/profile system for WS handshakes.
Changes:
- Introduce subscription_websocket runtime (connect, session loop, observer/handler traits, raw frame handler).
- Extend daemon subscription API with transport_hint and implement a WebSocket-backed subscription job + integration tests.
- Add auth::ResolvedRequestAuth + resolve_profile_request_auth() to apply profile auth to non-reqwest clients (e.g., WS handshake), and add tokio-tungstenite dependency.
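The reconnect/backoff session loop summarized above might look roughly like the following. This is a std-only sketch, not the PR's code: `run_with_reconnect`, `backoff_delay`, the doubling schedule, and the `String` error payloads are assumptions made for illustration (the PR's `WebSocketRunError` wraps `anyhow` errors and its loop is async).

```rust
use std::thread;
use std::time::Duration;

/// Simplified stand-in for the PR's error type; payloads are plain strings here.
#[derive(Debug)]
enum WebSocketRunError {
    /// Transient failure: reconnect after a delay.
    Retry(String),
    /// Permanent failure: stop the subscription.
    Fatal(String),
}

/// Hypothetical exponential backoff: base * 2^attempt, capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    base.saturating_mul(2u32.saturating_pow(attempt)).min(max)
}

/// Runs `session` until it ends cleanly, fails fatally, or exceeds
/// `max_reconnects`. On success, returns how many reconnects were needed.
fn run_with_reconnect<S>(
    mut session: S,
    max_reconnects: u32,
    base: Duration,
    max: Duration,
) -> Result<u32, String>
where
    S: FnMut() -> Result<(), WebSocketRunError>,
{
    let mut reconnects = 0;
    loop {
        match session() {
            Ok(()) => return Ok(reconnects),
            // Fatal errors end the job without another attempt.
            Err(WebSocketRunError::Fatal(msg)) => return Err(format!("fatal: {}", msg)),
            Err(WebSocketRunError::Retry(msg)) => {
                if reconnects >= max_reconnects {
                    return Err(format!("gave up after {} reconnects: {}", reconnects, msg));
                }
                // Wait longer after each consecutive failure.
                thread::sleep(backoff_delay(reconnects, base, max));
                reconnects += 1;
            }
        }
    }
}
```

Passing the session as a closure keeps the retry policy separate from the transport, which is the same split the PR describes between the runtime and its handlers.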
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/subscription_websocket.rs | New reconnecting WebSocket subscription runtime with observer/handler abstractions and raw-frame handler. |
| src/daemon.rs | Adds transport_hint/SubscriptionTransportHint, routes jobs to WS runtime, and adds WS integration tests. |
| src/auth/mod.rs | Adds request-auth resolution for URL+headers and refactors profile auth application to reuse it. |
| src/main.rs | Wires new request field (transport_hint: None) and includes the new module. |
| src/lib.rs | Exposes the new subscription_websocket module from the library crate. |
| Cargo.toml | Adds tokio-tungstenite and enables axum’s ws feature for test-server builds. |
src/subscription_websocket.rs
Outdated
let (stream, response) = connect_async(request)
    .await
    .map_err(|err| WebSocketRunError::Retry(anyhow!(err)))?;
connect_async(request).await has no timeout, so DNS/TCP/TLS hangs can block the subscription task indefinitely and prevent timely shutdown via stop_rx (since stop is only checked before/after connect). Wrap the connect in tokio::time::timeout (similar to other runtime connect timeouts) and treat timeout as a retryable error (and surface it via observer).
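The idea of bounding the connect and mapping a timeout to a retryable error can be illustrated with a std-only sketch. It is a stand-in, not the proposed fix itself: the async runtime would more likely wrap `connect_async` in `tokio::time::timeout` and race it against `stop_rx`, as the comment suggests. `connect_with_timeout`, the worker thread, and the `String` payloads are hypothetical.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Simplified stand-in for the PR's error type.
#[derive(Debug, PartialEq)]
enum WebSocketRunError {
    Retry(String),
    #[allow(dead_code)]
    Fatal(String),
}

/// Runs `connect` on a worker thread and waits at most `limit` for its result.
/// Connect failures and timeouts are both reported as retryable.
fn connect_with_timeout<T, F>(connect: F, limit: Duration) -> Result<T, WebSocketRunError>
where
    T: Send + 'static,
    F: FnOnce() -> Result<T, String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // After a timeout the receiver is gone, so a failed send is ignored.
        let _ = tx.send(connect());
    });
    match rx.recv_timeout(limit) {
        Ok(Ok(conn)) => Ok(conn),
        Ok(Err(err)) => Err(WebSocketRunError::Retry(format!(
            "websocket connect failed: {}",
            err
        ))),
        Err(RecvTimeoutError::Timeout) => Err(WebSocketRunError::Retry(format!(
            "websocket connect timed out after {:?}",
            limit
        ))),
        Err(RecvTimeoutError::Disconnected) => Err(WebSocketRunError::Retry(
            "websocket connect worker exited".to_string(),
        )),
    }
}
```

Unlike an async timeout, this version leaves the worker thread running until the underlying connect returns, so it only demonstrates the error mapping, not cancellation.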
        }
    }
    Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) | Some(Ok(Message::Frame(_))) => {}
    Some(Err(err)) => return Err(WebSocketRunError::Retry(anyhow!("websocket read failed: {}", err))),
Incoming Message::Ping frames are currently ignored. With tungstenite this typically means no Pong is ever sent (especially since this runtime never writes), which can cause servers to close the connection and trigger unnecessary reconnect loops. Handle Ping(payload) by sending Pong(payload) on the stream (you may need futures::SinkExt) and consider emitting an observer event if useful.
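The suggested Ping handling can be sketched as a pure classification step that decides what the read loop should do with each frame. The `Message` enum below is a local, simplified stand-in for the tungstenite type, and `FrameOutcome` and `classify_frame` are hypothetical names; actually writing the Pong to the stream is left to the caller.

```rust
/// Local stand-in for the WebSocket message type (not the tungstenite enum).
#[derive(Debug, Clone, PartialEq)]
enum Message {
    Text(String),
    Binary(Vec<u8>),
    Ping(Vec<u8>),
    Pong(Vec<u8>),
    Close,
}

/// What the session loop should do after reading one frame.
#[derive(Debug, PartialEq)]
enum FrameOutcome {
    /// Hand the payload to the subscription handler.
    Deliver(Vec<u8>),
    /// Write this message back to the peer.
    Reply(Message),
    /// Nothing to do.
    Ignore,
    /// The peer closed the connection.
    Closed,
}

fn classify_frame(msg: Message) -> FrameOutcome {
    match msg {
        Message::Text(text) => FrameOutcome::Deliver(text.into_bytes()),
        Message::Binary(bytes) => FrameOutcome::Deliver(bytes),
        // Echo the Ping payload back in a Pong so the server sees a live client.
        Message::Ping(payload) => FrameOutcome::Reply(Message::Pong(payload)),
        Message::Pong(_) => FrameOutcome::Ignore,
        Message::Close => FrameOutcome::Closed,
    }
}
```

Keeping the decision separate from the I/O makes the Ping path easy to unit test without a live socket.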
src/auth/mod.rs
Outdated
}
use base64::Engine;

let encoded = base64::engine::general_purpose::STANDARD.encode(&profile.api_key);
Basic auth header generation changed: resolved_profile_auth_headers() now base64-encodes profile.api_key verbatim. Previously the HTTP path used basic_auth(user, Some(pass)) / basic_auth(user, None), which encodes user:pass or user:. This new behavior will break credentials where the secret is just a username (no :), and it’s also inconsistent with auth_profile_to_metadata() which still appends : when no password. Consider matching the previous semantics (split on : and ensure the encoded value is username:password or username:) or reusing the same helper used by the reqwest/grpc paths.
- let encoded = base64::engine::general_purpose::STANDARD.encode(&profile.api_key);
+ // Match the semantics of reqwest's `basic_auth(user, Some(pass))` / `basic_auth(user, None)`
+ // by ensuring the encoded value is always `username:password` or `username:`.
+ let parts: Vec<&str> = profile.api_key.splitn(2, ':').collect();
+ let (username, password_opt) = if parts.len() == 2 {
+     (parts[0], Some(parts[1]))
+ } else {
+     (parts[0], None)
+ };
+ let basic_cred = match password_opt {
+     Some(password) => format!("{}:{}", username, password),
+     None => format!("{}:", username),
+ };
+ let encoded = base64::engine::general_purpose::STANDARD.encode(basic_cred);
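To make the behavior difference concrete, here is a self-contained sketch that produces the header value for a secret with and without a colon. The helper names are hypothetical, `str::split_once` is used as a compact alternative to `splitn`, and the small `base64_encode` function exists only so the example needs no crates (the PR itself uses the `base64` crate).

```rust
const B64: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Minimal standard-alphabet base64 encoder with `=` padding.
fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        let b0 = chunk[0] as u32;
        let b1 = *chunk.get(1).unwrap_or(&0) as u32;
        let b2 = *chunk.get(2).unwrap_or(&0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;
        out.push(B64[((n >> 18) & 63) as usize] as char);
        out.push(B64[((n >> 12) & 63) as usize] as char);
        out.push(if chunk.len() > 1 { B64[((n >> 6) & 63) as usize] as char } else { '=' });
        out.push(if chunk.len() > 2 { B64[(n & 63) as usize] as char } else { '=' });
    }
    out
}

/// Normalizes a stored secret to `username:password`, or `username:` when
/// the secret has no colon.
fn basic_credential(api_key: &str) -> String {
    match api_key.split_once(':') {
        Some((user, pass)) => format!("{}:{}", user, pass),
        None => format!("{}:", api_key),
    }
}

/// Builds the value of the `Authorization` header for Basic auth.
fn basic_auth_header_value(api_key: &str) -> String {
    format!("Basic {}", base64_encode(basic_credential(api_key).as_bytes()))
}
```

With this normalization a username-only secret such as `user` is sent as `Basic dXNlcjo=` (the encoding of `user:`) rather than the encoding of the bare `user`, which is the regression the comment describes.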
- restore basic auth header semantics for shared request auth
- add websocket connect timeout and stop-aware connect cancellation
- respond to websocket ping frames with pong to avoid unnecessary reconnects
- cover basic auth regression in auth tests
Refs #195
What
Add an internal shared WebSocket subscription runtime for daemon-backed subscribe jobs.
Why
Issue #195 splits out transport/runtime concerns before landing GraphQL subscription and JSON-RPC pubsub semantics.
Closes #195
How
Refactor daemon-backed subscription handling to support an internal websocket transport hint, backed by a reusable runtime module.
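As a rough illustration of routing on an optional transport hint: the type name `SubscriptionTransportHint` and the `transport_hint: None` default appear in this PR, but the `WebSocket` variant, `JobRuntime`, and `select_runtime` below are assumptions made for the sketch.

```rust
/// Stand-in for the daemon's hint type; the variant name is hypothetical.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SubscriptionTransportHint {
    WebSocket,
}

/// Hypothetical label for the runtime that will drive a subscription job.
#[derive(Debug, PartialEq, Eq)]
enum JobRuntime {
    /// Existing behavior when no hint is supplied.
    Default,
    /// The shared WebSocket runtime.
    WebSocket,
}

/// Chooses a runtime from the request's optional hint.
fn select_runtime(hint: Option<SubscriptionTransportHint>) -> JobRuntime {
    match hint {
        Some(SubscriptionTransportHint::WebSocket) => JobRuntime::WebSocket,
        None => JobRuntime::Default,
    }
}
```

Because the hint is optional, existing callers that pass `None` keep their current path, which fits the PR's statement that there are no breaking changes.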
Changes
Testing
Unit Tests
Manual Testing
Test Results
Checklist
Breaking Changes
None.
Additional Notes
This PR intentionally does not expose a new public CLI WebSocket mode yet; it lands the shared daemon/runtime layer first so follow-up protocol PRs can stay focused.