Migrate from BullMQ
BullMQ is a popular Redis-based job queue for Node.js. If you have been using BullMQ, many OJS concepts will feel familiar, but there are some important differences in data format, priority conventions, and architecture. This guide maps BullMQ concepts to OJS equivalents and walks through a practical migration.
Concept mapping
| BullMQ | OJS | Notes |
|---|---|---|
| Queue | OJS queue (server-managed) | BullMQ queues are client-side objects. OJS queues are server-managed. |
| Queue.add(name, data) | client.enqueue(type, args) | BullMQ uses data (object). OJS uses args (array). |
| Queue.addBulk(jobs) | client.enqueueBatch(jobs) | Both support atomic batch insertion. |
| Worker | OJSWorker | Same concept: polls for jobs, runs handlers. |
| Worker.on('completed', fn) | worker.events.on('job.completed', fn) | Event-based notifications. |
| Job | OJS job envelope | OJS envelopes carry structured metadata. |
| job.data | ctx.job.args or ctx.args | BullMQ uses an object. OJS uses an array. |
| job.attemptsMade | ctx.attempt | OJS uses a 1-indexed attempt count. |
| FlowProducer | client.workflow(chain(...)) | OJS has chain, group, and batch primitives. |
| job.opts.priority | options.priority | BullMQ: lower = higher priority. OJS: higher = higher priority. |
| job.opts.delay | options.delay or options.scheduled_at | Both support delayed execution. |
| job.opts.repeat | client.registerCronJob(...) | OJS cron is a server-side concept. |
| Bull-specific Redis structures | Any OJS backend | OJS is backend-agnostic. |
| @bull-board | OJS dashboard (or query API) | OJS exposes structured queue stats. |
| Processor function | Handler registered by type | BullMQ uses queue-scoped processors. OJS uses type-scoped handlers. |
Code comparison
Enqueuing jobs
BullMQ:
```typescript
import { Queue } from 'bullmq';

const emailQueue = new Queue('email');

await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  template: 'welcome',
});

// With options
await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  template: 'welcome',
}, {
  priority: 1, // Lower number = higher priority in BullMQ
  delay: 5000, // Delay in milliseconds
  attempts: 5,
  backoff: { type: 'exponential', delay: 1000 },
});
```
OJS:
```typescript
import { OJSClient } from '@openjobspec/sdk';

const client = new OJSClient({ url: 'http://localhost:8080' });

await client.enqueue('email.send', ['user@example.com', 'welcome']);

// With options
await client.enqueue('email.send', ['user@example.com', 'welcome'], {
  queue: 'email',
  priority: 3, // Higher number = higher priority in OJS
  delay: 5000,
  retry: {
    maxAttempts: 5,
    initialInterval: 'PT1S',
    backoffCoefficient: 2.0,
  },
});
```
Processing jobs
BullMQ:
```typescript
import { Worker } from 'bullmq';

const worker = new Worker('email', async (job) => {
  const { to, template } = job.data;
  await sendEmail(to, template);
  return { messageId: '...' };
}, {
  concurrency: 10,
});

worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed`);
});

worker.on('failed', (job, err) => {
  console.log(`Job ${job?.id} failed: ${err.message}`);
});
```
OJS:
```typescript
import { OJSWorker } from '@openjobspec/sdk';

const worker = new OJSWorker({
  url: 'http://localhost:8080',
  queues: ['email', 'default'],
  concurrency: 10,
});

worker.register('email.send', async (ctx) => {
  const [to, template] = ctx.job.args;
  await sendEmail(to, template);
  return { messageId: '...' };
});

worker.events.on('job.completed', (event) => {
  console.log(`Job completed: ${event.data.job_type}`);
});

worker.events.on('job.failed', (event) => {
  console.log(`Job failed: ${event.data.error.message}`);
});

await worker.start();
```
Workflows (FlowProducer)
BullMQ:
```typescript
import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer();

await flowProducer.add({
  name: 'load',
  queueName: 'etl',
  data: { dest: 'warehouse' },
  children: [
    {
      name: 'transform',
      queueName: 'etl',
      data: { format: 'csv' },
      children: [
        { name: 'fetch', queueName: 'etl', data: { url: '...' } },
      ],
    },
  ],
});
```
OJS:
```typescript
import { OJSClient, chain } from '@openjobspec/sdk';

const client = new OJSClient({ url: 'http://localhost:8080' });

// args is an array, so each options object is passed as a single element.
await client.workflow(
  chain(
    { type: 'data.fetch', args: [{ url: '...' }] },
    { type: 'data.transform', args: [{ format: 'csv' }] },
    { type: 'data.load', args: [{ dest: 'warehouse' }] },
  ),
);
```
An OJS chain reads top to bottom (step 1, then step 2, then step 3). BullMQ’s FlowProducer uses a tree in which children execute before their parents, so the same pipeline is written innermost-first. For sequential pipelines, the chain form is easier to read.
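When converting existing flows, the ordering difference is easy to get backwards. The sketch below shows one way to turn a linear (single-child) flow tree into chain order. The `FlowNode` and `ChainStep` shapes, the `flowToChainSteps` helper, and the `data.<name>` naming rule are all hypothetical and are not part of either library.

```typescript
// Hypothetical shapes for illustration; not types exported by BullMQ or OJS.
interface FlowNode {
  name: string;
  queueName: string;
  data: Record<string, unknown>;
  children?: FlowNode[];
}

interface ChainStep {
  type: string;
  args: unknown[];
}

// Walks a single-child flow from the root down, then reverses the result so
// the deepest child (which BullMQ runs first) becomes the first chain step.
function flowToChainSteps(
  root: FlowNode,
  toType: (node: FlowNode) => string,
): ChainStep[] {
  const steps: ChainStep[] = [];
  let node: FlowNode | undefined = root;
  while (node !== undefined) {
    const children: FlowNode[] = node.children ?? [];
    if (children.length > 1) {
      // Fan-out is not a sequence, so this helper refuses to convert it.
      throw new Error(`"${node.name}" has several children; not a linear flow`);
    }
    // The data object travels as a single element of the args array.
    steps.push({ type: toType(node), args: [node.data] });
    node = children[0];
  }
  return steps.reverse();
}

const exampleFlow: FlowNode = {
  name: 'load',
  queueName: 'etl',
  data: { dest: 'warehouse' },
  children: [
    {
      name: 'transform',
      queueName: 'etl',
      data: { format: 'csv' },
      children: [{ name: 'fetch', queueName: 'etl', data: { source: 'example' } }],
    },
  ],
};

const chainSteps = flowToChainSteps(exampleFlow, (n) => `data.${n.name}`);
```

Here `chainSteps` lists the fetch step first and the load step last, which is the order a chain expects. Flows that fan out to several children would need a group or batch rather than a chain, so the helper rejects them instead of guessing.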
Key differences
Data format: object vs. array
This is the biggest change. BullMQ jobs carry a data object (a plain JavaScript object). OJS jobs carry an args array (a JSON array of simple values).
BullMQ:
```typescript
// data is an object
await queue.add('email.send', { to: 'user@example.com', template: 'welcome' });

// In the handler:
const { to, template } = job.data;
```
OJS:
```typescript
// args is an array
await client.enqueue('email.send', ['user@example.com', 'welcome']);

// In the handler:
const [to, template] = ctx.job.args;
```
Why arrays? This constraint (borrowed from Sidekiq) forces clean separation between job data and application state. It prevents developers from passing complex objects that couple the job to a specific runtime. It also ensures cross-language compatibility, since every language can handle JSON arrays.
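Because positions now carry meaning, it helps to make the argument order explicit in one place when converting existing data objects. The helper below is a minimal sketch of that idea. `toArgs` is a hypothetical name, not an SDK function.

```typescript
// Hypothetical helper: turns a BullMQ-style data object into a positional
// args array using an explicit key order, so argument positions never depend
// on the order in which keys happened to be written.
function toArgs<T extends Record<string, unknown>>(
  data: T,
  order: readonly (keyof T)[],
): unknown[] {
  return order.map((key) => {
    if (!(key in data)) {
      throw new Error(`Missing field: ${String(key)}`);
    }
    return data[key];
  });
}

const welcomeArgs = toArgs(
  { template: 'welcome', to: 'user@example.com' },
  ['to', 'template'],
);
```

The handler on the other side destructures in the same order (`const [to, template] = ctx.job.args`), so keeping the order list next to the job type definition avoids silent mismatches.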
If you have complex arguments, you can pass an object as a single array element:
```typescript
await client.enqueue('report.generate', [{ userId: 42, format: 'pdf', filters: { year: 2026 } }]);
```
Priority: inverted convention
BullMQ uses lower numbers for higher priority (0 is the highest). OJS uses higher numbers for higher priority, so a larger value means a more urgent job.
| Priority Level | BullMQ | OJS |
|---|---|---|
| High | 1 | 3 (or HIGH) |
| Normal | 5 | 2 (or NORMAL) |
| Low | 10 | 1 (or LOW) |
When migrating, invert your priority values.
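One way to keep the inversion consistent is to route every converted call through a single mapping function. The sketch below maps BullMQ values onto the three OJS levels from the table. The cut-off points between levels are assumptions chosen to match the table rows, and `toOjsPriority` is a hypothetical helper, not an SDK function.

```typescript
// The three OJS levels from the table above.
const OJS_PRIORITY = { LOW: 1, NORMAL: 2, HIGH: 3 } as const;
type OjsPriority = (typeof OJS_PRIORITY)[keyof typeof OJS_PRIORITY];

// Hypothetical mapping. The thresholds (1 and 5) are assumptions; adjust
// them to the range of priority values your BullMQ code actually uses.
function toOjsPriority(bullPriority: number): OjsPriority {
  if (bullPriority <= 1) return OJS_PRIORITY.HIGH;   // BullMQ 0..1  -> OJS 3
  if (bullPriority <= 5) return OJS_PRIORITY.NORMAL; // BullMQ 2..5  -> OJS 2
  return OJS_PRIORITY.LOW;                           // BullMQ 6+    -> OJS 1
}

const migratedPriority = toOjsPriority(1);
```

Centralizing the mapping means a reviewer only has to check the thresholds once, rather than auditing each enqueue call for a forgotten inversion.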
Architecture: client-side vs. server-side
BullMQ communicates directly with Redis. Each Queue and Worker instance maintains its own Redis connection and relies on Bull-specific Redis data structures (sorted sets, streams) and Lua scripts.
OJS places a server between your application code and the storage layer. Your SDK talks to the OJS server via HTTP, and the server handles Redis (or PostgreSQL, or any other backend) internally.
```
BullMQ: App -> Redis (directly)
OJS:    App -> OJS Server (HTTP) -> Redis/PostgreSQL
```
This adds a network hop, but gives you:
- Backend portability (switch Redis to PostgreSQL without changing app code)
- Language-agnostic access (any HTTP client can enqueue jobs)
- Server-side intelligence (retry decisions, scheduling, and state management happen in one place)
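To make the "any HTTP client" point concrete, the sketch below builds a plain HTTP request for enqueuing a job without using the SDK. Both the `/ojs/v1/jobs` path and the `{ type, args }` body shape are assumptions made by analogy with the `/ojs/v1/health` and `/ojs/v1/cron` endpoints used elsewhere in this guide, so check them against your server's API reference before relying on them. `buildEnqueueRequest` is a hypothetical helper.

```typescript
interface EnqueueRequest {
  url: string;
  init: { method: string; headers: Record<string, string>; body: string };
}

// Builds (but does not send) an enqueue request.
// ASSUMPTION: the endpoint path and body fields below are illustrative and
// not confirmed by this guide.
function buildEnqueueRequest(
  baseUrl: string,
  type: string,
  args: unknown[],
): EnqueueRequest {
  return {
    url: `${baseUrl.replace(/\/+$/, '')}/ojs/v1/jobs`,
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ type, args }),
    },
  };
}

const enqueueRequest = buildEnqueueRequest(
  'http://localhost:8080/',
  'email.send',
  ['user@example.com', 'welcome'],
);
// To send it: await fetch(enqueueRequest.url, enqueueRequest.init);
```

Separating request construction from sending keeps the sketch testable offline. The same request could be issued from curl, Python, or Go, because nothing in it is specific to Node.js.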
Queue scoping vs. type scoping
BullMQ processors are scoped to a queue. You create a Worker('email', processor) that handles all jobs in the email queue.
OJS handlers are scoped to a job type. You register worker.register('email.send', handler) and worker.register('email.verify', handler) as separate handlers. The worker polls from one or more queues, but dispatch happens by type.
This is more flexible. You can have multiple job types in the same queue, each with its own handler.
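The difference can be pictured as a lookup keyed by job type rather than by queue. The following is a toy in-memory model of type-scoped dispatch, not the SDK's implementation. `TypeRegistry` and its methods are hypothetical names.

```typescript
type TypedHandler = (args: unknown[]) => unknown;

interface IncomingJob {
  queue: string;
  type: string;
  args: unknown[];
}

// Toy model: handlers are looked up by job type, so the queue a job came
// from plays no part in choosing which handler runs.
class TypeRegistry {
  private readonly handlers = new Map<string, TypedHandler>();

  register(type: string, handler: TypedHandler): void {
    if (this.handlers.has(type)) {
      throw new Error(`Handler already registered for "${type}"`);
    }
    this.handlers.set(type, handler);
  }

  dispatch(job: IncomingJob): unknown {
    const handler = this.handlers.get(job.type);
    if (handler === undefined) {
      throw new Error(`No handler registered for "${job.type}"`);
    }
    return handler(job.args);
  }
}

const registry = new TypeRegistry();
registry.register('email.send', (args) => `send:${String(args[0])}`);
registry.register('email.verify', (args) => `verify:${String(args[0])}`);

// Two different job types arriving on the same queue reach different handlers.
const sendResult = registry.dispatch({ queue: 'email', type: 'email.send', args: ['a@example.com'] });
const verifyResult = registry.dispatch({ queue: 'email', type: 'email.verify', args: ['a@example.com'] });
```

Compared with a BullMQ processor that switches on job.name, the lookup table replaces the switch statement, and an unknown type fails loudly instead of falling through.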
Step-by-step migration
Step 1: Set up the OJS server
```yaml
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  ojs-server:
    image: ghcr.io/openjobspec/ojs-backend-redis:latest
    ports:
      - "8080:8080"
    environment:
      REDIS_URL: redis://redis:6379
    depends_on:
      - redis
```
```bash
docker compose up -d
curl http://localhost:8080/ojs/v1/health
# {"status":"ok"}
```
OJS uses its own Redis key namespace, so it can share a Redis instance with BullMQ during migration.
Step 2: Install the OJS SDK
```bash
npm install @openjobspec/sdk
```
Step 3: Create an OJS client
```typescript
import { OJSClient } from '@openjobspec/sdk';

export const ojsClient = new OJSClient({
  url: process.env.OJS_URL || 'http://localhost:8080',
});
```
Step 4: Convert job definitions
Migrate one job type at a time. For each BullMQ job:
- Choose an OJS job type name (use the dot-namespace convention: email.send, report.generate).
- Convert the data object to an args array.
- Map BullMQ job options to OJS enqueue options.
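The option mapping in the last item is mostly mechanical, so it can be captured in a helper. The sketch below covers attempts, backoff, and delay. It assumes BullMQ's millisecond delays should become ISO 8601 durations such as PT2S (the format used in this guide's examples), and that a fixed backoff corresponds to a coefficient of 1.0. Priority is left out because it needs the inversion described earlier. `toOjsOptions` and both interfaces are hypothetical.

```typescript
// Subset of BullMQ job options handled by this sketch.
interface BullJobOpts {
  attempts?: number;
  delay?: number;
  backoff?: { type: 'fixed' | 'exponential'; delay: number };
}

// Subset of OJS enqueue options, using the field names from this guide.
interface OjsEnqueueOpts {
  queue: string;
  delay?: number;
  retry?: {
    maxAttempts: number;
    initialInterval: string;
    backoffCoefficient: number;
  };
}

// 2000 -> "PT2S", 1500 -> "PT1.5S"
function msToIsoDuration(ms: number): string {
  return `PT${ms / 1000}S`;
}

function toOjsOptions(queue: string, opts: BullJobOpts): OjsEnqueueOpts {
  const out: OjsEnqueueOpts = { queue };
  if (opts.delay !== undefined) {
    out.delay = opts.delay;
  }
  if (opts.attempts !== undefined) {
    out.retry = {
      maxAttempts: opts.attempts,
      initialInterval: msToIsoDuration(opts.backoff?.delay ?? 0),
      // ASSUMPTION: exponential doubles each time; fixed keeps the interval.
      backoffCoefficient: opts.backoff?.type === 'exponential' ? 2.0 : 1.0,
    };
  }
  return out;
}

const convertedOptions = toOjsOptions('email', {
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 },
});
```

The result carries `retry.maxAttempts` of 3 and an `initialInterval` of PT2S, matching the hand-written conversion in the example that follows.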
Before:
```typescript
await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  template: 'welcome',
  userId: 42,
}, {
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 },
  priority: 1,
});
```
After:
```typescript
await ojsClient.enqueue('email.welcome', ['user@example.com', 'welcome', 42], {
  queue: 'email',
  retry: {
    maxAttempts: 3,
    initialInterval: 'PT2S',
    backoffCoefficient: 2.0,
  },
  priority: 3, // Inverted from BullMQ's 1
});
```
Step 5: Convert workers
Before:
```typescript
const worker = new Worker('email', async (job) => {
  switch (job.name) {
    case 'send-welcome':
      return handleWelcome(job.data);
    case 'send-reset':
      return handleReset(job.data);
  }
});
```
After:
```typescript
const worker = new OJSWorker({
  url: process.env.OJS_URL || 'http://localhost:8080',
  queues: ['email', 'default'],
  concurrency: 10,
});

worker.register('email.welcome', async (ctx) => {
  const [to, template, userId] = ctx.job.args;
  return handleWelcome({ to, template, userId });
});

worker.register('email.reset', async (ctx) => {
  const [to, userId] = ctx.job.args;
  return handleReset({ to, userId });
});

// Graceful shutdown
process.on('SIGINT', () => worker.stop());
process.on('SIGTERM', () => worker.stop());

await worker.start();
```
Step 6: Add middleware
BullMQ event handlers:
```typescript
worker.on('active', (job) => {
  console.log(`Starting ${job.name}`);
});
worker.on('completed', (job) => {
  console.log(`Completed ${job.name}`);
});
```
OJS middleware (wraps the whole handler call, so it can also time jobs and observe failures):
```typescript
worker.use(async (ctx, next) => {
  console.log(`Starting ${ctx.job.type} attempt ${ctx.attempt}`);
  const start = Date.now();
  try {
    await next();
    console.log(`Completed ${ctx.job.type} in ${Date.now() - start}ms`);
  } catch (err) {
    console.error(`Failed ${ctx.job.type}: ${(err as Error).message}`);
    throw err;
  }
});
```
Step 7: Convert repeatable jobs
BullMQ repeatable:
```typescript
await queue.add('daily-cleanup', {}, {
  repeat: { pattern: '0 3 * * *' },
});
```
OJS cron:
```bash
curl -X POST http://localhost:8080/ojs/v1/cron \
  -H "Content-Type: application/json" \
  -d '{
    "name": "daily-cleanup",
    "cron": "0 3 * * *",
    "timezone": "UTC",
    "type": "cleanup.daily",
    "args": []
  }'
```
OJS cron jobs are registered on the server, not embedded in client code. This means they survive application restarts without re-registration.
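If you have many repeatable jobs, the request body can be derived from the existing BullMQ definitions rather than typed by hand. The sketch below builds the same JSON body as the curl call above. `toCronBody` and both interfaces are hypothetical, and defaulting a missing time zone to UTC is an assumption you should check against how your BullMQ schedules were actually evaluated.

```typescript
// The parts of a BullMQ repeatable job definition this sketch needs.
interface BullRepeatable {
  name: string;
  repeat: { pattern: string; tz?: string };
}

// Body fields taken from the curl example in this step.
interface OjsCronBody {
  name: string;
  cron: string;
  timezone: string;
  type: string;
  args: unknown[];
}

function toCronBody(
  job: BullRepeatable,
  type: string,
  args: unknown[] = [],
): OjsCronBody {
  return {
    name: job.name,
    cron: job.repeat.pattern,
    // ASSUMPTION: fall back to UTC when the BullMQ job set no time zone.
    timezone: job.repeat.tz ?? 'UTC',
    type,
    args,
  };
}

const cronBody = toCronBody(
  { name: 'daily-cleanup', repeat: { pattern: '0 3 * * *' } },
  'cleanup.daily',
);
// JSON.stringify(cronBody) can then be sent as the POST body to /ojs/v1/cron.
```

Generating the body from the old definition keeps the cron expression identical on both sides, which matters when the two schedulers run side by side for a while.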
Step 8: Run in parallel
During migration, run both BullMQ and OJS workers side by side. Migrate producer code (enqueue calls) first, then consumer code (workers), one job type at a time.
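A small routing layer can make the per-type cutover explicit while both systems are running. The sketch below sends each enqueue call to whichever backend currently owns that job type. `createRouter`, `pickBackend`, and the `migratedTypes` set are hypothetical, and the two enqueue functions are injected so that the real BullMQ and OJS calls stay in your own code.

```typescript
type Backend = 'bullmq' | 'ojs';
type EnqueueFn = (type: string, args: unknown[]) => Promise<void>;

// Job types whose producers have already moved to OJS (example value).
const migratedTypes = new Set<string>(['email.welcome']);

function pickBackend(type: string, migrated: ReadonlySet<string>): Backend {
  return migrated.has(type) ? 'ojs' : 'bullmq';
}

// Returns a single enqueue function that forwards to the right backend.
function createRouter(
  backends: Record<Backend, EnqueueFn>,
  migrated: ReadonlySet<string>,
): EnqueueFn {
  return (type, args) => backends[pickBackend(type, migrated)](type, args);
}

// Stand-in backends that only record where each call went.
const routedCalls: string[] = [];
const routeEnqueue = createRouter(
  {
    bullmq: async (type) => { routedCalls.push(`bullmq:${type}`); },
    ojs: async (type) => { routedCalls.push(`ojs:${type}`); },
  },
  migratedTypes,
);

void routeEnqueue('email.welcome', ['user@example.com', 'welcome', 42]);
void routeEnqueue('email.reset', ['user@example.com', 42]);
```

Moving a job type then becomes a one-line change to the set, and rolling it back is equally small. In a real deployment the set would more likely come from configuration than from a constant.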
Step 9: Cutover and remove BullMQ
Once all job types are migrated:
- Remove bullmq from package.json.
- Remove BullMQ Queue, Worker, and FlowProducer instances.
- Remove Bull-specific Redis connection configuration.
- Clean up any BullMQ dashboard integrations.
What you gain
- Backend portability. Switch from Redis to PostgreSQL without changing your application code. BullMQ is Redis-only. OJS supports any conforming backend.
- Cross-language workers. A Python or Go service can process jobs enqueued by your TypeScript app. BullMQ is built primarily for the Node.js ecosystem, and its job format is defined by its implementation rather than by a language-neutral specification.
- Standardized lifecycle. OJS defines 8 explicit states with documented transitions. BullMQ has implicit states spread across Redis data structures, making debugging harder.
- Conformance testing. The OJS conformance test suite verifies backend behavior across 5 levels. BullMQ’s behavior is defined by its implementation, not by a testable specification.
- Structured retry policies. OJS retry policies include exponential backoff, jitter, max interval caps, and non-retryable error classification, all in a standardized format that works across every SDK.
- Native workflow support. OJS chain, group, and batch are first-class primitives with server-side orchestration. BullMQ’s FlowProducer has similar capabilities but uses a tree structure that can be harder to reason about for sequential pipelines.
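To illustrate how the retry policy fields mentioned above interact, the sketch below computes the wait before a retry from an initial interval, a backoff coefficient, a maximum interval, and a jitter fraction. This is a generic exponential backoff formula, not necessarily the calculation an OJS server performs. `RetryPolicy`, its field names in milliseconds, and `retryDelayMs` are hypothetical.

```typescript
// Hypothetical policy shape, with intervals in milliseconds for simplicity.
interface RetryPolicy {
  initialIntervalMs: number;
  backoffCoefficient: number;
  maxIntervalMs: number;
  jitter: number; // fraction of the delay, e.g. 0.2 for +/- 20%
}

// Delay before the retry that follows the given 1-indexed attempt.
function retryDelayMs(
  policy: RetryPolicy,
  attempt: number,
  random: () => number = Math.random,
): number {
  // Exponential growth: initial * coefficient^(attempt - 1).
  const base =
    policy.initialIntervalMs * Math.pow(policy.backoffCoefficient, attempt - 1);
  // Cap the growth so late attempts do not wait indefinitely.
  const capped = Math.min(base, policy.maxIntervalMs);
  // Spread the delay uniformly within +/- jitter around the capped value.
  const spread = capped * policy.jitter;
  return Math.round(capped - spread + 2 * spread * random());
}

const examplePolicy: RetryPolicy = {
  initialIntervalMs: 1000,
  backoffCoefficient: 2.0,
  maxIntervalMs: 60000,
  jitter: 0.2,
};

// Passing a fixed random source makes the result reproducible.
const thirdAttemptDelay = retryDelayMs(examplePolicy, 3, () => 0.5);
```

With the midpoint random source the jitter cancels out, so the third attempt waits 1000 × 2² = 4000 ms. Because jitter is applied after the cap in this sketch, a delay can exceed the maximum interval by up to the jitter fraction; apply the cap last if you need a hard upper bound.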