#redis #rrq #runner #redis-queue #jobs

rrq-runner

RRQ runner runtime for Rust

16 releases

Rust 2024 edition

0.10.3 Feb 17, 2026
0.10.2 Feb 15, 2026
0.9.20 Feb 13, 2026
0.9.9 Jan 31, 2026

#265 in Asynchronous

Download history: 7/week @ 2026-01-27, 51/week @ 2026-02-03, 86/week @ 2026-02-10

146 downloads per month

Apache-2.0

160KB
3.5K SLoC

rrq-runner


Rust runtime for building RRQ job handlers. Write your job handlers in Rust and let this crate handle the socket protocol, connection management, and job dispatching.

What is RRQ?

RRQ (Reliable Redis Queue) is a distributed job queue with a Rust orchestrator and language-flexible workers. This crate lets you build high-performance job handlers in Rust that connect to the RRQ orchestrator.

Why Rust runners?

  • Maximum performance - Native code for CPU-intensive tasks
  • Memory safety - No GC pauses, predictable latency
  • Async native - Built on Tokio for efficient concurrency
  • Same ecosystem - Use Rust crates in your job handlers

Installation

[dependencies]
rrq-runner = "0.9"

With OpenTelemetry:

[dependencies]
rrq-runner = { version = "0.9", features = ["otel"] }

Quick Start

use rrq_runner::{Registry, run_tcp, parse_tcp_socket, ExecutionOutcome};
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut registry = Registry::new();

    registry.register("greet", |request| async move {
        let name = request.params.get("name")
            .and_then(|v| v.as_str())
            .unwrap_or("World");

        ExecutionOutcome::success(
            request.job_id.clone(),
            request.request_id.clone(),
            json!({ "message": format!("Hello, {}!", name) }),
        )
    });

    // Read the `--tcp-socket <addr>` argument supplied on the command line.
    let args: Vec<String> = std::env::args().skip(1).collect();
    let tcp_socket = args
        .iter()
        .position(|arg| arg == "--tcp-socket")
        .and_then(|idx| args.get(idx + 1))
        .ok_or("Missing --tcp-socket")?;
    run_tcp(&registry, parse_tcp_socket(tcp_socket)?)
}
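
Judging from the Configuration section below, the orchestrator itself launches this binary and passes the listen address; while developing you can also start it by hand (assuming a release build at the default path):

./target/release/my-runner --tcp-socket 127.0.0.1:9000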

Handler Functions

Handlers receive an ExecutionRequest and return an ExecutionOutcome:

registry.register("process_order", |request| async move {
    // Access job metadata
    println!("Job: {}", request.job_id);
    println!("Attempt: {}", request.context.attempt);

    // Access parameters (fail the job if a required one is missing;
    // handlers return an ExecutionOutcome, so `?` is not available here)
    let order_id = match request.params.get("order_id").and_then(|v| v.as_str()) {
        Some(id) => id,
        None => {
            return ExecutionOutcome::failure(
                request.job_id.clone(),
                request.request_id.clone(),
                "missing order_id".to_string(),
            )
        }
    };

    // Do work...

    ExecutionOutcome::success(
        request.job_id.clone(),
        request.request_id.clone(),
        json!({ "processed": order_id }),
    )
});
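
Since the parameters arrive as JSON, you can also deserialize them into a typed struct in one step. A minimal sketch, assuming request.params is a serde_json::Value (the accessor pattern above is consistent with that) and serde with the derive feature in your own dependencies; OrderParams is a hypothetical type:

use serde::Deserialize;

#[derive(Deserialize)]
struct OrderParams {
    order_id: String,
    quantity: u32,
}

registry.register("process_order_typed", |request| async move {
    // Hypothetical typed view of the job parameters; fail the job on malformed input.
    let params: OrderParams = match serde_json::from_value(request.params.clone()) {
        Ok(p) => p,
        Err(e) => {
            return ExecutionOutcome::failure(
                request.job_id.clone(),
                request.request_id.clone(),
                format!("invalid params: {e}"),
            )
        }
    };

    ExecutionOutcome::success(
        request.job_id.clone(),
        request.request_id.clone(),
        json!({ "processed": params.order_id, "quantity": params.quantity }),
    )
});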

Outcome Types

// Success
ExecutionOutcome::success(job_id, request_id, json!({"result": "ok"}))

// Failure (may be retried)
ExecutionOutcome::failure(job_id, request_id, "Something went wrong".to_string())

// Retry after delay
ExecutionOutcome::retry_after(job_id, request_id, "Rate limited".to_string(), 60)

// Timeout
ExecutionOutcome::timeout(job_id, request_id)

// Cancelled
ExecutionOutcome::cancelled(job_id, request_id)
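
These constructors compose with the attempt counter shown earlier; a sketch of a handler that asks the orchestrator to redeliver a job until enough attempts have been made (the threshold of 3 and the delay of 60 are illustrative):

registry.register("flaky_job", |request| async move {
    let attempt = request.context.attempt;

    // Ask for a redelivery while the (pretend) upstream resource is still warming up.
    if attempt < 3 {
        return ExecutionOutcome::retry_after(
            request.job_id.clone(),
            request.request_id.clone(),
            format!("resource unavailable (attempt {attempt})"),
            60,
        );
    }

    ExecutionOutcome::success(
        request.job_id.clone(),
        request.request_id.clone(),
        json!({ "attempts": attempt }),
    )
});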

OpenTelemetry

use rrq_runner::{RunnerRuntime, Registry, ExecutionOutcome, parse_tcp_socket};
use rrq_runner::telemetry::otel::{init_tracing, OtelTelemetry};
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runtime = RunnerRuntime::new()?;

    // Initialize tracing
    { let _guard = runtime.enter(); init_tracing("my-runner")?; }

    let mut registry = Registry::new();
    registry.register("traced_job", |req| async move {
        ExecutionOutcome::success(req.job_id.clone(), req.request_id.clone(), json!({}))
    });

    let args: Vec<String> = std::env::args().skip(1).collect();
    let tcp_socket = args
        .iter()
        .position(|arg| arg == "--tcp-socket")
        .and_then(|idx| args.get(idx + 1))
        .ok_or("Missing --tcp-socket")?;
    runtime.run_tcp_with(&registry, parse_tcp_socket(tcp_socket)?, &OtelTelemetry)
}

Configure via:

  • OTEL_EXPORTER_OTLP_TRACES_ENDPOINT - Traces endpoint (highest precedence)
  • OTEL_EXPORTER_OTLP_METRICS_ENDPOINT - Metrics endpoint (highest precedence)
  • OTEL_EXPORTER_OTLP_LOGS_ENDPOINT - Logs endpoint (highest precedence)
  • OTEL_EXPORTER_OTLP_ENDPOINT - Fallback base OTLP/HTTP endpoint (for example http://127.0.0.1:4318)
  • OTEL_EXPORTER_OTLP_HEADERS and OTEL_EXPORTER_OTLP_{TRACES|METRICS|LOGS}_HEADERS - Comma-separated key=value headers
  • OTEL_SERVICE_NAME - Service name

Endpoint resolution rules:

  • Signal-specific endpoint vars take precedence over OTEL_EXPORTER_OTLP_ENDPOINT
  • If a signal-specific endpoint is unset, RRQ falls back to OTEL_EXPORTER_OTLP_ENDPOINT and appends /v1/{traces|metrics|logs}
  • If a signal-specific endpoint is explicitly set to an empty value, that signal is disabled (no fallback)
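
For example, with only OTEL_EXPORTER_OTLP_ENDPOINT=http://127.0.0.1:4318 set, traces are sent to http://127.0.0.1:4318/v1/traces, metrics to /v1/metrics, and logs to /v1/logs; additionally setting OTEL_EXPORTER_OTLP_METRICS_ENDPOINT to an empty value would disable metrics export while leaving traces and logs unaffected.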

Configuration

Add your runner to rrq.toml:

[rrq.runners.rust]
type = "socket"
cmd = ["./target/release/my-runner"]
tcp_socket = "127.0.0.1:9000"
pool_size = 4
max_in_flight = 10

Related Crates

Crate          Purpose
rrq            Orchestrator
rrq-producer   Enqueue jobs
rrq-protocol   Wire protocol

License

Apache-2.0

Dependencies

~10–28MB
~305K SLoC