#job-queue #worker #async

flashq

High-performance Rust client for the flashQ job queue

1 unstable release

0.4.0 Feb 16, 2026

#455 in Asynchronous

MIT license

94KB
2K SLoC

flashQ Rust SDK

High-performance Rust client for the flashQ job queue server.

Features

  • Async/await - Built on Tokio for native async support
  • Connection pooling - Round-robin pool with auto-reconnect
  • Dual protocol - JSON (text) and MessagePack (binary, 40% smaller, 3-5x faster)
  • BullMQ-compatible - Queue and Worker types with a familiar API
  • Full feature set - 60+ methods covering all flashQ operations

Installation

cargo add flashq

Or add to Cargo.toml:

[dependencies]
flashq = "0.4"
tokio = { version = "1", features = ["full"] }

Quick Start

Start the Server

docker run -p 6789:6789 flashq

Push and Process Jobs

use flashq::{FlashQ, Worker, WorkerOptions};
use flashq::types::WorkerEvent;

#[tokio::main]
async fn main() -> flashq::Result<()> {
    // Push a job
    let client = FlashQ::new();
    client.connect().await?;

    let job_id = client.push("emails", serde_json::json!({
        "to": "user@example.com",
        "subject": "Welcome!",
    }), None).await?;
    println!("Pushed job: {job_id}");
    client.close().await?;

    // Process jobs with a worker
    let worker = Worker::new(
        vec!["emails".to_string()],
        |job| async move {
            println!("Sending email to: {}", job.data["to"]);
            Ok(serde_json::json!({"sent": true}))
        },
        Some(WorkerOptions {
            concurrency: 5,
            ..Default::default()
        }),
    );

    worker.start().await?;
    Ok(())
}

API Reference

Queue (BullMQ-compatible)

use flashq::Queue;

let queue = Queue::new("emails");
queue.connect().await?;

// Add jobs
let id = queue.add("send-welcome", data, None).await?;
let ids = queue.add_bulk(jobs).await?;

// Query
let job = queue.get_job(id).await?;
let counts = queue.get_job_counts().await?;
let total = queue.count().await?;

// Control
queue.pause().await?;
queue.resume().await?;
queue.drain().await?;
queue.obliterate().await?;
queue.close().await?;

Worker

use flashq::{Worker, WorkerOptions, WorkerEventData};
use flashq::types::WorkerEvent;

let worker = Worker::new(
    vec!["tasks".to_string()],
    |job| async move {
        println!("Processing: {:?}", job.data);
        Ok(serde_json::json!({"done": true}))
    },
    Some(WorkerOptions {
        concurrency: 10,
        ..Default::default()
    }),
);

worker.on(WorkerEvent::Completed, |event| {
    if let WorkerEventData::Completed { job_id, .. } = event {
        println!("Job {job_id} done!");
    }
});

worker.start().await?;

Low-Level Client

use flashq::{FlashQ, ClientOptions, PushOptions};
use std::time::Duration;

let client = FlashQ::with_options(ClientOptions {
    host: "localhost".into(),
    port: 6789,
    use_binary: true, // MessagePack protocol
    pool_size: 4,
    ..Default::default()
});
client.connect().await?;

// Core operations
let id = client.push("queue", data, Some(PushOptions {
    priority: Some(10),
    delay: Some(5000),
    max_attempts: Some(3),
    ..Default::default()
})).await?;

let job = client.pull("queue", Some(Duration::from_secs(30))).await?;
client.ack(job_id, Some(result)).await?;
client.fail(job_id, Some("error message")).await?;

// Job queries
let job = client.get_job(id).await?;
let state = client.get_state(id).await?;
let result = client.finished(id, Some(Duration::from_secs(30))).await?;

// Queue management
client.pause("queue").await?;
client.resume("queue").await?;
client.set_rate_limit("queue", 100).await?;
client.set_concurrency("queue", 5).await?;

// Cron jobs
client.add_cron("daily-cleanup", CronOptions {
    queue: "maintenance".into(),
    data: serde_json::json!({"action": "cleanup"}),
    schedule: Some("0 0 0 * * *".into()), // sec min hour day month weekday: daily at midnight
    ..Default::default()
}).await?;

// Flows (parent-child dependencies)
let flow = client.push_flow("parent-queue", parent_data, vec![
    FlowChild { queue: "child-q".into(), data: child_data, ..Default::default() },
], None).await?;

client.close().await?;

Error Handling

use flashq::{FlashQ, FlashQError};

match client.push("queue", data, None).await {
    Ok(id) => println!("Job {id}"),
    Err(FlashQError::Connection(msg)) => eprintln!("Connection lost: {msg}"),
    Err(FlashQError::Timeout(msg)) => eprintln!("Timed out: {msg}"),
    Err(FlashQError::DuplicateJob(msg)) => eprintln!("Duplicate: {msg}"),
    Err(FlashQError::RateLimit(msg)) => eprintln!("Rate limited: {msg}"),
    Err(FlashQError::Validation(msg)) => eprintln!("Invalid: {msg}"),
    Err(e) => eprintln!("Error: {e}"),
}

// Check if retryable (here `e` is a FlashQError returned by a failed call)
if e.is_retryable() {
    // Safe to retry
}

Push Options

Option Type Description
priority i32 Higher = processed first
delay u64 Delay in ms before becoming ready
ttl u64 Time-to-live in ms
timeout u64 Processing timeout in ms
max_attempts u32 Max attempts before the job moves to the dead-letter queue (DLQ)
backoff u64 Exponential backoff base in ms
unique_key String Deduplication key
depends_on Vec<u64> Job IDs to wait for
tags Vec<String> Job tags for filtering
lifo bool Last-in-first-out mode
job_id String Custom job ID (idempotency)
stall_timeout u64 Stall detection timeout in ms
group_id String FIFO processing within group
keep_completed_age u64 Retention: keep for N ms
keep_completed_count u64 Retention: keep last N

Configuration

ClientOptions

Option Default Description
host "localhost" Server host
port 6789 TCP port
token None Auth token
timeout 5s Request timeout
use_binary false Use MessagePack protocol
auto_reconnect true Auto-reconnect on disconnect
pool_size 4 Connection pool size
reconnect_delay 1s Initial reconnect delay
max_reconnect_delay 30s Max reconnect delay
max_reconnect_attempts 10 Max reconnect attempts

WorkerOptions

Option Default Description
concurrency 1 Parallel job processing
batch_size 100 Jobs per pull batch
auto_start true Start on creation
close_timeout 30s Graceful shutdown timeout
stall_timeout 30s Stall detection timeout

Examples

# Example Description
01 basic Push, Pull, Ack
02 worker Worker processing
03 priority Priority ordering
04 delayed Scheduled jobs
05 batch Batch operations
06 retry Retry & DLQ
07 progress Progress tracking
08 cron Cron scheduling
09 rate_limit Rate limiting
10 queue_api BullMQ Queue API
11 unique Job deduplication
12 finished Wait for completion
13 job_options All push options
14 events Worker events
15 queue_control Pause/Resume/Drain
16 concurrency Concurrency limits
17 benchmark Performance benchmark
18 flow Job dependencies
19 ai_workflow AI/ML pipeline
20 batch_inference Batch ML inference
21 rag_pipeline RAG workflow
22 groups Job groups (FIFO)
23 streaming Partial results

Run an example:

cargo run --example 01_basic

Performance

Metric flashQ (Rust SDK) Redis/BullMQ
Push (batch) ~1.9M jobs/sec ~50K jobs/sec
Processing ~280K jobs/sec ~15K jobs/sec
Wire size 40% smaller (MessagePack) JSON only

Resources

License

MIT

Dependencies

~11–25MB
~234K SLoC