Tutorial: Workflow Patterns
OJS provides three workflow primitives for composing jobs into pipelines. This tutorial demonstrates each pattern with practical examples.
Prerequisites
- A running OJS server (see Quickstart)
- Any OJS SDK installed (examples shown in JavaScript; patterns are identical across all SDKs)
Pattern 1: Chain — Sequential Execution
A chain runs jobs one after another. Each job starts only after the previous one completes. The output of one job is available to the next.
Use case: Multi-step processing where each step depends on the previous result.
```js
import { OJSClient } from '@openjobspec/sdk';

const client = new OJSClient({ url: 'http://localhost:8080' });

// Process an order: validate → charge → ship → notify
const workflow = await client.workflow.chain([
  { type: 'order.validate', args: ['order_123'] },
  { type: 'payment.charge', args: ['order_123', 99.99] },
  { type: 'shipping.create', args: ['order_123'] },
  { type: 'email.send', args: ['customer@example.com', 'order_shipped'] },
]);

console.log(`Workflow ${workflow.id} started`);
console.log(`Jobs: ${workflow.jobs.map((j) => j.type).join(' → ')}`);
```
How it works:
- `order.validate` runs immediately
- When it completes, `payment.charge` becomes available
- When that completes, `shipping.create` runs
- Finally, `email.send` notifies the customer
If any job fails (after retries), the chain stops. Remaining jobs transition to cancelled.
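For example, if `payment.charge` fails permanently, `shipping.create` and `email.send` never run. Below is a minimal sketch of inspecting a stopped chain, assuming the same `client.workflow.get` call and `{ state, jobs }` shape shown in the Monitoring Workflows section later in this tutorial:

```js
// Sketch: inspect the order chain after a permanent failure.
// Assumes the workflow can be re-fetched with client.workflow.get,
// as shown in the Monitoring Workflows section below.
const failed = await client.workflow.get(workflow.id);

if (failed.state === 'discarded') {
  for (const job of failed.jobs) {
    // Jobs downstream of the failed step should show 'cancelled'
    console.log(`${job.type}: ${job.state}`);
  }
}
```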
Pattern 2: Group — Parallel Fan-Out/Fan-In
A group runs multiple jobs in parallel and waits for all of them to complete before continuing.
Use case: Independent tasks that can execute concurrently, with a synchronization point.
```js
// Generate a report from multiple data sources in parallel
const workflow = await client.workflow.chain([
  // Step 1: Kick off data collection
  { type: 'report.init', args: ['monthly_report'] },

  // Step 2: Collect from 3 sources in parallel (fan-out)
  client.workflow.group([
    { type: 'data.fetch', args: ['sales_db', 'monthly_report'] },
    { type: 'data.fetch', args: ['analytics_api', 'monthly_report'] },
    { type: 'data.fetch', args: ['crm_export', 'monthly_report'] },
  ]),

  // Step 3: Merge results after all sources complete (fan-in)
  { type: 'report.compile', args: ['monthly_report'] },
  { type: 'report.email', args: ['team@example.com', 'monthly_report'] },
]);

console.log(`Report workflow ${workflow.id} started`);
```
How it works:
- `report.init` runs first (chain step 1)
- All three `data.fetch` jobs run in parallel (group in chain step 2)
- When all three complete, `report.compile` runs (chain step 3)
- Finally, `report.email` sends the result
Pattern 3: Batch — Completion Callbacks
A batch groups jobs together and fires callbacks when jobs reach terminal states. Unlike a group, batch jobs are independent — one failing doesn’t affect others.
Use case: Bulk operations with progress tracking and completion notifications.
```js
// Send onboarding emails to 1000 users with progress tracking
const batch = await client.workflow.batch(
  // Jobs to execute
  Array.from({ length: 1000 }, (_, i) => ({
    type: 'email.send',
    args: [`user${i}@example.com`, 'onboarding'],
  })),
  // Callbacks
  {
    onComplete: { type: 'batch.report', args: ['onboarding_complete'] },
    onDiscard: { type: 'batch.alert', args: ['onboarding_failures'] },
  },
);

console.log(`Batch ${batch.id}: ${batch.jobs.length} jobs`);
```
Callback triggers:
- `onComplete`: Fires when all jobs reach a terminal state (completed, cancelled, or discarded)
- `onDiscard`: Fires when any job is discarded (exhausted retries)
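Because the jobs are independent, progress can also be checked while the batch is still running. The following is a sketch only, assuming a batch can be fetched with the same `client.workflow.get` call used in the Monitoring Workflows section; this tutorial does not show a batch-specific query.

```js
// Sketch: progress tracking for the onboarding batch.
// Assumes batches expose the same { state, jobs } shape as other workflows.
const progress = await client.workflow.get(batch.id);

const sent = progress.jobs.filter((j) => j.state === 'completed').length;
const dropped = progress.jobs.filter((j) => j.state === 'discarded').length;

console.log(`Onboarding: ${sent}/${progress.jobs.length} sent, ${dropped} discarded`);
```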
Combining Patterns
Workflows can be nested arbitrarily:
```js
// Complex ETL pipeline
const workflow = await client.workflow.chain([
  // Step 1: Setup
  { type: 'etl.setup', args: ['pipeline_42'] },

  // Step 2: Extract from multiple sources (parallel)
  client.workflow.group([
    // Each source extraction is itself a chain
    client.workflow.chain([
      { type: 'extract.connect', args: ['postgres'] },
      { type: 'extract.query', args: ['postgres', 'SELECT * FROM orders'] },
      { type: 'extract.disconnect', args: ['postgres'] },
    ]),
    client.workflow.chain([
      { type: 'extract.connect', args: ['mongodb'] },
      { type: 'extract.query', args: ['mongodb', 'db.events.find()'] },
      { type: 'extract.disconnect', args: ['mongodb'] },
    ]),
  ]),

  // Step 3: Transform and load
  { type: 'transform.merge', args: ['pipeline_42'] },
  { type: 'load.warehouse', args: ['pipeline_42', 'bigquery'] },

  // Step 4: Notify
  { type: 'notify.slack', args: ['#data-team', 'ETL pipeline_42 complete'] },
]);
```
Monitoring Workflows
Check the status of a workflow and its jobs:
```js
const status = await client.workflow.get(workflow.id);

console.log(`Workflow: ${status.state}`);
for (const job of status.jobs) {
  console.log(`  ${job.type}: ${job.state} (attempt ${job.attempt})`);
}
```
Workflow states follow the job lifecycle:
- `active` — at least one job is still running
- `completed` — all jobs completed successfully
- `cancelled` — the workflow was cancelled
- `discarded` — one or more jobs failed permanently
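Since `active` is the only non-terminal state in this list, a simple way to wait for a workflow to finish is to poll until the state changes. A minimal sketch; the `sleep` helper and two-second interval are illustrative choices, not part of the SDK:

```js
// Sketch: poll until the workflow reaches a terminal state.
// The sleep helper and 2s interval are illustrative, not SDK features.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

let status = await client.workflow.get(workflow.id);
while (status.state === 'active') {
  await sleep(2000);
  status = await client.workflow.get(workflow.id);
}

console.log(`Workflow finished: ${status.state}`); // completed, cancelled, or discarded
```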
Pattern Comparison
| Pattern | Execution | Failure behavior | Best for |
|---|---|---|---|
| Chain | Sequential | Stops on failure | Multi-step pipelines |
| Group | Parallel | Waits for all | Fan-out/fan-in |
| Batch | Parallel | Independent | Bulk operations |
Next steps
- Read the Workflows spec for the full specification
- Add retry policies to individual jobs within workflows (see the sketch below)
- Use unique jobs to prevent duplicate workflow steps
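For the retry policy item above, here is a hypothetical sketch of what attaching a policy to one job in a chain might look like. The `retry` field and `max_attempts` option are assumptions made for illustration, not confirmed SDK names; check the retry policies guide for the actual shape.

```js
// Hypothetical sketch only: the 'retry' field and 'max_attempts' option are
// assumed names for illustration; see the retry policies guide for the real API.
const workflow = await client.workflow.chain([
  { type: 'order.validate', args: ['order_123'] },
  {
    type: 'payment.charge',
    args: ['order_123', 99.99],
    retry: { max_attempts: 5 }, // assumed option
  },
  { type: 'shipping.create', args: ['order_123'] },
]);
```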