Skip to content

Tutorial: Your First Job in Rust

This tutorial walks you through building a background job system with the Rust SDK. You will enqueue and process jobs using async Rust with tokio, with type-safe handlers and structured error handling.

Terminal window
cargo new ojs-rust-tutorial && cd ojs-rust-tutorial

Add dependencies to Cargo.toml:

[dependencies]
ojs = "0.1"
tokio = { version = "1", features = ["full"] }
serde_json = "1"

Replace src/main.rs:

src/main.rs
use ojs::Client;
use serde_json::json;
/// Tutorial entry point: connect to a local OJS server, enqueue a
/// single "email.send" job, and report its id and state.
#[tokio::main]
async fn main() -> ojs::Result<()> {
    // Build a client pointed at the local OJS server.
    let builder = Client::builder().url("http://localhost:8080");
    let client = builder.build()?;

    // Job payload: who to email and which template to render.
    let args = json!({"to": "user@example.com", "template": "welcome"});

    // Enqueue a job of type "email.send" on the "default" queue
    let job = client.enqueue("email.send", args).await?;

    println!("Enqueued job {} in state: {}", job.id, job.state);
    Ok(())
}

Run it:

Terminal window
cargo run

You should see:

Enqueued job 019461a8-1a2b-7c3d-8e4f-5a6b7c8d9e0f in state: available

Create src/bin/worker.rs:

src/bin/worker.rs
use ojs::{Worker, JobContext};
use serde_json::json;
/// Worker binary: polls the "default" queue and processes
/// "email.send" jobs with up to five concurrent handlers.
#[tokio::main]
async fn main() -> ojs::Result<()> {
    // Create a worker that polls the "default" queue
    let worker = Worker::builder()
        .url("http://localhost:8080")
        .queues(vec!["default"])
        .concurrency(5)
        .build()?;

    // Register a handler for "email.send" jobs
    worker
        .register("email.send", |ctx: JobContext| async move {
            let recipient: String = ctx.job.arg("to")?;
            let tmpl: String = ctx.job.arg("template")?;
            println!("Sending '{}' email to {}", tmpl, recipient);
            // Your email logic goes here
            Ok(json!({"delivered": true}))
        })
        .await;

    println!("Worker started, waiting for jobs...");
    worker.start().await?;
    Ok(())
}

Run the worker:

Terminal window
cargo run --bin worker

Output:

Worker started, waiting for jobs...
Sending 'welcome' email to user@example.com

Modify the enqueue call to add a retry policy:

// NOTE(review): snippet fragment — these statements belong inside an
// async fn with a `client` already built (as in the first example).
use ojs::{Client, RetryPolicy};
use serde_json::json;
// Builder-style enqueue: name the target queue explicitly and attach
// a retry policy allowing up to five attempts.
let job = client
.enqueue("email.send", json!({"to": "user@example.com", "template": "welcome"}))
.queue("default")
.retry(RetryPolicy::new().max_attempts(5))
// `send()` finalizes the builder and performs the enqueue request.
.send()
.await?;
println!("Enqueued job {} with retries", job.id);

If the worker handler returns an Err, the job transitions to retryable and is automatically rescheduled with exponential backoff.

The SDK provides structured errors via OjsError:

// NOTE(review): snippet fragment — assumes it runs inside an async fn
// returning a Result so `?` and `.await` are available.
use ojs::{Client, OjsError};
use serde_json::json;
let client = Client::builder()
.url("http://localhost:8080")
.build()?;
// Match on specific OjsError variants to handle duplicates and rate
// limiting distinctly; anything else falls through to the generic arm.
match client.enqueue("email.send", json!({"to": "user@example.com"})).await {
Ok(job) => println!("Enqueued: {}", job.id),
// presumably an equivalent job already exists; `existing_job_id`
// identifies it — confirm against the SDK's uniqueness semantics
Err(OjsError::Conflict { existing_job_id, .. }) => {
println!("Duplicate job: {}", existing_job_id);
}
// server asked us to back off; `retry_after` carries the wait hint
Err(OjsError::RateLimit { retry_after, .. }) => {
println!("Rate limited, retry after {:?}", retry_after);
}
Err(e) => eprintln!("Error: {}", e),
}

Use the Tower-inspired middleware pattern for cross-cutting concerns:

src/bin/worker_with_middleware.rs
use ojs::{Worker, Middleware, Next, JobContext, BoxFuture, HandlerResult};
use serde_json::json;
/// Middleware that logs the start, outcome, and duration of every job.
struct LoggingMiddleware;

impl Middleware for LoggingMiddleware {
    fn handle(&self, ctx: JobContext, next: Next) -> BoxFuture<'static, HandlerResult> {
        Box::pin(async move {
            let started_at = std::time::Instant::now();
            println!("[START] {} ({})", ctx.job.job_type, ctx.job.id);

            // Run the rest of the middleware chain (and the handler itself).
            let outcome = next.run(ctx).await;

            // Report success or failure along with elapsed wall-clock time.
            if let Err(e) = &outcome {
                println!("[FAIL] after {:?}: {}", started_at.elapsed(), e);
            } else {
                println!("[DONE] processed in {:?}", started_at.elapsed());
            }
            outcome
        })
    }
}
/// Middleware that logs failures as they pass through. The error is
/// still propagated, so normal retry/failure handling is unaffected.
struct RecoveryMiddleware;

impl Middleware for RecoveryMiddleware {
    fn handle(&self, ctx: JobContext, next: Next) -> BoxFuture<'static, HandlerResult> {
        Box::pin(async move {
            // Log the failure, then re-raise it unchanged.
            next.run(ctx).await.map_err(|e| {
                eprintln!("Job failed, captured for recovery: {}", e);
                e
            })
        })
    }
}
/// Worker binary demonstrating middleware: logging wraps recovery,
/// which wraps the "email.send" handler.
#[tokio::main]
async fn main() -> ojs::Result<()> {
    // Five concurrent handlers pulling from the "default" queue.
    let worker = Worker::builder()
        .url("http://localhost:8080")
        .queues(vec!["default"])
        .concurrency(5)
        .build()?;

    // Add middleware in order (outermost first)
    worker.use_middleware("logging", LoggingMiddleware).await;
    worker.use_middleware("recovery", RecoveryMiddleware).await;

    // Handler for "email.send" jobs.
    worker
        .register("email.send", |ctx: JobContext| async move {
            let recipient: String = ctx.job.arg("to")?;
            println!(" Sending email to {}", recipient);
            Ok(json!({"delivered": true}))
        })
        .await;

    println!("Worker started with middleware, waiting for jobs...");
    worker.start().await
}

Create workflows with chain (sequential) and group (parallel) primitives:

src/bin/workflows.rs
use ojs::{Client, chain, group, batch, Step, BatchCallbacks};
use serde_json::json;
/// Demonstrates the three workflow primitives: chain (sequential),
/// group (parallel), and batch (parallel with completion callbacks).
#[tokio::main]
async fn main() -> ojs::Result<()> {
    let client = Client::builder()
        .url("http://localhost:8080")
        .build()?;

    // Chain: sequential execution (A → B → C)
    let order_steps = vec![
        Step::new("order.validate", json!({"order_id": "ord_123"})),
        Step::new("payment.charge", json!({})),
        Step::new("notification.send", json!({})),
    ];
    let workflow = client
        .create_workflow(chain(order_steps).name("order-processing"))
        .await?;
    println!("Chain workflow: {}", workflow.id);

    // Group: parallel execution
    let export_steps = vec![
        Step::new("export.csv", json!({"report_id": "rpt_456"})),
        Step::new("export.pdf", json!({"report_id": "rpt_456"})),
    ];
    let workflow = client
        .create_workflow(group(export_steps).name("multi-export"))
        .await?;
    println!("Group workflow: {}", workflow.id);

    // Batch: parallel jobs plus a callback step that runs on completion.
    let callbacks = BatchCallbacks::new().on_complete(Step::new("batch.report", json!({})));
    let emails = vec![
        Step::new("email.send", json!({"to": "user1@example.com"})),
        Step::new("email.send", json!({"to": "user2@example.com"})),
    ];
    let workflow = client
        .create_workflow(batch(callbacks, emails).name("bulk-email"))
        .await?;
    println!("Batch workflow: {}", workflow.id);
    Ok(())
}
In this tutorial you built:
  • An async Rust client that enqueues jobs to an OJS server
  • An async worker with tokio-based concurrency and graceful shutdown
  • Retry policies for automatic failure recovery
  • Structured error handling with OjsError variants
  • Tower-inspired middleware for logging and recovery
  • Workflows with chain, group, and batch orchestration