Tutorial: Your First Job in Rust
This tutorial walks you through building a background job system with the Rust SDK. You will enqueue, process, and monitor jobs using async Rust with tokio — type-safe and blazing fast.
Prerequisites
Section titled “Prerequisites”

- Docker and Docker Compose
- Rust 1.75 or later
- A running OJS server (see Quickstart Step 1)
Step 1: Initialize the project
Section titled “Step 1: Initialize the project”

cargo new ojs-rust-tutorial && cd ojs-rust-tutorial

Add dependencies to Cargo.toml:

[dependencies]
ojs = "0.1"
tokio = { version = "1", features = ["full"] }
serde_json = "1"

Step 2: Enqueue a job
Section titled “Step 2: Enqueue a job”

Replace src/main.rs:
use ojs::Client;
use serde_json::json;

#[tokio::main]
async fn main() -> ojs::Result<()> {
    let client = Client::builder()
        .url("http://localhost:8080")
        .build()?;

    // Enqueue a job of type "email.send" on the "default" queue
    let job = client
        .enqueue("email.send", json!({"to": "user@example.com", "template": "welcome"}))
        .await?;

    println!("Enqueued job {} in state: {}", job.id, job.state);
    Ok(())
}

Run it:
cargo run

You should see:

Enqueued job 019461a8-1a2b-7c3d-8e4f-5a6b7c8d9e0f in state: available

Step 3: Build a worker
Section titled “Step 3: Build a worker”

Create src/bin/worker.rs:
use ojs::{Worker, JobContext};
use serde_json::json;

#[tokio::main]
async fn main() -> ojs::Result<()> {
    // Create a worker that polls the "default" queue
    let worker = Worker::builder()
        .url("http://localhost:8080")
        .queues(vec!["default"])
        .concurrency(5)
        .build()?;

    // Register a handler for "email.send" jobs
    worker.register("email.send", |ctx: JobContext| async move {
        let to: String = ctx.job.arg("to")?;
        let template: String = ctx.job.arg("template")?;
        println!("Sending '{}' email to {}", template, to);

        // Your email logic goes here
        Ok(json!({"delivered": true}))
    }).await;

    println!("Worker started, waiting for jobs...");
    worker.start().await?;
    Ok(())
}

Run the worker:
cargo run --bin worker

Output:

Worker started, waiting for jobs...
Sending 'welcome' email to user@example.com

Step 4: Add retry logic
Section titled “Step 4: Add retry logic”

Modify the enqueue call to add a retry policy:
use ojs::{Client, RetryPolicy};
use serde_json::json;

let job = client
    .enqueue("email.send", json!({"to": "user@example.com", "template": "welcome"}))
    .queue("default")
    .retry(RetryPolicy::new().max_attempts(5))
    .send()
    .await?;

println!("Enqueued job {} with retries", job.id);

If the worker handler returns an Err, the job transitions to retryable and is automatically rescheduled with exponential backoff.
Step 5: Error handling
Section titled “Step 5: Error handling”

The SDK provides structured errors via OjsError:
use ojs::{Client, OjsError};
use serde_json::json;

let client = Client::builder()
    .url("http://localhost:8080")
    .build()?;

match client.enqueue("email.send", json!({"to": "user@example.com"})).await {
    Ok(job) => println!("Enqueued: {}", job.id),
    Err(OjsError::Conflict { existing_job_id, .. }) => {
        println!("Duplicate job: {}", existing_job_id);
    }
    Err(OjsError::RateLimit { retry_after, .. }) => {
        println!("Rate limited, retry after {:?}", retry_after);
    }
    Err(e) => eprintln!("Error: {}", e),
}

Step 6: Add middleware
Section titled “Step 6: Add middleware”

Use the Tower-inspired middleware pattern for cross-cutting concerns:
use ojs::{Worker, Middleware, Next, JobContext, BoxFuture, HandlerResult};
use serde_json::json;

struct LoggingMiddleware;

impl Middleware for LoggingMiddleware {
    fn handle(&self, ctx: JobContext, next: Next) -> BoxFuture<'static, HandlerResult> {
        Box::pin(async move {
            let start = std::time::Instant::now();
            println!("[START] {} ({})", ctx.job.job_type, ctx.job.id);

            let result = next.run(ctx).await;

            match &result {
                Ok(_) => println!("[DONE] processed in {:?}", start.elapsed()),
                Err(e) => println!("[FAIL] after {:?}: {}", start.elapsed(), e),
            }

            result
        })
    }
}

struct RecoveryMiddleware;

impl Middleware for RecoveryMiddleware {
    fn handle(&self, ctx: JobContext, next: Next) -> BoxFuture<'static, HandlerResult> {
        Box::pin(async move {
            match next.run(ctx).await {
                Ok(val) => Ok(val),
                Err(e) => {
                    eprintln!("Job failed, captured for recovery: {}", e);
                    Err(e)
                }
            }
        })
    }
}

#[tokio::main]
async fn main() -> ojs::Result<()> {
    let worker = Worker::builder()
        .url("http://localhost:8080")
        .queues(vec!["default"])
        .concurrency(5)
        .build()?;

    // Add middleware in order (outermost first)
    worker.use_middleware("logging", LoggingMiddleware).await;
    worker.use_middleware("recovery", RecoveryMiddleware).await;

    worker.register("email.send", |ctx: JobContext| async move {
        let to: String = ctx.job.arg("to")?;
        println!(" Sending email to {}", to);
        Ok(json!({"delivered": true}))
    }).await;

    println!("Worker started with middleware, waiting for jobs...");
    worker.start().await
}

Step 7: Use workflows
Section titled “Step 7: Use workflows”

Create workflows with chain (sequential) and group (parallel) primitives:
use ojs::{Client, chain, group, batch, Step, BatchCallbacks};
use serde_json::json;

#[tokio::main]
async fn main() -> ojs::Result<()> {
    let client = Client::builder()
        .url("http://localhost:8080")
        .build()?;

    // Chain: sequential execution (A → B → C)
    let workflow = client.create_workflow(
        chain(vec![
            Step::new("order.validate", json!({"order_id": "ord_123"})),
            Step::new("payment.charge", json!({})),
            Step::new("notification.send", json!({})),
        ]).name("order-processing")
    ).await?;
    println!("Chain workflow: {}", workflow.id);

    // Group: parallel execution
    let workflow = client.create_workflow(
        group(vec![
            Step::new("export.csv", json!({"report_id": "rpt_456"})),
            Step::new("export.pdf", json!({"report_id": "rpt_456"})),
        ]).name("multi-export")
    ).await?;
    println!("Group workflow: {}", workflow.id);

    // Batch: parallel with callbacks
    let workflow = client.create_workflow(
        batch(
            BatchCallbacks::new()
                .on_complete(Step::new("batch.report", json!({}))),
            vec![
                Step::new("email.send", json!({"to": "user1@example.com"})),
                Step::new("email.send", json!({"to": "user2@example.com"})),
            ],
        ).name("bulk-email")
    ).await?;
    println!("Batch workflow: {}", workflow.id);

    Ok(())
}

What you built
Section titled “What you built”

- An async Rust client that enqueues jobs to an OJS server
- An async worker with tokio-based concurrency and graceful shutdown
- Retry policies for automatic failure recovery
- Structured error handling with OjsError variants
- Tower-inspired middleware for logging and recovery
- Workflows with chain, group, and batch orchestration
Next steps
Section titled “Next steps”

- Add workflow orchestration with advanced patterns
- Explore scheduled jobs for delayed and cron execution
- Use unique jobs for deduplication
- Read the Rust SDK docs for the full API reference