
Implement a Backend

So you want to build an OJS backend. Maybe you need an in-memory implementation for testing, a SQLite backend for embedded use, or a Kafka-backed system for event streaming. This guide walks through everything you need to implement, from the core interfaces to passing the conformance test suite.

An OJS backend is the storage and coordination layer that sits between clients (producers) and workers (consumers). It has four core responsibilities:

  1. Job persistence: Store job envelopes durably, including all attributes, args, metadata, and results.
  2. State management: Enforce the 8-state lifecycle and reject invalid transitions.
  3. Atomic operations: Guarantee that state transitions are atomic. Two workers fetching from the same queue must never claim the same job.
  4. Worker coordination: Track active workers via heartbeats, enforce visibility timeouts, and requeue jobs from crashed workers.

The key design principle is server-side intelligence. Retry decisions, scheduling, state transitions, and backoff calculations all happen in your backend. SDKs stay thin.

Looking at the official Go implementations, here is the core interface your backend needs to satisfy:

type Backend interface {
    // Core operations (Level 0)
    Push(ctx context.Context, job *Job) (*Job, error)
    Fetch(ctx context.Context, queues []string, count int, workerID string, visibilityTimeoutMs int) ([]*Job, error)
    Ack(ctx context.Context, jobID string, result []byte) (*AckResponse, error)
    Nack(ctx context.Context, jobID string, jobErr *JobError, requeue bool) (*NackResponse, error)
    Info(ctx context.Context, jobID string) (*Job, error)
    Cancel(ctx context.Context, jobID string) (*Job, error)
    ListQueues(ctx context.Context) ([]QueueInfo, error)
    Health(ctx context.Context) (*HealthResponse, error)

    // Reliability (Level 1)
    Heartbeat(ctx context.Context, workerID string, activeJobs []string, visibilityTimeoutMs int) (*HeartbeatResponse, error)
    ListDeadLetter(ctx context.Context, limit, offset int) ([]*Job, int, error)
    RetryDeadLetter(ctx context.Context, jobID string) (*Job, error)
    DeleteDeadLetter(ctx context.Context, jobID string) error

    // Scheduling (Level 2)
    RegisterCron(ctx context.Context, cron *CronJob) (*CronJob, error)
    ListCron(ctx context.Context) ([]*CronJob, error)
    DeleteCron(ctx context.Context, name string) (*CronJob, error)

    // Workflows (Level 3)
    CreateWorkflow(ctx context.Context, req *WorkflowRequest) (*Workflow, error)
    GetWorkflow(ctx context.Context, id string) (*Workflow, error)
    CancelWorkflow(ctx context.Context, id string) (*Workflow, error)
    AdvanceWorkflow(ctx context.Context, workflowID string, jobID string, result json.RawMessage, failed bool) error

    // Advanced (Level 4)
    PushBatch(ctx context.Context, jobs []*Job) ([]*Job, error)
    QueueStats(ctx context.Context, name string) (*QueueStats, error)
    PauseQueue(ctx context.Context, name string) error
    ResumeQueue(ctx context.Context, name string) error

    Close() error
}

You do not need to implement everything at once. Start with Level 0 and work your way up.

Your backend server needs to expose HTTP endpoints following the OJS HTTP Binding specification. All endpoints live under the /ojs/v1 base path, with one exception: the conformance manifest at /ojs/manifest.

These are the minimum for a conforming implementation:

| Method | Path | Purpose |
|--------|------|---------|
| GET | /ojs/manifest | Conformance manifest (no /v1 prefix) |
| GET | /ojs/v1/health | Health check |
| POST | /ojs/v1/jobs | Enqueue a job (PUSH) |
| POST | /ojs/v1/workers/fetch | Claim jobs for processing (FETCH) |
| POST | /ojs/v1/workers/ack | Report success (ACK) |
| POST | /ojs/v1/workers/nack | Report failure (FAIL) |
| GET | /ojs/v1/queues | List queues |
| GET | /ojs/v1/schemas | List schemas |
| POST | /ojs/v1/schemas | Register schema |

The content type for all requests and responses is application/openjobspec+json. Your server must also accept application/json as an alias.
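For example, a minimal enqueue request could look like this (the host, job type, and args are illustrative, and the envelope is assumed to be posted directly as the request body):

curl -X POST http://localhost:8080/ojs/v1/jobs \
  -H "Content-Type: application/openjobspec+json" \
  -d '{"type": "email.send", "args": ["user@example.com"], "queue": "default"}'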

Every OJS backend must serve a manifest at GET /ojs/manifest. This tells clients what your backend supports:

{
  "specversion": "1.0",
  "implementation": {
    "name": "ojs-sqlite",
    "version": "0.1.0",
    "language": "go",
    "repository": "https://github.com/you/ojs-sqlite"
  },
  "conformance_level": 0,
  "conformance_tier": "runtime",
  "protocols": ["http"],
  "backend": "sqlite"
}

Update conformance_level as you implement more features.

The 8-state lifecycle is the heart of OJS. Here are the valid transitions:

| From State | Valid Transitions To |
|------------- |-----------------------------------------------------|
| scheduled | available, cancelled |
| available | active, cancelled |
| pending | available, cancelled |
| active | completed, retryable, cancelled, discarded |
| retryable | available, cancelled, discarded |
| completed | (terminal, no transitions) |
| cancelled | (terminal, no transitions) |
| discarded | (terminal, no transitions) |

Your implementation needs a function like this:

var validTransitions = map[string][]string{
    "scheduled": {"available", "cancelled"},
    "available": {"active", "cancelled"},
    "pending":   {"available", "cancelled"},
    "active":    {"completed", "retryable", "discarded", "cancelled"},
    "retryable": {"available", "cancelled", "discarded"},
    "completed": {},
    "cancelled": {},
    "discarded": {},
}

func IsValidTransition(from, to string) bool {
    targets, ok := validTransitions[from]
    if !ok {
        return false
    }
    for _, t := range targets {
        if t == to {
            return true
        }
    }
    return false
}

Every state change must:

  1. Be atomic. If two workers try to claim the same job, only one succeeds. The other gets an error or an empty response.
  2. Validate the transition. Reject invalid transitions with a 409 Conflict and an invalid_state_transition error code.
  3. Update timestamps. Set started_at when entering active, completed_at when entering completed, and so on.

For the Redis backend, atomicity is achieved using Lua scripts. For the PostgreSQL backend, it uses SELECT ... FOR UPDATE SKIP LOCKED. For an in-memory backend, a simple mutex works. Choose the right concurrency primitive for your storage engine.
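To make that concrete, here is a rough sketch of an atomic claim for a PostgreSQL-backed Fetch using SELECT ... FOR UPDATE SKIP LOCKED. The jobs table, its columns, and the single-queue simplification are assumptions for illustration, not part of the spec:

// claimJobs marks up to count available jobs in one queue as active, in a single
// atomic statement, and returns their IDs. Table and column names are illustrative.
func claimJobs(ctx context.Context, tx *sql.Tx, queue, workerID string, count, visibilityTimeoutMs int) ([]string, error) {
    rows, err := tx.QueryContext(ctx, `
        UPDATE jobs
        SET state = 'active',
            worker_id = $1,
            started_at = now(),
            visibility_deadline = now() + ($2 * interval '1 millisecond')
        WHERE id IN (
            SELECT id FROM jobs
            WHERE queue = $3 AND state = 'available'
            ORDER BY enqueued_at
            LIMIT $4
            FOR UPDATE SKIP LOCKED
        )
        RETURNING id`,
        workerID, visibilityTimeoutMs, queue, count)
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    var ids []string
    for rows.Next() {
        var id string
        if err := rows.Scan(&id); err != nil {
            return nil, err
        }
        ids = append(ids, id)
    }
    return ids, rows.Err()
}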

If the client does not provide an id, your backend must generate a UUIDv7. UUIDv7 values are time-ordered and globally unique, which makes them good for database indexing and natural chronological sorting.
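In Go, one option is the github.com/google/uuid package, which can generate version 7 UUIDs (a sketch; any UUIDv7-capable library works):

// Assign a UUIDv7 when the client did not supply an id.
if job.ID == "" {
    id, err := uuid.NewV7()
    if err != nil {
        return nil, fmt.Errorf("generate job id: %w", err)
    }
    job.ID = id.String()
}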

All timestamps use RFC 3339 format in UTC (e.g., 2026-02-12T10:30:00.000Z). Your backend must set these system-managed timestamps:

  • created_at and enqueued_at: Set when the job is first stored.
  • started_at: Set when the job transitions to active.
  • completed_at: Set when the job transitions to completed.
  • failed_at: Set when the job transitions to retryable or discarded.

Your backend must accept and preserve unknown fields in the job envelope. This is critical for forward compatibility. If a newer SDK sends a field your backend does not recognize, store it and return it unchanged. Do not silently drop unknown fields.
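One way to do this in Go is to decode the envelope into a raw map, pick out the fields you understand, and carry everything else along untouched. A sketch, with an illustrative (not exhaustive) set of known fields:

// splitEnvelope separates the fields this backend understands from everything
// else, so unknown fields can be stored and echoed back verbatim.
var knownFields = map[string]bool{
    "id": true, "type": true, "args": true, "queue": true,
    "priority": true, "scheduled_at": true, "metadata": true,
}

func splitEnvelope(data []byte) (known, extra map[string]json.RawMessage, err error) {
    var all map[string]json.RawMessage
    if err := json.Unmarshal(data, &all); err != nil {
        return nil, nil, err
    }
    known = map[string]json.RawMessage{}
    extra = map[string]json.RawMessage{}
    for k, v := range all {
        if knownFields[k] {
            known[k] = v
        } else {
            extra[k] = v // preserved and returned unchanged
        }
    }
    return known, extra, nil
}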

Validate these constraints on the POST /ojs/v1/jobs endpoint:

  • type is required, must match ^[a-zA-Z][a-zA-Z0-9_]*(\.[a-zA-Z][a-zA-Z0-9_]*)*$
  • args is required, must be a JSON array of JSON-native types
  • queue must match ^[a-z0-9][a-z0-9\-\.]*$ (defaults to "default")
  • priority, if provided, must be within valid range

Return 400 Bad Request with an invalid_payload error code for validation failures.
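A minimal validation function along these lines might look like the following sketch. It plays the role of the validateJob call in the Push example later in this guide; the priority range is left as a placeholder because the exact bounds come from the spec:

var (
    typePattern  = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9_]*(\.[a-zA-Z][a-zA-Z0-9_]*)*$`)
    queuePattern = regexp.MustCompile(`^[a-z0-9][a-z0-9\-\.]*$`)
)

// validateJob is a sketch of envelope validation for POST /ojs/v1/jobs.
// Map failures to a 400 response with the invalid_payload error code.
func validateJob(job *Job) error {
    if job.Type == "" || !typePattern.MatchString(job.Type) {
        return fmt.Errorf("invalid_payload: type %q does not match the required pattern", job.Type)
    }
    if job.Args == nil {
        return errors.New("invalid_payload: args is required and must be a JSON array")
    }
    if job.Queue == "" {
        job.Queue = "default" // queue defaults to "default"
    }
    if !queuePattern.MatchString(job.Queue) {
        return fmt.Errorf("invalid_payload: queue %q does not match the required pattern", job.Queue)
    }
    // Check priority against the range defined by the spec here (omitted in this sketch).
    return nil
}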

When a worker fetches a job, the job enters the active state with a visibility timeout (also called a reservation period). If the worker does not ACK or NACK the job within this period, the backend must automatically return the job to the available state.

This is how OJS provides at-least-once delivery. A crashed worker’s jobs automatically get requeued.

You need a background process (a ticker, a scheduled task, or a database check) that periodically scans for jobs whose visibility timeout has expired and moves them back to available.
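Using the in-memory backend sketched later in this guide, such a process could be as simple as the loop below (the tick interval and the VisibilityDeadline field are assumptions carried over from that sketch):

// runReaper periodically requeues active jobs whose visibility timeout has expired.
// Run it in its own goroutine when the backend starts.
func (b *InMemoryBackend) runReaper(ctx context.Context, interval time.Duration) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            now := time.Now().UTC()
            b.mu.Lock()
            for _, job := range b.jobs {
                if job.State == "active" && now.After(job.VisibilityDeadline) {
                    job.State = "available" // expired reservation: requeue for another worker
                    job.WorkerID = ""
                }
            }
            b.mu.Unlock()
        }
    }
}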

Workers send periodic heartbeats via POST /ojs/v1/workers/heartbeat. Each heartbeat:

  1. Extends the visibility timeout for the worker’s active jobs.
  2. Reports the worker’s current state (running, quiet, terminate).
  3. Receives server-directed state changes in the response.

The heartbeat response can tell a worker to enter quiet mode (stop fetching, finish current jobs) or terminate mode (shut down). This enables zero-downtime deployments.

If a worker stops sending heartbeats, its jobs’ visibility timeouts will eventually expire. Your background reaper process handles this automatically. No manual intervention is needed.
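Continuing the in-memory sketch, heartbeat handling could look like this. The HeartbeatResponse fields shown here are illustrative assumptions, not field names taken from the spec:

// Heartbeat extends the visibility deadline of the worker's active jobs and
// returns any server-directed state ("running", "quiet", or "terminate").
func (b *InMemoryBackend) Heartbeat(ctx context.Context, workerID string, activeJobs []string, visibilityTimeoutMs int) (*HeartbeatResponse, error) {
    b.mu.Lock()
    defer b.mu.Unlock()

    deadline := time.Now().UTC().Add(time.Duration(visibilityTimeoutMs) * time.Millisecond)
    for _, id := range activeJobs {
        if job, ok := b.jobs[id]; ok && job.State == "active" && job.WorkerID == workerID {
            job.VisibilityDeadline = deadline // extend the reservation
        }
    }

    // Return "quiet" or "terminate" here to drive zero-downtime deployments.
    return &HeartbeatResponse{WorkerID: workerID, DirectedState: "running"}, nil
}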

Here is a sketch of what an in-memory backend looks like. This is pseudocode to show the structure, not a complete implementation:

type InMemoryBackend struct {
    mu     sync.RWMutex
    jobs   map[string]*Job   // jobID -> Job
    queues map[string][]*Job // queue name -> ordered jobs
}

func (b *InMemoryBackend) Push(ctx context.Context, job *Job) (*Job, error) {
    b.mu.Lock()
    defer b.mu.Unlock()

    // Validate the envelope
    if err := validateJob(job); err != nil {
        return nil, err
    }

    // Assign UUIDv7 if no ID provided
    if job.ID == "" {
        job.ID = generateUUIDv7()
    }

    // Set system-managed fields
    now := time.Now().UTC()
    job.CreatedAt = now
    job.EnqueuedAt = now
    job.SpecVersion = "1.0.0-rc.1"

    // Determine initial state
    if job.ScheduledAt != nil && job.ScheduledAt.After(now) {
        job.State = "scheduled"
    } else {
        job.State = "available"
    }

    // Store and index
    b.jobs[job.ID] = job
    b.queues[job.Queue] = append(b.queues[job.Queue], job)
    return job, nil
}

func (b *InMemoryBackend) Fetch(ctx context.Context, queues []string, count int, workerID string, visibilityTimeoutMs int) ([]*Job, error) {
    b.mu.Lock()
    defer b.mu.Unlock()

    var claimed []*Job
    for _, q := range queues {
        for _, job := range b.queues[q] {
            if job.State == "available" && len(claimed) < count {
                // Atomic state transition
                now := time.Now().UTC()
                job.State = "active"
                job.StartedAt = timePtr(now)
                job.WorkerID = workerID
                job.VisibilityDeadline = now.Add(
                    time.Duration(visibilityTimeoutMs) * time.Millisecond,
                )
                claimed = append(claimed, job)
            }
        }
    }
    return claimed, nil
}

func (b *InMemoryBackend) Ack(ctx context.Context, jobID string, result []byte) (*AckResponse, error) {
    b.mu.Lock()
    defer b.mu.Unlock()

    job, ok := b.jobs[jobID]
    if !ok {
        return nil, ErrNotFound
    }
    if !IsValidTransition(job.State, "completed") {
        return nil, ErrInvalidTransition
    }
    job.State = "completed"
    job.CompletedAt = timePtr(time.Now().UTC())
    job.Result = result

    return &AckResponse{
        Acknowledged: true,
        JobID:        jobID,
        State:        "completed",
    }, nil
}
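The NACK path is where the server-side retry logic lives. Here is a sketch in the same style; the Attempt and MaxAttempts fields, the backoff formula, and the NackResponse shape are assumptions for illustration:

func (b *InMemoryBackend) Nack(ctx context.Context, jobID string, jobErr *JobError, requeue bool) (*NackResponse, error) {
    b.mu.Lock()
    defer b.mu.Unlock()

    job, ok := b.jobs[jobID]
    if !ok {
        return nil, ErrNotFound
    }
    // Record jobErr on the envelope's error history here (shape depends on your Job type).
    job.FailedAt = timePtr(time.Now().UTC())
    job.Attempt++

    if !requeue || job.Attempt >= job.MaxAttempts {
        // Retries exhausted, or the worker marked the error non-retryable: dead-letter the job.
        if !IsValidTransition(job.State, "discarded") {
            return nil, ErrInvalidTransition
        }
        job.State = "discarded"
    } else {
        // Schedule a retry with exponential backoff (formula is an assumption); the
        // scheduler moves the job back to available once the backoff has elapsed.
        if !IsValidTransition(job.State, "retryable") {
            return nil, ErrInvalidTransition
        }
        job.State = "retryable"
        backoff := time.Duration(math.Pow(2, float64(job.Attempt))) * time.Second
        job.ScheduledAt = timePtr(time.Now().UTC().Add(backoff))
    }

    return &NackResponse{JobID: jobID, State: job.State}, nil
}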

The OJS conformance test suite is a collection of language-agnostic JSON test files that verify your implementation behaves correctly. There are five levels, each building on the previous:

Level 0 is the minimum viable job system. Tests cover:

  • Envelope validation (required fields, type format, args constraints)
  • State machine transitions (valid and invalid)
  • PUSH, FETCH, ACK, NACK operations
  • FIFO ordering within a queue
  • Exclusive claim (no double-delivery)

Level 1 adds at-least-once delivery guarantees. Tests cover:

  • Retry with exponential backoff
  • Non-retryable error classification
  • Dead letter queue (discarded jobs after retry exhaustion)
  • Visibility timeout and automatic requeue
  • Heartbeat extension
  • Worker graceful shutdown signals

Level 2 adds time-based features. Tests cover:

  • Delayed job execution (scheduled_at)
  • Job expiration (expires_at)
  • Cron/periodic job scheduling
  • Timezone-aware scheduling

Level 3 adds workflow primitives. Tests cover:

  • Chain (sequential execution with data passing)
  • Group (parallel fan-out)
  • Batch (parallel with on_complete/on_success/on_failure callbacks)

Level 4 adds production-grade features. Tests cover:

  • Priority ordering within queues
  • Unique job deduplication
  • Batch enqueue (atomic multi-job insertion)
  • Queue pause/resume
  • Queue statistics

Start your backend server, then point the conformance runner at it:

# Run Level 0 tests against your server
ojs-conformance-runner --level 0 --target http://localhost:8080
# Run all levels up to Level 2
ojs-conformance-runner --level 2 --target http://localhost:8080
# Run a specific test
ojs-conformance-runner --test L0-001 --target http://localhost:8080

The runner produces a JSON report showing which tests passed and failed:

{
  "test_suite_version": "1.0.0",
  "target": "http://localhost:8080",
  "requested_level": 0,
  "results": {
    "total": 25,
    "passed": 24,
    "failed": 1
  },
  "conformant": false,
  "conformant_level": -1,
  "failures": [
    {
      "test_id": "L0-003",
      "name": "reject_invalid_state_transition",
      "reason": "Expected status 409, got 200"
    }
  ]
}

Fix the failures, re-run, and iterate until you pass.

  1. Health endpoint (GET /ojs/v1/health) and manifest (GET /ojs/manifest). Get these working first so the conformance runner can connect.
  2. PUSH (POST /ojs/v1/jobs). Validate envelopes, assign IDs, store jobs.
  3. FETCH (POST /ojs/v1/workers/fetch). Claim jobs atomically, transition to active.
  4. ACK/NACK (POST /ojs/v1/workers/ack and /nack). Complete or fail jobs, enforce state transitions.
  5. INFO and CANCEL (GET and DELETE /ojs/v1/jobs/:id).
  6. Run Level 0 conformance tests. Fix issues until you pass.
  7. Add Level 1 features (retry logic, heartbeats, visibility timeout, dead letter queue).
  8. Continue up the conformance levels as needed for your use case.

You do not need Level 4 to be useful. Plenty of production systems operate at Level 1 or Level 2.

Looking at the official Redis and PostgreSQL backends:

  • Separate concerns into layers. Both backends use internal/api/ for HTTP handlers, internal/core/ for business logic interfaces, and internal/redis/ or internal/postgres/ for storage.
  • Make dequeue atomic. Redis uses Lua scripts. PostgreSQL uses SELECT ... FOR UPDATE SKIP LOCKED. Whatever your storage engine, the fetch operation must be atomic.
  • Use a scheduler goroutine. Both backends run a background process that moves scheduled jobs to available when their time arrives, and requeues active jobs whose visibility timeout expired.
  • Return the full job envelope in responses. When a client calls POST /ojs/v1/jobs, return the complete job with server-assigned fields (id, state, timestamps). Same for ACK, NACK, and CANCEL responses.