Migrate from BullMQ

BullMQ is a popular Redis-based job queue for Node.js. If you have been using BullMQ, many OJS concepts will feel familiar, but there are some important differences in data format, priority conventions, and architecture. This guide maps BullMQ concepts to OJS equivalents and walks through a practical migration.

| BullMQ | OJS | Notes |
| --- | --- | --- |
| `Queue` | OJS queue (server-managed) | BullMQ queues are client-side objects. OJS queues are server-managed. |
| `Queue.add(name, data)` | `client.enqueue(type, args)` | BullMQ uses `data` (object). OJS uses `args` (array). |
| `Queue.addBulk(jobs)` | `client.enqueueBatch(jobs)` | Both support atomic batch insertion. |
| `Worker` | `OJSWorker` | Same concept: polls for jobs, runs handlers. |
| `Worker.on('completed', fn)` | `worker.events.on('job.completed', fn)` | Event-based notifications. |
| `Job` | OJS job envelope | OJS envelopes carry structured metadata. |
| `job.data` | `ctx.job.args` or `ctx.args` | BullMQ uses an object. OJS uses an array. |
| `job.attemptsMade` | `ctx.attempt` | OJS uses a 1-indexed attempt count. |
| `FlowProducer` | `client.workflow(chain(...))` | OJS has `chain`, `group`, and `batch` primitives. |
| `job.opts.priority` | `options.priority` | BullMQ: lower = higher priority. OJS: higher = higher priority. |
| `job.opts.delay` | `options.delay` or `options.scheduled_at` | Both support delayed execution. |
| `job.opts.repeat` | `client.registerCronJob(...)` | OJS cron is a server-side concept. |
| Bull-specific Redis structures | Any OJS backend | OJS is backend-agnostic. |
| `@bull-board` | OJS dashboard (or query API) | OJS exposes structured queue stats. |
| Processor function | Handler registered by type | BullMQ uses queue-scoped processors. OJS uses type-scoped handlers. |

Enqueueing jobs in BullMQ:

```typescript
import { Queue } from 'bullmq';

const emailQueue = new Queue('email');

await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  template: 'welcome',
});

// With options
await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  template: 'welcome',
}, {
  priority: 1, // Lower number = higher priority in BullMQ
  delay: 5000, // Delay in milliseconds
  attempts: 5,
  backoff: { type: 'exponential', delay: 1000 },
});
```

OJS:

```typescript
import { OJSClient } from '@openjobspec/sdk';

const client = new OJSClient({ url: 'http://localhost:8080' });

await client.enqueue('email.send', ['user@example.com', 'welcome']);

// With options
await client.enqueue('email.send', ['user@example.com', 'welcome'], {
  queue: 'email',
  priority: 3, // Higher number = higher priority in OJS
  delay: 5000,
  retry: {
    maxAttempts: 5,
    initialInterval: 'PT1S',
    backoffCoefficient: 2.0,
  },
});
```

Processing jobs with a worker in BullMQ:

```typescript
import { Worker } from 'bullmq';

const worker = new Worker('email', async (job) => {
  const { to, template } = job.data;
  await sendEmail(to, template);
  return { messageId: '...' };
}, {
  concurrency: 10,
});

worker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed`);
});

worker.on('failed', (job, err) => {
  console.log(`Job ${job?.id} failed: ${err.message}`);
});
```

OJS:

```typescript
import { OJSWorker } from '@openjobspec/sdk';

const worker = new OJSWorker({
  url: 'http://localhost:8080',
  queues: ['email', 'default'],
  concurrency: 10,
});

worker.register('email.send', async (ctx) => {
  const [to, template] = ctx.job.args;
  await sendEmail(to, template);
  return { messageId: '...' };
});

worker.events.on('job.completed', (event) => {
  console.log(`Job completed: ${event.data.job_type}`);
});

worker.events.on('job.failed', (event) => {
  console.log(`Job failed: ${event.data.error.message}`);
});

await worker.start();
```

Sequential workflows use BullMQ's FlowProducer:

```typescript
import { FlowProducer } from 'bullmq';

const flowProducer = new FlowProducer();

await flowProducer.add({
  name: 'load',
  queueName: 'etl',
  data: { dest: 'warehouse' },
  children: [
    {
      name: 'transform',
      queueName: 'etl',
      data: { format: 'csv' },
      children: [
        { name: 'fetch', queueName: 'etl', data: { url: '...' } },
      ],
    },
  ],
});
```

OJS:

```typescript
import { OJSClient, chain } from '@openjobspec/sdk';

const client = new OJSClient({ url: 'http://localhost:8080' });

await client.workflow(
  chain(
    { type: 'data.fetch', args: [{ url: '...' }] },
    { type: 'data.transform', args: [{ format: 'csv' }] },
    { type: 'data.load', args: [{ dest: 'warehouse' }] },
  )
);
```

OJS chain reads top-to-bottom (step 1, then step 2, then step 3), while BullMQ’s FlowProducer uses a tree where children execute before parents. The OJS approach is simpler to read for sequential pipelines.
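OJS also offers `group` for parallel fan-out within a chain. The exact SDK helper signatures are not shown in this guide, so the sketch below models the shape as plain data rather than calling the real `group(...)` API; consult the SDK reference for the actual helpers.

```typescript
// Hypothetical data shapes only -- the SDK exposes chain()/group() helpers;
// this models how a parallel step reads inside a sequential pipeline.
type Step = { type: string; args: unknown[] };
type ParallelGroup = { group: Step[] };

const pipeline: Array<Step | ParallelGroup> = [
  // Step 1: fetch two sources in parallel (fan-out)
  {
    group: [
      { type: 'data.fetch', args: [{ url: 'https://a.example/feed' }] },
      { type: 'data.fetch', args: [{ url: 'https://b.example/feed' }] },
    ],
  },
  // Step 2: runs only after both fetches complete (fan-in)
  { type: 'data.transform', args: [{ format: 'csv' }] },
  { type: 'data.load', args: [{ dest: 'warehouse' }] },
];
```

Even with a parallel step, the pipeline still reads top-to-bottom.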

The argument format is the biggest change. BullMQ jobs carry a `data` object (a plain JavaScript object). OJS jobs carry an `args` array (a JSON array of simple values).

BullMQ:

```typescript
// data is an object
await queue.add('email.send', { to: 'user@example.com', template: 'welcome' });

// In the handler:
const { to, template } = job.data;
```

OJS:

```typescript
// args is an array
await client.enqueue('email.send', ['user@example.com', 'welcome']);

// In the handler:
const [to, template] = ctx.job.args;
```

Why arrays? This constraint (borrowed from Sidekiq) forces clean separation between job data and application state. It prevents developers from passing complex objects that couple the job to a specific runtime. It also ensures cross-language compatibility, since every language can handle JSON arrays.

If you have complex arguments, you can pass an object as a single array element:

```typescript
await client.enqueue('report.generate', [{ userId: 42, format: 'pdf', filters: { year: 2026 } }]);
```
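On the handler side, the wrapped object comes back as the first element of `args`. A small sketch (the `parseReportArgs` helper and `ReportParams` type are illustrative, not part of the SDK):

```typescript
// Illustrative only: unwrap a single-object args array inside a handler.
// In a real handler you would pass ctx.job.args instead of a plain array.
interface ReportParams {
  userId: number;
  format: string;
  filters: { year: number };
}

function parseReportArgs(args: unknown[]): ReportParams {
  const [params] = args as [ReportParams];
  return params;
}

// e.g. inside worker.register('report.generate', ...):
// const { userId, format, filters } = parseReportArgs(ctx.job.args);
```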

BullMQ uses lower numbers for higher priority (0 is the highest). OJS uses higher numbers for higher priority, which matches human intuition.

| Priority level | BullMQ | OJS |
| --- | --- | --- |
| High | 1 | 3 (or `HIGH`) |
| Normal | 5 | 2 (or `NORMAL`) |
| Low | 10 | 1 (or `LOW`) |

When migrating, invert your priority values.
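If you prefer to invert in code rather than by hand, a small mapping helper (hypothetical, derived from the table above) keeps old BullMQ priority values usable during the transition:

```typescript
// Hypothetical helper: map BullMQ priorities (lower = higher) to OJS
// priorities (higher = higher) using the table above.
const BULL_TO_OJS_PRIORITY: Record<number, number> = {
  1: 3,  // High
  5: 2,  // Normal
  10: 1, // Low
};

function toOjsPriority(bullPriority: number): number {
  return BULL_TO_OJS_PRIORITY[bullPriority] ?? 2; // default to Normal
}
```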

BullMQ communicates directly with Redis. Each Queue and Worker instance maintains its own Redis connection and uses Bull-specific Redis data structures (sorted sets, streams, Lua scripts).

OJS places a server between your application code and the storage layer. Your SDK talks to the OJS server via HTTP, and the server handles Redis (or PostgreSQL, or any other backend) internally.

```
BullMQ:  App -> Redis (directly)
OJS:     App -> OJS Server (HTTP) -> Redis/PostgreSQL
```

This adds a network hop, but gives you:

  • Backend portability (switch Redis to PostgreSQL without changing app code)
  • Language-agnostic access (any HTTP client can enqueue jobs)
  • Server-side intelligence (retry decisions, scheduling, and state management happen in one place)
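To illustrate the language-agnostic point: any HTTP client can construct an enqueue request. The `/ojs/v1/jobs` path and payload fields below are assumptions extrapolated from the `/ojs/v1/*` endpoints shown elsewhere in this guide, not taken from the spec; check your server's API reference for the exact contract.

```typescript
// Sketch: build an enqueue request for plain fetch(). Endpoint path and
// body shape are assumed, not confirmed against the OJS spec.
function buildEnqueueRequest(type: string, args: unknown[], queue = 'default') {
  return {
    url: 'http://localhost:8080/ojs/v1/jobs',
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ type, args, queue }),
    },
  };
}

// const req = buildEnqueueRequest('email.send', ['user@example.com', 'welcome']);
// await fetch(req.url, req.init);
```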

BullMQ processors are scoped to a queue. You create a Worker('email', processor) that handles all jobs in the email queue.

OJS handlers are scoped to a job type. You register worker.register('email.send', handler) and worker.register('email.verify', handler) as separate handlers. The worker polls from one or more queues, but dispatch happens by type.

This is more flexible. You can have multiple job types in the same queue, each with its own handler.
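Conceptually, the routing an OJS worker performs looks like this minimal sketch (illustrative only, not SDK internals):

```typescript
// Minimal sketch of type-scoped dispatch: many job types share one queue,
// and each job is routed to the handler registered for its type.
type Handler = (args: unknown[]) => Promise<unknown>;

const handlers = new Map<string, Handler>();

function register(type: string, handler: Handler): void {
  handlers.set(type, handler);
}

async function dispatch(type: string, args: unknown[]): Promise<unknown> {
  const handler = handlers.get(type);
  if (!handler) throw new Error(`No handler registered for "${type}"`);
  return handler(args);
}
```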

Start an OJS server alongside your existing Redis instance.

docker-compose.yml:

```yaml
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  ojs-server:
    image: ghcr.io/openjobspec/ojs-backend-redis:latest
    ports:
      - "8080:8080"
    environment:
      REDIS_URL: redis://redis:6379
    depends_on:
      - redis
```

```sh
docker compose up -d
curl http://localhost:8080/ojs/v1/health
# {"status":"ok"}
```

OJS uses its own Redis key namespace, so it can share a Redis instance with BullMQ during migration.

Install the SDK and create a shared client:

```sh
npm install @openjobspec/sdk
```

src/ojs.ts:

```typescript
import { OJSClient } from '@openjobspec/sdk';

export const ojsClient = new OJSClient({
  url: process.env.OJS_URL || 'http://localhost:8080',
});
```

Migrate one job type at a time. For each BullMQ job:

  1. Choose an OJS job type name (use dot-namespace convention: email.send, report.generate).
  2. Convert the data object to an args array.
  3. Map BullMQ job options to OJS enqueue options.

Before:

```typescript
await emailQueue.add('send-welcome', {
  to: 'user@example.com',
  template: 'welcome',
  userId: 42,
}, {
  attempts: 3,
  backoff: { type: 'exponential', delay: 2000 },
  priority: 1,
});
```

After:

```typescript
await ojsClient.enqueue('email.welcome', ['user@example.com', 'welcome', 42], {
  queue: 'email',
  retry: {
    maxAttempts: 3,
    initialInterval: 'PT2S',
    backoffCoefficient: 2.0,
  },
  priority: 3, // Inverted from BullMQ's 1
});
```
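One mechanical piece of that option mapping is converting BullMQ's millisecond backoff delay into the ISO-8601 durations OJS retry policies use. A small helper (hypothetical) could look like:

```typescript
// Hypothetical conversion: BullMQ backoff delay in milliseconds ->
// ISO-8601 duration string for OJS retry policies (e.g. 2000 -> "PT2S").
function msToIsoDuration(ms: number): string {
  if (ms % 60_000 === 0) return `PT${ms / 60_000}M`; // whole minutes
  return `PT${ms / 1_000}S`;                         // otherwise seconds
}
```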

Before:

```typescript
const worker = new Worker('email', async (job) => {
  switch (job.name) {
    case 'send-welcome':
      return handleWelcome(job.data);
    case 'send-reset':
      return handleReset(job.data);
  }
});
```

After:

```typescript
const worker = new OJSWorker({
  url: process.env.OJS_URL || 'http://localhost:8080',
  queues: ['email', 'default'],
  concurrency: 10,
});

worker.register('email.welcome', async (ctx) => {
  const [to, template, userId] = ctx.job.args;
  return handleWelcome({ to, template, userId });
});

worker.register('email.reset', async (ctx) => {
  const [to, userId] = ctx.job.args;
  return handleReset({ to, userId });
});

// Graceful shutdown
process.on('SIGINT', () => worker.stop());
process.on('SIGTERM', () => worker.stop());

await worker.start();
```

BullMQ event handlers:

```typescript
worker.on('active', (job) => {
  console.log(`Starting ${job.name}`);
});

worker.on('completed', (job) => {
  console.log(`Completed ${job.name}`);
});
```

OJS middleware (more powerful):

```typescript
worker.use(async (ctx, next) => {
  console.log(`Starting ${ctx.job.type} attempt ${ctx.attempt}`);
  const start = Date.now();
  try {
    await next();
    console.log(`Completed ${ctx.job.type} in ${Date.now() - start}ms`);
  } catch (err) {
    console.error(`Failed ${ctx.job.type}: ${(err as Error).message}`);
    throw err;
  }
});
```

BullMQ repeatable jobs:

```typescript
await queue.add('daily-cleanup', {}, {
  repeat: { pattern: '0 3 * * *' },
});
```

OJS cron:

```sh
curl -X POST http://localhost:8080/ojs/v1/cron \
  -H "Content-Type: application/json" \
  -d '{
    "name": "daily-cleanup",
    "cron": "0 3 * * *",
    "timezone": "UTC",
    "type": "cleanup.daily",
    "args": []
  }'
```

OJS cron jobs are registered on the server, not embedded in client code. This means they survive application restarts without re-registration.

During migration, run both BullMQ and OJS workers side by side. Migrate producer code (enqueue calls) first, then consumer code (workers), one job type at a time.
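One way to stage the producer cutover is a per-type flag. The `OJS_MIGRATED_TYPES` environment variable and the routing sketch below are hypothetical; adapt them to your own configuration system:

```typescript
// Hypothetical staged cutover: job types listed in OJS_MIGRATED_TYPES go to
// OJS, everything else stays on BullMQ until its consumer is migrated.
function parseMigratedTypes(raw: string | undefined): Set<string> {
  return new Set((raw ?? '').split(',').map((s) => s.trim()).filter(Boolean));
}

const migrated = parseMigratedTypes(process.env.OJS_MIGRATED_TYPES);

// In producer code:
// if (migrated.has('email.welcome')) {
//   await ojsClient.enqueue('email.welcome', [to, template, userId]);
// } else {
//   await emailQueue.add('send-welcome', { to, template, userId });
// }
```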

Once all job types are migrated:

  1. Remove bullmq from package.json.
  2. Remove BullMQ Queue, Worker, and FlowProducer instances.
  3. Remove Bull-specific Redis connection configuration.
  4. Clean up any BullMQ dashboard integrations.
Why make the switch? Beyond the migration mechanics, OJS offers durable advantages:

  • Backend portability. Switch from Redis to PostgreSQL without changing your application code. BullMQ is Redis-only; OJS supports any conforming backend.
  • Cross-language workers. A Python or Go service can process jobs enqueued by your TypeScript app. With BullMQ, you are locked into the Node.js ecosystem.
  • Standardized lifecycle. OJS defines 8 explicit states with documented transitions. BullMQ has implicit states spread across Redis data structures, making debugging harder.
  • Conformance testing. The OJS conformance test suite verifies backend behavior across 5 levels. BullMQ’s behavior is defined by its implementation, not by a testable specification.
  • Structured retry policies. OJS retry policies include exponential backoff, jitter, max interval caps, and non-retryable error classification, all in a standardized format that works across every SDK.
  • Native workflow support. OJS chain, group, and batch are first-class primitives with server-side orchestration. BullMQ’s FlowProducer has similar capabilities but uses a tree structure that can be harder to reason about for sequential pipelines.