4 releases
Uses new Rust 2024
| Version | Released |
|---|---|
| 0.1.3 | Jun 20, 2025 |
| 0.1.2 | Jun 20, 2025 |
| 0.1.1 | Jun 19, 2025 |
| 0.1.0 | Jun 19, 2025 |
Dagcuter 🚀
RustDagcuter is a Rust library for executing directed acyclic graphs (DAGs) of tasks. It manages task dependencies, detects cyclic dependencies, and supports customizable task lifecycles (pre-execution, post-execution). It also supports concurrent execution of independent tasks to improve performance.
✨ Core features
- Dependency management: automatically resolves and schedules dependencies between tasks.
- Cycle detection: detects and rejects cyclic dependencies.
- Concurrent execution: topological sorting drives parallel execution of independent tasks, making full use of multiple cores.
- Exponential backoff retry: built-in, configurable retry strategy with a custom interval, multiplier, and maximum number of attempts.
- Graceful cancellation: supports cancelling a run midway and releasing resources.
- Execution tracking: prints task status and execution order as tasks run.
- Type safety: static typing with compile-time error checking.
- Zero-cost abstraction: minimal runtime overhead.
- Lifecycle hooks: custom logic can be inserted before and after task execution.
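
To make the dependency scheduling and cycle detection described above concrete, here is a minimal standalone sketch using Kahn's algorithm. It is not the crate's actual implementation; the function name `plan_layers` and the plain name-to-dependencies map are assumptions made for illustration.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Illustrative only: `plan_layers` is a hypothetical helper, not part of dagcuter.
/// Groups tasks into layers with Kahn's algorithm. Every task in a layer depends
/// only on tasks in earlier layers, so tasks within one layer could run concurrently.
/// Returns `Err` with the names of the stuck tasks when a cycle (or a dependency
/// on an unregistered task) makes further progress impossible.
fn plan_layers(deps: &BTreeMap<String, Vec<String>>) -> Result<Vec<Vec<String>>, Vec<String>> {
    // Unmet dependencies still outstanding for each task.
    let mut pending: BTreeMap<&str, BTreeSet<&str>> = deps
        .iter()
        .map(|(name, ds)| (name.as_str(), ds.iter().map(String::as_str).collect()))
        .collect();

    let mut layers: Vec<Vec<String>> = Vec::new();
    while !pending.is_empty() {
        // Tasks with no unmet dependencies are ready to run now.
        let ready: Vec<&str> = pending
            .iter()
            .filter(|(_, ds)| ds.is_empty())
            .map(|(name, _)| *name)
            .collect();

        if ready.is_empty() {
            // Nothing is runnable, so the remaining tasks cannot be scheduled.
            return Err(pending.keys().map(|s| s.to_string()).collect());
        }

        // Remove the ready tasks and mark them as satisfied for everyone else.
        for name in &ready {
            pending.remove(name);
        }
        for ds in pending.values_mut() {
            for name in &ready {
                ds.remove(name);
            }
        }
        layers.push(ready.into_iter().map(String::from).collect());
    }
    Ok(layers)
}
```

For a graph where `B` and `C` both depend on `A`, and `D` depends on `B` and `C`, this yields three layers: `A`, then `B` and `C` together, then `D`. Two tasks that depend on each other produce an `Err` listing both names.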
🏗️ Project structure
```
dagcuter/
├─ src/
│  ├─ lib.rs          # Core exports and type definitions
│  ├─ task.rs         # Task trait and hooks
│  ├─ retry.rs        # Retry strategy
│  ├─ cycle_check.rs  # Cycle detection algorithm
│  └─ executor.rs     # Executor core
├─ examples/          # Example code
├─ Cargo.toml
└─ README.md
```
🚀 Quick start
1. Add dependencies in `Cargo.toml`:

```toml
[dependencies]
dagcuter = "0.1.1"
tokio = { version = "1.0", features = ["full"] }
async-trait = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
futures = "0.3"
tokio-util = "0.7"
```
2. Write the task and execute it:

```rust
use async_trait::async_trait;
use dagcuter::*;
use std::{collections::HashMap, sync::Arc};
use tokio_util::sync::CancellationToken;

/// A minimal task identified by name, with a list of upstream task names.
struct ExampleTask {
    name: String,
    deps: Vec<String>,
}

#[async_trait]
impl Task for ExampleTask {
    fn name(&self) -> &str {
        &self.name
    }

    fn dependencies(&self) -> Vec<String> {
        self.deps.clone()
    }

    // Allow up to 3 attempts; the other fields keep their defaults.
    fn retry_policy(&self) -> Option<RetryPolicy> {
        Some(RetryPolicy { max_attempts: 3, ..Default::default() })
    }

    async fn execute(
        &self,
        _ctx: CancellationToken,
        _input: &TaskInput,
    ) -> Result<TaskResult, DagcuterError> {
        println!("Execute task: {}", self.name);
        // Simulate some work.
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        let mut out = HashMap::new();
        out.insert("status".into(), serde_json::json!("ok"));
        Ok(out)
    }
}

#[tokio::main]
async fn main() {
    // Task B depends on task A.
    let mut tasks: HashMap<String, BoxTask> = HashMap::new();
    tasks.insert("A".into(), Arc::new(ExampleTask { name: "A".into(), deps: vec![] }));
    tasks.insert("B".into(), Arc::new(ExampleTask { name: "B".into(), deps: vec!["A".into()] }));

    // `new` returns a `Result`; it is unwrapped here for brevity.
    let mut engine = Dagcuter::new(tasks).unwrap();
    let ctx = CancellationToken::new();

    println!("=== dependency graph ===");
    engine.print_graph();

    println!("=== Start execution ===");
    let results = engine.execute(ctx.clone()).await.unwrap();
    println!("=== completion: {:?} ===", results);
}
```
3. Run the example:

```bash
cargo run
```
📚 API Overview
Task trait

```rust
#[async_trait]
pub trait Task: Send + Sync {
    fn name(&self) -> &str;
    fn dependencies(&self) -> Vec<String>;
    fn retry_policy(&self) -> Option<RetryPolicy> { None }
    async fn pre_execution(&self, _ctx: CancellationToken, _input: &TaskInput) -> Result<(), DagcuterError> { Ok(()) }
    async fn execute(&self, ctx: CancellationToken, input: &TaskInput) -> Result<TaskResult, DagcuterError>;
    async fn post_execution(&self, _ctx: CancellationToken, _output: &TaskResult) -> Result<(), DagcuterError> { Ok(()) }
}
```
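
The relationship between the three stages can be sketched with a simplified synchronous stand-in. This is not dagcuter code: `SyncTask`, `run_with_hooks`, and `Recorder` are hypothetical names, and the sketch assumes the hooks run in the order `pre_execution`, `execute`, `post_execution`, with an error in any stage skipping the stages after it.

```rust
use std::cell::RefCell;

/// Hypothetical synchronous stand-in for the async `Task` trait (illustration only).
trait SyncTask {
    fn pre_execution(&self) -> Result<(), String> { Ok(()) }
    fn execute(&self) -> Result<String, String>;
    fn post_execution(&self, _output: &str) -> Result<(), String> { Ok(()) }
}

/// Assumed stage order: pre hook, main body, post hook.
/// The `?` operator stops at the first failing stage.
fn run_with_hooks(task: &dyn SyncTask) -> Result<String, String> {
    task.pre_execution()?;
    let output = task.execute()?;
    task.post_execution(&output)?;
    Ok(output)
}

/// Example task that records which stages ran, overriding both default hooks.
#[derive(Default)]
struct Recorder {
    log: RefCell<Vec<&'static str>>,
}

impl SyncTask for Recorder {
    fn pre_execution(&self) -> Result<(), String> {
        self.log.borrow_mut().push("pre");
        Ok(())
    }

    fn execute(&self) -> Result<String, String> {
        self.log.borrow_mut().push("execute");
        Ok("done".to_string())
    }

    fn post_execution(&self, _output: &str) -> Result<(), String> {
        self.log.borrow_mut().push("post");
        Ok(())
    }
}
```

Calling `run_with_hooks(&Recorder::default())` returns `Ok("done")` and leaves the log as `pre`, `execute`, `post`. A task that only implements `execute` falls back to the no-op default hooks, mirroring the default bodies in the trait above.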
RetryPolicy
```rust
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct RetryPolicy {
    pub interval: Duration,
    pub max_interval: Duration,
    pub max_attempts: i32,
    pub multiplier: f64,
}

impl Default for RetryPolicy {
    fn default() -> Self { /* interval: 1s, max_interval: 30s, max_attempts: -1, multiplier: 2.0 */ }
}
```
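
As a rough illustration of how these fields could combine, the sketch below computes the delay before each retry as `interval * multiplier^n`, capped at `max_interval`. The exact formula the crate uses is not documented here, so this is an assumption; `BackoffSketch` and `delay` are hypothetical names.

```rust
use std::time::Duration;

/// Hypothetical mirror of the timing fields of `RetryPolicy` (illustration only).
struct BackoffSketch {
    interval: Duration,
    max_interval: Duration,
    multiplier: f64,
}

impl BackoffSketch {
    /// Delay before retry number `retry` (0-based), assuming the formula
    /// `interval * multiplier^retry`, capped at `max_interval`.
    /// Assumes `multiplier` is positive.
    fn delay(&self, retry: u32) -> Duration {
        // Clamp the exponent so the conversion to i32 cannot overflow.
        let exp = i32::try_from(retry).unwrap_or(i32::MAX);
        let scaled = self.interval.as_secs_f64() * self.multiplier.powi(exp);
        // `min` also handles the case where `scaled` overflowed to infinity.
        Duration::from_secs_f64(scaled.min(self.max_interval.as_secs_f64()))
    }
}
```

With the default values noted above (1s interval, 30s cap, multiplier 2.0), the delays under this assumed formula would be 1s, 2s, 4s, 8s, 16s, and then 30s for every later retry.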
Dagcuter
```rust
impl Dagcuter {
    pub fn new(tasks: HashMap<String, BoxTask>) -> Result<Self, DagcuterError>;
    pub async fn execute(&mut self, ctx: CancellationToken) -> Result<HashMap<String, TaskResult>, DagcuterError>;
    pub async fn execution_order(&self) -> Vec<String>;
    pub fn print_graph(&self);
}
```
🔧 Advanced usage
- Custom retry: adjust `interval`, `multiplier`, and `max_attempts`.
- Lifecycle hooks: override `pre_execution` / `post_execution`.
- Cancellation and timeout: combine with `CancellationToken` to control execution.
- Complex data flow: process `TaskInput` in `execute` and return a custom `TaskResult`.
📝 License
This project is licensed under the MIT License; see LICENSE for details.