Tutorial: Workflow Patterns

OJS provides three workflow primitives — chain, group, and batch — for composing jobs into pipelines. This tutorial demonstrates each pattern with practical examples.

Prerequisites

  • A running OJS server (see Quickstart)
  • Any OJS SDK installed (examples shown in JavaScript; patterns are identical across all SDKs)

Pattern 1: Chain — Sequential Execution

A chain runs jobs one after another. Each job starts only after the previous one completes. The output of one job is available to the next.

Use case: Multi-step processing where each step depends on the previous result.

import { OJSClient } from '@openjobspec/sdk';

const client = new OJSClient({ url: 'http://localhost:8080' });

// Process an order: validate → charge → ship → notify
const workflow = await client.workflow.chain([
  { type: 'order.validate', args: ['order_123'] },
  { type: 'payment.charge', args: ['order_123', 99.99] },
  { type: 'shipping.create', args: ['order_123'] },
  { type: 'email.send', args: ['customer@example.com', 'order_shipped'] },
]);

console.log(`Workflow ${workflow.id} started`);
console.log(`Jobs: ${workflow.jobs.map((j) => j.type).join(' → ')}`);

How it works:

  1. order.validate runs immediately
  2. When it completes, payment.charge becomes available
  3. When that completes, shipping.create runs
  4. Finally, email.send notifies the customer

If any job fails (after retries), the chain stops. Remaining jobs transition to cancelled.
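The failure semantics can be sketched as a small helper. This is illustrative only — `chainStatesAfterFailure` is not part of the SDK; it just models the rule described above:

```javascript
// Illustrative only: models chain failure propagation. Jobs before the
// failed one have completed; the failed job is discarded (retries
// exhausted); every remaining job transitions to cancelled.
function chainStatesAfterFailure(jobTypes, failedIndex) {
  return jobTypes.map((type, i) => ({
    type,
    state:
      i < failedIndex ? 'completed'
      : i === failedIndex ? 'discarded'
      : 'cancelled',
  }));
}

// If payment.charge fails in the order chain above:
const states = chainStatesAfterFailure(
  ['order.validate', 'payment.charge', 'shipping.create', 'email.send'],
  1,
);
// validate: completed; charge: discarded; create and send: cancelled
```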

Pattern 2: Group — Parallel Fan-Out/Fan-In

A group runs multiple jobs in parallel and waits for all of them to complete before continuing.

Use case: Independent tasks that can execute concurrently, with a synchronization point.

// Generate a report from multiple data sources in parallel
const workflow = await client.workflow.chain([
  // Step 1: Kick off data collection
  { type: 'report.init', args: ['monthly_report'] },

  // Step 2: Collect from 3 sources in parallel (fan-out)
  client.workflow.group([
    { type: 'data.fetch', args: ['sales_db', 'monthly_report'] },
    { type: 'data.fetch', args: ['analytics_api', 'monthly_report'] },
    { type: 'data.fetch', args: ['crm_export', 'monthly_report'] },
  ]),

  // Step 3: Merge results after all sources complete (fan-in)
  { type: 'report.compile', args: ['monthly_report'] },
  { type: 'report.email', args: ['team@example.com', 'monthly_report'] },
]);

console.log(`Report workflow ${workflow.id} started`);

How it works:

  1. report.init runs first (chain step 1)
  2. All three data.fetch jobs run in parallel (group in chain step 2)
  3. When all three complete, report.compile runs (chain step 3)
  4. Finally, report.email sends the result
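The fan-in rule can be stated as a one-line predicate. This is a sketch of the semantics, not an SDK function — the step after a group becomes available only once every job in the group has completed:

```javascript
// Illustrative only: the fan-in gate for a group nested in a chain.
function groupSatisfied(groupJobStates) {
  return groupJobStates.every((state) => state === 'completed');
}

groupSatisfied(['completed', 'running', 'completed']);   // report.compile waits
groupSatisfied(['completed', 'completed', 'completed']); // report.compile runs
```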

Pattern 3: Batch — Independent Jobs with Callbacks

A batch groups jobs together and fires callbacks when jobs reach terminal states. Unlike a group, batch jobs are independent — one failing doesn’t affect others.

Use case: Bulk operations with progress tracking and completion notifications.

// Send onboarding emails to 1000 users with progress tracking
const batch = await client.workflow.batch(
  // Jobs to execute
  Array.from({ length: 1000 }, (_, i) => ({
    type: 'email.send',
    args: [`user${i}@example.com`, 'onboarding'],
  })),
  // Callbacks
  {
    onComplete: { type: 'batch.report', args: ['onboarding_complete'] },
    onDiscard: { type: 'batch.alert', args: ['onboarding_failures'] },
  },
);

console.log(`Batch ${batch.id}: ${batch.jobs.length} jobs`);

Callback triggers:

  • onComplete: Fires when all jobs reach a terminal state (completed, cancelled, or discarded)
  • onDiscard: Fires when any job is discarded (exhausted retries)
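The trigger rules can be sketched as a pure function over job states (illustrative only; `callbacksDue` is not part of the SDK):

```javascript
// Illustrative only: when each batch callback fires, per the rules above.
const TERMINAL = new Set(['completed', 'cancelled', 'discarded']);

function callbacksDue(jobStates) {
  return {
    // onComplete: every job has reached a terminal state
    onComplete: jobStates.every((s) => TERMINAL.has(s)),
    // onDiscard: at least one job was discarded (exhausted retries)
    onDiscard: jobStates.some((s) => s === 'discarded'),
  };
}

const due = callbacksDue(['completed', 'running', 'discarded']);
// onComplete: false (one job still running); onDiscard: true
```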

Combining Patterns

Workflows can be nested arbitrarily:

// Complex ETL pipeline
const workflow = await client.workflow.chain([
  // Step 1: Setup
  { type: 'etl.setup', args: ['pipeline_42'] },

  // Step 2: Extract from multiple sources (parallel)
  client.workflow.group([
    // Each source extraction is itself a chain
    client.workflow.chain([
      { type: 'extract.connect', args: ['postgres'] },
      { type: 'extract.query', args: ['postgres', 'SELECT * FROM orders'] },
      { type: 'extract.disconnect', args: ['postgres'] },
    ]),
    client.workflow.chain([
      { type: 'extract.connect', args: ['mongodb'] },
      { type: 'extract.query', args: ['mongodb', 'db.events.find()'] },
      { type: 'extract.disconnect', args: ['mongodb'] },
    ]),
  ]),

  // Step 3: Transform and load
  { type: 'transform.merge', args: ['pipeline_42'] },
  { type: 'load.warehouse', args: ['pipeline_42', 'bigquery'] },

  // Step 4: Notify
  { type: 'notify.slack', args: ['#data-team', 'ETL pipeline_42 complete'] },
]);

Monitoring Workflows

Check the status of a workflow and its jobs:

const status = await client.workflow.get(workflow.id);
console.log(`Workflow: ${status.state}`);

for (const job of status.jobs) {
  console.log(`  ${job.type}: ${job.state} (attempt ${job.attempt})`);
}
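To block until a workflow finishes, a small polling helper works. This is a sketch, not an SDK feature; `getStatus` is passed in as a function standing in for `client.workflow.get`, which keeps the helper testable in isolation:

```javascript
// Sketch: poll a workflow until it reaches a terminal state.
async function waitForWorkflow(getStatus, workflowId, { intervalMs = 500 } = {}) {
  const terminal = new Set(['completed', 'cancelled', 'discarded']);
  for (;;) {
    const status = await getStatus(workflowId);
    if (terminal.has(status.state)) return status;
    // Not terminal yet — wait before polling again
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

Usage: `const final = await waitForWorkflow((id) => client.workflow.get(id), workflow.id);`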

Workflow states follow the job lifecycle:

  • active — at least one job is still running
  • completed — all jobs completed successfully
  • cancelled — the workflow was cancelled
  • discarded — one or more jobs failed permanently
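One plausible way to derive the workflow state from its job states, following the rules above (the precedence order and the `running`/`available` job-state names here are assumptions, not spec'd behavior):

```javascript
// Illustrative only: derive a workflow's state from its job states.
function deriveWorkflowState(jobStates) {
  // Any permanent failure makes the whole workflow discarded
  if (jobStates.some((s) => s === 'discarded')) return 'discarded';
  // Every job succeeded
  if (jobStates.every((s) => s === 'completed')) return 'completed';
  // Something is still running or waiting to run (assumed state names)
  if (jobStates.some((s) => s === 'running' || s === 'available')) return 'active';
  // Otherwise the remaining jobs were cancelled
  return 'cancelled';
}
```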

Summary

Pattern   Execution    Failure behavior    Best for
Chain     Sequential   Stops on failure    Multi-step pipelines
Group     Parallel     Waits for all       Fan-out/fan-in
Batch     Parallel     Independent         Bulk operations