Implement a Backend
So you want to build an OJS backend. Maybe you need an in-memory implementation for testing, a SQLite backend for embedded use, or a Kafka-backed system for event streaming. This guide walks through everything you need to implement, from the core interfaces to passing the conformance test suite.
What a backend is responsible for
An OJS backend is the storage and coordination layer that sits between clients (producers) and workers (consumers). It has four core responsibilities:
- Job persistence: Store job envelopes durably, including all attributes, args, metadata, and results.
- State management: Enforce the 8-state lifecycle and reject invalid transitions.
- Atomic operations: Guarantee that state transitions are atomic. Two workers fetching from the same queue must never claim the same job.
- Worker coordination: Track active workers via heartbeats, enforce visibility timeouts, and requeue jobs from crashed workers.
The key design principle is server-side intelligence. Retry decisions, scheduling, state transitions, and backoff calculations all happen in your backend. SDKs stay thin.
The Backend interface
Looking at the official Go implementations, here is the core interface your backend needs to satisfy:
    type Backend interface {
        // Core operations (Level 0)
        Push(ctx context.Context, job *Job) (*Job, error)
        Fetch(ctx context.Context, queues []string, count int, workerID string, visibilityTimeoutMs int) ([]*Job, error)
        Ack(ctx context.Context, jobID string, result []byte) (*AckResponse, error)
        Nack(ctx context.Context, jobID string, jobErr *JobError, requeue bool) (*NackResponse, error)
        Info(ctx context.Context, jobID string) (*Job, error)
        Cancel(ctx context.Context, jobID string) (*Job, error)
        ListQueues(ctx context.Context) ([]QueueInfo, error)
        Health(ctx context.Context) (*HealthResponse, error)

        // Reliability (Level 1)
        Heartbeat(ctx context.Context, workerID string, activeJobs []string, visibilityTimeoutMs int) (*HeartbeatResponse, error)
        ListDeadLetter(ctx context.Context, limit, offset int) ([]*Job, int, error)
        RetryDeadLetter(ctx context.Context, jobID string) (*Job, error)
        DeleteDeadLetter(ctx context.Context, jobID string) error

        // Scheduling (Level 2)
        RegisterCron(ctx context.Context, cron *CronJob) (*CronJob, error)
        ListCron(ctx context.Context) ([]*CronJob, error)
        DeleteCron(ctx context.Context, name string) (*CronJob, error)

        // Workflows (Level 3)
        CreateWorkflow(ctx context.Context, req *WorkflowRequest) (*Workflow, error)
        GetWorkflow(ctx context.Context, id string) (*Workflow, error)
        CancelWorkflow(ctx context.Context, id string) (*Workflow, error)
        AdvanceWorkflow(ctx context.Context, workflowID string, jobID string, result json.RawMessage, failed bool) error

        // Advanced (Level 4)
        PushBatch(ctx context.Context, jobs []*Job) ([]*Job, error)
        QueueStats(ctx context.Context, name string) (*QueueStats, error)
        PauseQueue(ctx context.Context, name string) error
        ResumeQueue(ctx context.Context, name string) error

        Close() error
    }

You do not need to implement everything at once. Start with Level 0 and work your way up.
Required HTTP endpoints
Your backend server needs to expose HTTP endpoints following the OJS HTTP Binding specification. All endpoints live under the /ojs/v1 base path, with one exception: the conformance manifest at /ojs/manifest.
Level 0 (Core) endpoints
These are the minimum for a conforming implementation:
| Method | Path | Purpose |
|---|---|---|
| GET | /ojs/manifest | Conformance manifest (no /v1 prefix) |
| GET | /ojs/v1/health | Health check |
| POST | /ojs/v1/jobs | Enqueue a job (PUSH) |
| POST | /ojs/v1/workers/fetch | Claim jobs for processing (FETCH) |
| POST | /ojs/v1/workers/ack | Report success (ACK) |
| POST | /ojs/v1/workers/nack | Report failure (FAIL) |
| GET | /ojs/v1/queues | List queues |
| GET | /ojs/v1/schemas | List schemas |
| POST | /ojs/v1/schemas | Register schema |
The content type for all requests and responses is application/openjobspec+json. Your server must also accept application/json as an alias.
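A minimal sketch of the content-type check, assuming you parse the header with the standard library's mime package. The function name acceptsContentType is hypothetical, and ignoring parameters such as charset is an assumption rather than something the binding spec is quoted as saying here:

```go
package main

import (
	"fmt"
	"mime"
)

// Media types named in the paragraph above.
const (
	ojsMediaType  = "application/openjobspec+json"
	jsonMediaType = "application/json"
)

// acceptsContentType reports whether a Content-Type header value names one of
// the two accepted media types. Parameters such as "; charset=utf-8" are
// ignored (an assumption of this sketch).
func acceptsContentType(header string) bool {
	mediaType, _, err := mime.ParseMediaType(header)
	if err != nil {
		return false // empty or malformed header
	}
	return mediaType == ojsMediaType || mediaType == jsonMediaType
}

func main() {
	for _, h := range []string{
		"application/openjobspec+json",
		"application/json; charset=utf-8",
		"text/plain",
	} {
		fmt.Printf("%q accepted=%v\n", h, acceptsContentType(h))
	}
}
```

A handler could call this first and reject anything else before it tries to decode the body.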
The conformance manifest
Every OJS backend must serve a manifest at GET /ojs/manifest. This tells clients what your backend supports:
    {
      "specversion": "1.0",
      "implementation": {
        "name": "ojs-sqlite",
        "version": "0.1.0",
        "language": "go",
        "repository": "https://github.com/you/ojs-sqlite"
      },
      "conformance_level": 0,
      "conformance_tier": "runtime",
      "protocols": ["http"],
      "backend": "sqlite"
    }

Update conformance_level as you implement more features.
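Serving the manifest could look roughly like the sketch below. The struct and function names (Manifest, newManifest, manifestHandler, serve) are hypothetical; only the JSON field names and the path come from the example above. The serve helper runs a request in-process with httptest so the sketch needs no open port:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Implementation and Manifest mirror the JSON fields of the example manifest.
type Implementation struct {
	Name       string `json:"name"`
	Version    string `json:"version"`
	Language   string `json:"language"`
	Repository string `json:"repository"`
}

type Manifest struct {
	SpecVersion      string         `json:"specversion"`
	Implementation   Implementation `json:"implementation"`
	ConformanceLevel int            `json:"conformance_level"`
	ConformanceTier  string         `json:"conformance_tier"`
	Protocols        []string       `json:"protocols"`
	Backend          string         `json:"backend"`
}

// newManifest fills in the example values; level is the highest level you pass.
func newManifest(level int) Manifest {
	return Manifest{
		SpecVersion: "1.0",
		Implementation: Implementation{
			Name: "ojs-sqlite", Version: "0.1.0", Language: "go",
			Repository: "https://github.com/you/ojs-sqlite",
		},
		ConformanceLevel: level,
		ConformanceTier:  "runtime",
		Protocols:        []string{"http"},
		Backend:          "sqlite",
	}
}

// manifestHandler answers GET requests with the manifest as JSON.
func manifestHandler(m Manifest) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodGet {
			w.Header().Set("Allow", http.MethodGet)
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		w.Header().Set("Content-Type", "application/openjobspec+json")
		_ = json.NewEncoder(w).Encode(m)
	}
}

// serve runs one request through the router in-process (demo helper).
func serve(method, path string) (int, string) {
	mux := http.NewServeMux()
	mux.Handle("/ojs/manifest", manifestHandler(newManifest(0)))
	rec := httptest.NewRecorder()
	mux.ServeHTTP(rec, httptest.NewRequest(method, path, nil))
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := serve(http.MethodGet, "/ojs/manifest")
	fmt.Println(code, body)
}
```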
Implementing the state machine
The 8-state lifecycle is the heart of OJS. Here are the valid transitions:
| From State | Valid Transitions To |
|---|---|
| scheduled | available, cancelled |
| available | active, cancelled |
| pending | available, cancelled |
| active | completed, retryable, cancelled, discarded |
| retryable | available, cancelled, discarded |
| completed | (terminal, no transitions) |
| cancelled | (terminal, no transitions) |
| discarded | (terminal, no transitions) |

Your implementation needs a function like this:
    // validTransitions maps each state to the states it may move to.
    var validTransitions = map[string][]string{
        "available": {"active", "cancelled"},
        "scheduled": {"available", "cancelled"},
        "pending":   {"available", "cancelled"},
        "active":    {"completed", "retryable", "discarded", "cancelled"},
        "retryable": {"available", "cancelled", "discarded"},
        "completed": {},
        "cancelled": {},
        "discarded": {},
    }

    // IsValidTransition reports whether a job may move from one state to another.
    func IsValidTransition(from, to string) bool {
        targets, ok := validTransitions[from]
        if !ok {
            return false
        }
        for _, t := range targets {
            if t == to {
                return true
            }
        }
        return false
    }

Every state change must:
- Be atomic. If two workers try to claim the same job, only one succeeds. The other gets an error or an empty response.
- Validate the transition. Reject invalid transitions with a 409 Conflict and an invalid_state_transition error code.
- Update timestamps. Set started_at when entering active, completed_at when entering completed, and so on.
The Redis backend achieves atomicity with Lua scripts. The PostgreSQL backend uses SELECT ... FOR UPDATE SKIP LOCKED. For an in-memory backend, a simple mutex works. Choose the right concurrency primitive for your storage engine.
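The three rules for a state change can be sketched for the in-memory case as a single method that holds a mutex while it validates and applies the change. The Store type, its Transition method, and the error variables are hypothetical names for this sketch; the transition map repeats the table above:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var (
	ErrNotFound          = errors.New("job not found")
	ErrInvalidTransition = errors.New("invalid_state_transition") // map to 409 Conflict
)

// validTransitions repeats the lifecycle table from this guide.
var validTransitions = map[string][]string{
	"scheduled": {"available", "cancelled"},
	"available": {"active", "cancelled"},
	"pending":   {"available", "cancelled"},
	"active":    {"completed", "retryable", "cancelled", "discarded"},
	"retryable": {"available", "cancelled", "discarded"},
}

type Job struct {
	ID          string
	State       string
	StartedAt   *time.Time
	CompletedAt *time.Time
}

type Store struct {
	mu   sync.Mutex
	jobs map[string]*Job
}

// Transition validates and applies a state change while holding the lock,
// so concurrent callers cannot both succeed on the same job.
func (s *Store) Transition(id, to string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	job, ok := s.jobs[id]
	if !ok {
		return ErrNotFound
	}
	allowed := false
	for _, t := range validTransitions[job.State] {
		if t == to {
			allowed = true
			break
		}
	}
	if !allowed {
		return ErrInvalidTransition
	}

	now := time.Now().UTC()
	switch to {
	case "active":
		job.StartedAt = &now
	case "completed":
		job.CompletedAt = &now
	}
	job.State = to
	return nil
}

func main() {
	s := &Store{jobs: map[string]*Job{"j1": {ID: "j1", State: "available"}}}
	fmt.Println(s.Transition("j1", "active"))    // first claim succeeds
	fmt.Println(s.Transition("j1", "available")) // rejected: not in the table
}
```

With a database, the same check-then-write would have to live inside one transaction or script instead of under a Go mutex.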
Job storage requirements
If the client does not provide an id, your backend must generate a UUIDv7. UUIDv7 values are time-ordered and globally unique, which makes them good for database indexing and natural chronological sorting.
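If you would rather not pull in a UUID library, a UUIDv7 can be assembled by hand. The sketch below follows the RFC 9562 layout (48-bit Unix millisecond timestamp, version nibble 7, RFC variant bits, random remainder). The function name newUUIDv7 is hypothetical, and this version does not guarantee ordering between IDs generated within the same millisecond:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"time"
)

// newUUIDv7 returns a version 7 UUID in canonical 8-4-4-4-12 form.
func newUUIDv7() (string, error) {
	var b [16]byte

	// Bytes 6..15 start out random; version and variant bits are fixed below.
	if _, err := rand.Read(b[6:]); err != nil {
		return "", err
	}

	// Bytes 0..5 hold the Unix timestamp in milliseconds, big-endian.
	ms := uint64(time.Now().UnixMilli())
	b[0] = byte(ms >> 40)
	b[1] = byte(ms >> 32)
	b[2] = byte(ms >> 24)
	b[3] = byte(ms >> 16)
	b[4] = byte(ms >> 8)
	b[5] = byte(ms)

	b[6] = (b[6] & 0x0f) | 0x70 // version 7
	b[8] = (b[8] & 0x3f) | 0x80 // RFC variant (binary 10xx)

	h := hex.EncodeToString(b[:])
	return h[0:8] + "-" + h[8:12] + "-" + h[12:16] + "-" + h[16:20] + "-" + h[20:32], nil
}

func main() {
	id, err := newUUIDv7()
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```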
Timestamps
All timestamps use RFC 3339 format in UTC (e.g., 2026-02-12T10:30:00.000Z). Your backend must set these system-managed timestamps:
- created_at and enqueued_at: Set when the job is first stored.
- started_at: Set when the job transitions to active.
- completed_at: Set when the job transitions to completed.
- failed_at: Set when the job transitions to retryable or discarded.
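In Go, the format shown above can be produced with a fixed layout string. This is a sketch that assumes millisecond precision, matching the example timestamp; the helper names formatTimestamp and formatUnixMilli are hypothetical:

```go
package main

import (
	"fmt"
	"time"
)

// ojsTimeLayout renders RFC 3339 with exactly three fractional digits.
// For UTC times the "Z07:00" element prints a literal "Z".
const ojsTimeLayout = "2006-01-02T15:04:05.000Z07:00"

// formatTimestamp converts any time to UTC before formatting it.
func formatTimestamp(t time.Time) string {
	return t.UTC().Format(ojsTimeLayout)
}

// formatUnixMilli is a convenience wrapper for millisecond Unix timestamps.
func formatUnixMilli(ms int64) string {
	return formatTimestamp(time.UnixMilli(ms))
}

func main() {
	// A local time two hours ahead of UTC is normalized on output.
	loc := time.FixedZone("UTC+2", 2*60*60)
	local := time.Date(2026, 2, 12, 12, 30, 0, 0, loc)
	fmt.Println(formatTimestamp(local)) // 2026-02-12T10:30:00.000Z
}
```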
Preserving unknown attributes
Your backend must accept and preserve unknown fields in the job envelope. This is critical for forward compatibility. If a newer SDK sends a field your backend does not recognize, store it and return it unchanged. Do not silently drop unknown fields.
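One way to keep unknown fields in Go is to decode the envelope into a map of raw JSON first, pull out the fields you know, and keep the rest for re-encoding. The sketch below handles only type, args, and queue; the Envelope type, the roundTrip helper, and the x_trace_id field are made up for illustration:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope keeps the fields this backend understands plus everything else.
type Envelope struct {
	Type  string
	Args  json.RawMessage
	Queue string
	Extra map[string]json.RawMessage // unknown fields, stored verbatim
}

// UnmarshalJSON extracts known fields and stashes the remainder in Extra.
func (e *Envelope) UnmarshalJSON(data []byte) error {
	raw := map[string]json.RawMessage{}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	known := map[string]interface{}{"type": &e.Type, "args": &e.Args, "queue": &e.Queue}
	for key, dst := range known {
		if v, ok := raw[key]; ok {
			if err := json.Unmarshal(v, dst); err != nil {
				return fmt.Errorf("field %q: %w", key, err)
			}
			delete(raw, key)
		}
	}
	e.Extra = raw
	return nil
}

// MarshalJSON writes the unknown fields back next to the known ones.
func (e Envelope) MarshalJSON() ([]byte, error) {
	out := make(map[string]interface{}, len(e.Extra)+3)
	for k, v := range e.Extra {
		out[k] = v
	}
	out["type"] = e.Type
	out["args"] = e.Args
	out["queue"] = e.Queue
	return json.Marshal(out)
}

// roundTrip decodes and re-encodes an envelope (demo helper).
func roundTrip(in string) (string, error) {
	var env Envelope
	if err := json.Unmarshal([]byte(in), &env); err != nil {
		return "", err
	}
	out, err := json.Marshal(env)
	return string(out), err
}

func main() {
	out, err := roundTrip(`{"type":"email.send","args":["a@example.com"],"queue":"default","x_trace_id":"abc123"}`)
	fmt.Println(out, err)
}
```

The same idea carries over to SQL storage: keep a JSON column for the leftover fields and merge it back in when you serialize the job.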
Envelope validation
Validate these constraints on the POST /ojs/v1/jobs endpoint:
- type is required, must match ^[a-zA-Z][a-zA-Z0-9_]*(\.[a-zA-Z][a-zA-Z0-9_]*)*$
- args is required, must be a JSON array of JSON-native types
- queue must match ^[a-z0-9][a-z0-9\-\.]*$ (defaults to "default")
- priority, if provided, must be within valid range
Return 400 Bad Request with an invalid_payload error code for validation failures.
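The rules above translate fairly directly into Go. In this sketch the two regular expressions are copied from the list, while PushRequest, validate, and ErrInvalidPayload are hypothetical names. The priority check is left out because this guide does not state the valid range:

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"regexp"
)

var (
	typePattern  = regexp.MustCompile(`^[a-zA-Z][a-zA-Z0-9_]*(\.[a-zA-Z][a-zA-Z0-9_]*)*$`)
	queuePattern = regexp.MustCompile(`^[a-z0-9][a-z0-9\-\.]*$`)
)

// ErrInvalidPayload is what an HTTP handler would map to 400 / invalid_payload.
var ErrInvalidPayload = errors.New("invalid_payload")

type PushRequest struct {
	Type  string          `json:"type"`
	Args  json.RawMessage `json:"args"`
	Queue string          `json:"queue"`
}

// validate checks type, args, and queue, and applies the default queue name.
func validate(req *PushRequest) error {
	if !typePattern.MatchString(req.Type) {
		return fmt.Errorf("%w: bad type %q", ErrInvalidPayload, req.Type)
	}

	// args must decode as a JSON array; null and objects are rejected.
	var args []json.RawMessage
	if len(req.Args) == 0 || json.Unmarshal(req.Args, &args) != nil || args == nil {
		return fmt.Errorf("%w: args must be a JSON array", ErrInvalidPayload)
	}

	if req.Queue == "" {
		req.Queue = "default"
	}
	if !queuePattern.MatchString(req.Queue) {
		return fmt.Errorf("%w: bad queue %q", ErrInvalidPayload, req.Queue)
	}
	return nil
}

func main() {
	good := &PushRequest{Type: "email.send", Args: json.RawMessage(`["a@example.com"]`)}
	fmt.Println(validate(good), good.Queue)

	bad := &PushRequest{Type: "9lives", Args: json.RawMessage(`[]`)}
	fmt.Println(validate(bad))
}
```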
Worker coordination
Visibility timeout
When a worker fetches a job, the job enters the active state with a visibility timeout (also called a reservation period). If the worker does not ACK or NACK the job within this period, the backend must automatically return the job to the available state.
This is how OJS provides at-least-once delivery. A crashed worker’s jobs automatically get requeued.
You need a background process (a ticker, a scheduled task, or a database check) that periodically scans for jobs whose visibility timeout has expired and moves them back to available.
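For an in-memory backend, that background process can be a goroutine driven by a ticker. The sketch below is one possible shape; Store, reapExpired, and runReaper are hypothetical names, and the requeue is applied directly as a system-initiated move, as described in the prose above:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Job struct {
	ID                 string
	State              string
	WorkerID           string
	VisibilityDeadline time.Time
}

type Store struct {
	mu   sync.Mutex
	jobs map[string]*Job
}

// reapExpired returns active jobs whose deadline has passed to available
// and reports how many were requeued.
func (s *Store) reapExpired(now time.Time) int {
	s.mu.Lock()
	defer s.mu.Unlock()

	requeued := 0
	for _, j := range s.jobs {
		if j.State == "active" && now.After(j.VisibilityDeadline) {
			j.State = "available"
			j.WorkerID = ""
			requeued++
		}
	}
	return requeued
}

// runReaper calls reapExpired on every tick until stop is closed.
func (s *Store) runReaper(interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case now := <-ticker.C:
			s.reapExpired(now)
		case <-stop:
			return
		}
	}
}

func main() {
	s := &Store{jobs: map[string]*Job{
		"j1": {ID: "j1", State: "active", WorkerID: "w1",
			VisibilityDeadline: time.Now().Add(50 * time.Millisecond)},
	}}

	stop := make(chan struct{})
	go s.runReaper(10*time.Millisecond, stop)
	time.Sleep(200 * time.Millisecond) // let the deadline pass
	close(stop)

	s.mu.Lock()
	fmt.Println(s.jobs["j1"].State)
	s.mu.Unlock()
}
```

The scan is linear in the number of jobs, which is fine for tests. A real storage engine would more likely query an index on the deadline.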
Heartbeats
Workers send periodic heartbeats via POST /ojs/v1/workers/heartbeat. Each heartbeat:
- Extends the visibility timeout for the worker’s active jobs.
- Reports the worker’s current state (running, quiet, terminate).
- Receives server-directed state changes in the response.
The heartbeat response can tell a worker to enter quiet mode (stop fetching, finish current jobs) or terminate mode (shut down). This enables zero-downtime deployments.
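The heartbeat behavior described above might be sketched like this for an in-memory store. Everything here is an assumption for illustration: the HeartbeatResponse fields, the directives map that an operator would fill in, and the extra now parameter, which the real interface does not have and which is only there to make the sketch easy to test:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type Job struct {
	ID                 string
	State              string
	WorkerID           string
	VisibilityDeadline time.Time
}

// HeartbeatResponse carries the server-directed worker state (hypothetical shape).
type HeartbeatResponse struct {
	State        string   // "running", "quiet", or "terminate"
	JobsExtended []string // jobs whose visibility timeout was pushed out
}

type Store struct {
	mu         sync.Mutex
	jobs       map[string]*Job
	directives map[string]string // workerID -> state requested by an operator
}

// Heartbeat extends deadlines for jobs the worker actually owns and tells
// the worker which state it should be in.
func (s *Store) Heartbeat(workerID string, activeJobs []string, visibilityTimeoutMs int, now time.Time) HeartbeatResponse {
	s.mu.Lock()
	defer s.mu.Unlock()

	deadline := now.Add(time.Duration(visibilityTimeoutMs) * time.Millisecond)
	resp := HeartbeatResponse{State: "running"}
	for _, id := range activeJobs {
		job, ok := s.jobs[id]
		if !ok || job.State != "active" || job.WorkerID != workerID {
			continue // ignore jobs this worker no longer holds
		}
		job.VisibilityDeadline = deadline
		resp.JobsExtended = append(resp.JobsExtended, id)
	}
	if directed, ok := s.directives[workerID]; ok {
		resp.State = directed
	}
	return resp
}

func main() {
	s := &Store{
		jobs:       map[string]*Job{"j1": {ID: "j1", State: "active", WorkerID: "w1"}},
		directives: map[string]string{"w1": "quiet"},
	}
	resp := s.Heartbeat("w1", []string{"j1"}, 30000, time.Now())
	fmt.Println(resp.State, resp.JobsExtended)
}
```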
Crash recovery
If a worker stops sending heartbeats, its jobs’ visibility timeouts will eventually expire. Your background reaper process handles this automatically. No manual intervention is needed.
A minimal Level 0 implementation outline
Here is a sketch of what an in-memory backend looks like. This is pseudocode to show the structure, not a complete implementation:
    type InMemoryBackend struct {
        mu     sync.RWMutex
        jobs   map[string]*Job   // jobID -> Job
        queues map[string][]*Job // queue name -> ordered jobs
    }

    func (b *InMemoryBackend) Push(ctx context.Context, job *Job) (*Job, error) {
        b.mu.Lock()
        defer b.mu.Unlock()

        // Validate the envelope
        if err := validateJob(job); err != nil {
            return nil, err
        }

        // Assign UUIDv7 if no ID provided
        if job.ID == "" {
            job.ID = generateUUIDv7()
        }

        // Set system-managed fields
        now := time.Now().UTC()
        job.CreatedAt = now
        job.EnqueuedAt = now
        job.SpecVersion = "1.0.0-rc.1"

        // Determine initial state
        if job.ScheduledAt != nil && job.ScheduledAt.After(now) {
            job.State = "scheduled"
        } else {
            job.State = "available"
        }

        // Store and index
        b.jobs[job.ID] = job
        b.queues[job.Queue] = append(b.queues[job.Queue], job)

        return job, nil
    }

    func (b *InMemoryBackend) Fetch(ctx context.Context, queues []string, count int, workerID string, visibilityTimeoutMs int) ([]*Job, error) {
        b.mu.Lock()
        defer b.mu.Unlock()

        now := time.Now().UTC()
        var claimed []*Job
        for _, q := range queues {
            for _, job := range b.queues[q] {
                // Stop scanning once enough jobs are claimed
                if len(claimed) >= count {
                    return claimed, nil
                }
                if job.State != "available" {
                    continue
                }
                // State transition, atomic because the lock is held
                job.State = "active"
                job.StartedAt = timePtr(now)
                job.WorkerID = workerID
                job.VisibilityDeadline = now.Add(
                    time.Duration(visibilityTimeoutMs) * time.Millisecond,
                )
                claimed = append(claimed, job)
            }
        }
        return claimed, nil
    }

    func (b *InMemoryBackend) Ack(ctx context.Context, jobID string, result []byte) (*AckResponse, error) {
        b.mu.Lock()
        defer b.mu.Unlock()

        job, ok := b.jobs[jobID]
        if !ok {
            return nil, ErrNotFound
        }
        if !IsValidTransition(job.State, "completed") {
            return nil, ErrInvalidTransition
        }

        job.State = "completed"
        job.CompletedAt = timePtr(time.Now().UTC())
        job.Result = result

        return &AckResponse{
            Acknowledged: true,
            JobID:        jobID,
            State:        "completed",
        }, nil
    }

Testing with the conformance suite
The OJS conformance test suite is a collection of language-agnostic JSON test files that verify your implementation behaves correctly. There are five levels, each building on the previous:
Level 0: Core
The minimum viable job system. Tests cover:
- Envelope validation (required fields, type format, args constraints)
- State machine transitions (valid and invalid)
- PUSH, FETCH, ACK, NACK operations
- FIFO ordering within a queue
- Exclusive claim (no double-delivery)
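Before pointing the conformance runner at your server, you can check FIFO ordering and exclusive claim locally. The sketch below uses a mutex-guarded queue and several goroutines racing to fetch. Queue, newQueue, and claimAll are hypothetical names, and this is a local sanity check, not a stand-in for the conformance tests:

```go
package main

import (
	"fmt"
	"sync"
)

type Job struct {
	ID       string
	State    string
	WorkerID string
}

type Queue struct {
	mu   sync.Mutex
	jobs []*Job // insertion order is FIFO order
}

// newQueue builds a queue holding n available jobs named job-0, job-1, ...
func newQueue(n int) *Queue {
	q := &Queue{}
	for i := 0; i < n; i++ {
		q.jobs = append(q.jobs, &Job{ID: fmt.Sprintf("job-%d", i), State: "available"})
	}
	return q
}

// Fetch claims up to count available jobs in FIFO order under the lock.
func (q *Queue) Fetch(workerID string, count int) []*Job {
	q.mu.Lock()
	defer q.mu.Unlock()

	var claimed []*Job
	for _, j := range q.jobs {
		if len(claimed) == count {
			break
		}
		if j.State == "available" {
			j.State = "active"
			j.WorkerID = workerID
			claimed = append(claimed, j)
		}
	}
	return claimed
}

// claimAll drains the queue with concurrent workers and counts how many
// times each job ID was handed out.
func claimAll(q *Queue, workers int) map[string]int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	seen := map[string]int{}

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func(workerID string) {
			defer wg.Done()
			for {
				got := q.Fetch(workerID, 1)
				if len(got) == 0 {
					return
				}
				mu.Lock()
				seen[got[0].ID]++
				mu.Unlock()
			}
		}(fmt.Sprintf("worker-%d", w))
	}
	wg.Wait()
	return seen
}

func main() {
	seen := claimAll(newQueue(100), 8)
	duplicates := 0
	for _, n := range seen {
		if n > 1 {
			duplicates++
		}
	}
	fmt.Printf("claimed=%d duplicates=%d\n", len(seen), duplicates)
}
```

Running this under the Go race detector (go run -race) is a cheap extra check on the locking.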
Level 1: Reliable
At-least-once delivery guarantees. Tests cover:
- Retry with exponential backoff
- Non-retryable error classification
- Dead letter queue (discarded jobs after retry exhaustion)
- Visibility timeout and automatic requeue
- Heartbeat extension
- Worker graceful shutdown signals
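Since retry decisions live in the backend, a NACK handler needs two pieces of logic: how long to wait, and whether to retry at all. The sketch below shows one plain exponential policy without jitter. The function names, the base and cap values, and the exhaustion rule are assumptions for illustration, not the policy defined by the OJS spec:

```go
package main

import "fmt"

// backoffMs returns the delay before retry number attempt (1-based):
// baseMs doubled for each earlier attempt, capped at maxMs.
func backoffMs(attempt int, baseMs, maxMs int64) int64 {
	if attempt < 1 {
		attempt = 1
	}
	delay := baseMs
	for i := 1; i < attempt; i++ {
		// Doubling would exceed the cap (this also guards against overflow).
		if delay > maxMs/2 {
			return maxMs
		}
		delay *= 2
	}
	if delay > maxMs {
		return maxMs
	}
	return delay
}

// nextState picks the state after a failed attempt: discarded when the
// error is non-retryable or attempts are exhausted, retryable otherwise.
func nextState(attempt, maxAttempts int, retryable bool) string {
	if !retryable || attempt >= maxAttempts {
		return "discarded"
	}
	return "retryable"
}

func main() {
	for attempt := 1; attempt <= 8; attempt++ {
		fmt.Printf("attempt %d: wait %d ms, then %s\n",
			attempt, backoffMs(attempt, 1000, 60000), nextState(attempt, 8, true))
	}
}
```

With a 1000 ms base and a 60000 ms cap, the delays run 1000, 2000, 4000, and so on until they hit the cap at the seventh attempt.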
Level 2: Scheduled
Time-based features. Tests cover:
- Delayed job execution (scheduled_at)
- Job expiration (expires_at)
- Cron/periodic job scheduling
- Timezone-aware scheduling
Level 3: Orchestration
Workflow primitives. Tests cover:
- Chain (sequential execution with data passing)
- Group (parallel fan-out)
- Batch (parallel with on_complete/on_success/on_failure callbacks)
Level 4: Advanced
Production-grade features. Tests cover:
- Priority ordering within queues
- Unique job deduplication
- Batch enqueue (atomic multi-job insertion)
- Queue pause/resume
- Queue statistics
Running the tests
Start your backend server, then point the conformance runner at it:
    # Run Level 0 tests against your server
    ojs-conformance-runner --level 0 --target http://localhost:8080

    # Run all levels up to Level 2
    ojs-conformance-runner --level 2 --target http://localhost:8080

    # Run a specific test
    ojs-conformance-runner --test L0-001 --target http://localhost:8080

The runner produces a JSON report showing which tests passed and failed:
    {
      "test_suite_version": "1.0.0",
      "target": "http://localhost:8080",
      "requested_level": 0,
      "results": {
        "total": 25,
        "passed": 24,
        "failed": 1
      },
      "conformant": false,
      "conformant_level": -1,
      "failures": [
        {
          "test_id": "L0-003",
          "name": "reject_invalid_state_transition",
          "reason": "Expected status 409, got 200"
        }
      ]
    }

Fix the failures, re-run, and iterate until you pass.
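If you want to gate a CI job on the report, a few lines of Go can decode it and print the failures. The struct fields below are inferred from the sample report above; the runner may emit more fields than this sketch models, and the names Report, parseReport, and summarize are hypothetical:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Failure and Report model the fields visible in the sample report.
type Failure struct {
	TestID string `json:"test_id"`
	Name   string `json:"name"`
	Reason string `json:"reason"`
}

type Report struct {
	TestSuiteVersion string `json:"test_suite_version"`
	Target           string `json:"target"`
	RequestedLevel   int    `json:"requested_level"`
	Results          struct {
		Total  int `json:"total"`
		Passed int `json:"passed"`
		Failed int `json:"failed"`
	} `json:"results"`
	Conformant      bool      `json:"conformant"`
	ConformantLevel int       `json:"conformant_level"`
	Failures        []Failure `json:"failures"`
}

// parseReport decodes a report document.
func parseReport(data []byte) (*Report, error) {
	var r Report
	if err := json.Unmarshal(data, &r); err != nil {
		return nil, err
	}
	return &r, nil
}

// summarize returns one human-readable line per failed test.
func summarize(r *Report) []string {
	lines := make([]string, 0, len(r.Failures))
	for _, f := range r.Failures {
		lines = append(lines, fmt.Sprintf("%s %s: %s", f.TestID, f.Name, f.Reason))
	}
	return lines
}

// sample is the report shown above.
const sample = `{
  "test_suite_version": "1.0.0",
  "target": "http://localhost:8080",
  "requested_level": 0,
  "results": {"total": 25, "passed": 24, "failed": 1},
  "conformant": false,
  "conformant_level": -1,
  "failures": [
    {"test_id": "L0-003", "name": "reject_invalid_state_transition",
     "reason": "Expected status 409, got 200"}
  ]
}`

func main() {
	r, err := parseReport([]byte(sample))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d/%d passed, conformant=%v\n", r.Results.Passed, r.Results.Total, r.Conformant)
	for _, line := range summarize(r) {
		fmt.Println(line)
	}
}
```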
Recommended implementation order
- Health endpoint (GET /ojs/v1/health) and manifest (GET /ojs/manifest). Get these working first so the conformance runner can connect.
- PUSH (POST /ojs/v1/jobs). Validate envelopes, assign IDs, store jobs.
- FETCH (POST /ojs/v1/workers/fetch). Claim jobs atomically, transition to active.
- ACK/NACK (POST /ojs/v1/workers/ack and /nack). Complete or fail jobs, enforce state transitions.
- INFO and CANCEL (GET and DELETE /ojs/v1/jobs/:id).
- Run Level 0 conformance tests. Fix issues until you pass.
- Add Level 1 features (retry logic, heartbeats, visibility timeout, dead letter queue).
- Continue up the conformance levels as needed for your use case.
You do not need Level 4 to be useful. Plenty of production systems operate at Level 1 or Level 2.
Tips from the reference implementations
Looking at the official Redis and PostgreSQL backends:
- Separate concerns into layers. Both backends use internal/api/ for HTTP handlers, internal/core/ for business logic interfaces, and internal/redis/ or internal/postgres/ for storage.
- Make dequeue atomic. Redis uses Lua scripts. PostgreSQL uses SELECT ... FOR UPDATE SKIP LOCKED. Whatever your storage engine, the fetch operation must be atomic.
- Use a scheduler goroutine. Both backends run a background process that moves scheduled jobs to available when their time arrives, and requeues active jobs whose visibility timeout expired.
- Return the full job envelope in responses. When a client calls POST /ojs/v1/jobs, return the complete job with server-assigned fields (id, state, timestamps). Same for ACK, NACK, and CANCEL responses.