
gRPC Protocol Binding

The gRPC protocol binding maps OJS logical operations to gRPC service methods defined with Protocol Buffers (proto3). gRPC is an optional binding: the HTTP binding remains the required baseline that every networked OJS implementation must support. Implementations that also support gRPC gain compact binary serialization, HTTP/2 multiplexing, and native streaming.

| Scenario | Recommended protocol |
|---|---|
| High-throughput job ingestion (10,000+ jobs/sec) | gRPC (binary serialization, HTTP/2 multiplexing) |
| Real-time job assignment to workers | gRPC (StreamJobs eliminates polling) |
| Real-time monitoring dashboards | gRPC (StreamEvents for lifecycle events) |
| Polyglot service meshes | gRPC (type-safe clients from a single .proto) |
| Browser-based clients | HTTP (no gRPC-Web proxy needed) |
| Simple integrations and debugging | HTTP (curl, human-readable JSON) |
| Webhook-driven architectures | HTTP (standard callback mechanism) |

Implementations supporting both protocols should expose them on separate ports (recommended: HTTP on 8080, gRPC on 9090). Both protocols must share the same backend state. A job enqueued via HTTP must be fetchable via gRPC and vice versa.

| Operation | gRPC RPC | Request / Response | RPC kind |
|---|---|---|---|
| PUSH | Enqueue | EnqueueRequest / EnqueueResponse | Unary |
| PUSH (batch) | EnqueueBatch | EnqueueBatchRequest / EnqueueBatchResponse | Unary |
| FETCH | Fetch | FetchRequest / FetchResponse | Unary |
| FETCH (streaming) | StreamJobs | StreamJobsRequest / stream Job | Server streaming |
| ACK | Ack | AckRequest / AckResponse | Unary |
| FAIL | Nack | NackRequest / NackResponse | Unary |
| BEAT | Heartbeat | HeartbeatRequest / HeartbeatResponse | Unary |
| CANCEL | CancelJob | CancelJobRequest / CancelJobResponse | Unary |
| INFO | GetJob | GetJobRequest / GetJobResponse | Unary |

| Category | RPCs | Conformance level |
|---|---|---|
| System | Manifest, Health | 0 |
| Queues | ListQueues | 0 |
| Queues | QueueStats, PauseQueue, ResumeQueue | 4 |
| Dead Letter | ListDeadLetter, RetryDeadLetter, DeleteDeadLetter | 1 |
| Cron | RegisterCron, UnregisterCron, ListCron | 2 |
| Workflows | CreateWorkflow, GetWorkflow, CancelWorkflow | 3 |
| Streaming | StreamJobs, StreamEvents | Optional (any level) |

The full OJSService lives in the ojs.v1 package. Implementations must implement all RPCs corresponding to their declared conformance level. Unsupported methods must return UNIMPLEMENTED rather than being omitted, so clients can detect missing capabilities at runtime.

```protobuf
syntax = "proto3";

package ojs.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";

service OJSService {
  // System
  rpc Manifest(ManifestRequest) returns (ManifestResponse); // Level 0
  rpc Health(HealthRequest) returns (HealthResponse); // Level 0

  // Jobs
  rpc Enqueue(EnqueueRequest) returns (EnqueueResponse); // Level 0
  rpc EnqueueBatch(EnqueueBatchRequest) returns (EnqueueBatchResponse); // Level 4
  rpc GetJob(GetJobRequest) returns (GetJobResponse); // Level 1
  rpc CancelJob(CancelJobRequest) returns (CancelJobResponse); // Level 1

  // Workers
  rpc Fetch(FetchRequest) returns (FetchResponse); // Level 0
  rpc Ack(AckRequest) returns (AckResponse); // Level 0
  rpc Nack(NackRequest) returns (NackResponse); // Level 0
  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse); // Level 1

  // Streaming (optional at any level)
  rpc StreamJobs(StreamJobsRequest) returns (stream Job);
  rpc StreamEvents(StreamEventsRequest) returns (stream Event);

  // Queues
  rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse); // Level 0
  rpc QueueStats(QueueStatsRequest) returns (QueueStatsResponse); // Level 4
  rpc PauseQueue(PauseQueueRequest) returns (PauseQueueResponse); // Level 4
  rpc ResumeQueue(ResumeQueueRequest) returns (ResumeQueueResponse); // Level 4

  // Dead Letter
  rpc ListDeadLetter(ListDeadLetterRequest) returns (ListDeadLetterResponse); // Level 1
  rpc RetryDeadLetter(RetryDeadLetterRequest) returns (RetryDeadLetterResponse); // Level 1
  rpc DeleteDeadLetter(DeleteDeadLetterRequest) returns (DeleteDeadLetterResponse); // Level 1

  // Cron
  rpc RegisterCron(RegisterCronRequest) returns (RegisterCronResponse); // Level 2
  rpc UnregisterCron(UnregisterCronRequest) returns (UnregisterCronResponse); // Level 2
  rpc ListCron(ListCronRequest) returns (ListCronResponse); // Level 2

  // Workflows
  rpc CreateWorkflow(CreateWorkflowRequest) returns (CreateWorkflowResponse); // Level 3
  rpc GetWorkflow(GetWorkflowRequest) returns (GetWorkflowResponse); // Level 3
  rpc CancelWorkflow(CancelWorkflowRequest) returns (CancelWorkflowResponse); // Level 3
}
```

The Job message is the protobuf representation of the OJS job envelope:

```protobuf
message Job {
  string id = 1;                            // UUIDv7
  string type = 2;                          // Dot-namespaced job type
  string queue = 3;                         // Target queue
  repeated google.protobuf.Value args = 4;  // Positional arguments
  google.protobuf.Struct meta = 5;          // Extensible metadata
  JobState state = 6;                       // Current lifecycle state
  int32 priority = 7;
  int32 attempt = 8;                        // 1-indexed once execution begins
  int32 max_attempts = 9;
  RetryPolicy retry_policy = 10;
  UniquePolicy unique_policy = 11;
  google.protobuf.Struct result = 12;
  repeated JobError errors = 13;
  google.protobuf.Timestamp created_at = 14;
  google.protobuf.Timestamp enqueued_at = 15;
  google.protobuf.Timestamp scheduled_at = 16;
  google.protobuf.Timestamp started_at = 17;
  google.protobuf.Timestamp completed_at = 18;
  google.protobuf.Timestamp expires_at = 19;
  google.protobuf.Duration timeout = 20;
  google.protobuf.Duration visibility_timeout = 21;
  repeated string tags = 22;
  string trace_id = 23;
  string workflow_id = 24;
}
```

Arguments use repeated google.protobuf.Value to maintain JSON-compatible type semantics (string, number, bool, null, list, struct).
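The following is a minimal Python sketch of which `google.protobuf.Value` oneof field each JSON argument type lands in. The function name `value_kind` is hypothetical, and the returned strings are the proto field names of Value's `kind` oneof. `number_value` is a double, so the sketch also rejects integers whose magnitude exceeds 2^53, because they may lose precision. Whether a real implementation rejects such values or silently rounds them is an assumption here.

```python
from typing import Any

# Beyond this magnitude, a double (Value.number_value) cannot hold every integer exactly.
MAX_SAFE_INT = 2**53


def value_kind(arg: Any) -> str:
    """Return the google.protobuf.Value oneof field a JSON argument maps to."""
    if arg is None:
        return "null_value"
    # bool must be checked before int: in Python, bool is a subclass of int.
    if isinstance(arg, bool):
        return "bool_value"
    if isinstance(arg, int):
        if abs(arg) > MAX_SAFE_INT:
            raise ValueError(f"{arg} may lose precision in number_value")
        return "number_value"
    if isinstance(arg, float):
        return "number_value"
    if isinstance(arg, str):
        return "string_value"
    if isinstance(arg, list):
        return "list_value"
    if isinstance(arg, dict):
        return "struct_value"
    raise TypeError(f"{type(arg).__name__} is not a JSON type")


args = ["user@example.com", 3, 2.5, True, None, ["a"], {"k": "v"}]
print([value_kind(a) for a in args])
```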

```protobuf
enum JobState {
  JOB_STATE_UNSPECIFIED = 0;
  JOB_STATE_SCHEDULED = 1;
  JOB_STATE_AVAILABLE = 2;
  JOB_STATE_PENDING = 3;
  JOB_STATE_ACTIVE = 4;
  JOB_STATE_COMPLETED = 5;
  JOB_STATE_RETRYABLE = 6;
  JOB_STATE_CANCELLED = 7;
  JOB_STATE_DISCARDED = 8;
}

message RetryPolicy {
  int32 max_attempts = 1;
  google.protobuf.Duration initial_interval = 2;
  double backoff_coefficient = 3;
  google.protobuf.Duration max_interval = 4;
  bool jitter = 5;
  repeated string non_retryable_errors = 6;
}

message UniquePolicy {
  repeated string key = 1;
  google.protobuf.Duration period = 2;
  UniqueConflictAction on_conflict = 3;
  repeated JobState states = 4;
}

message JobError {
  string code = 1;
  string message = 2;
  bool retryable = 3;
  int32 attempt = 4;
  google.protobuf.Timestamp occurred_at = 5;
  string backtrace = 6;
  google.protobuf.Struct details = 7;
}
```

StreamJobs provides a persistent, server-streaming connection that pushes jobs to a worker as they become available. It is a high-throughput alternative to polling via Fetch.

How it works:

  1. The worker opens a stream specifying queues, a worker_id, and a max_concurrent limit.
  2. The server sends Job messages as jobs become available. Each job transitions to active and the visibility timeout begins.
  3. The worker must acknowledge or fail each job via separate Ack or Nack unary RPCs, not on the stream itself.
  4. The server must never have more than max_concurrent unacknowledged jobs outstanding on the stream. This limit provides backpressure.
  5. If the stream disconnects, the server reclaims unacknowledged jobs after their visibility timeout expires.

If a queue is paused while a stream is active, the server stops sending jobs from that queue but does not close the stream.
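The flow above can be sketched in Python as a single-process dispatcher. Everything in it is hypothetical: the `Dispatcher` and `StreamSession` names, and in-memory deques standing in for a real backend. Visibility timeouts and reclaiming jobs after a disconnect are deliberately left out. The sketch only shows three things: the max_concurrent backpressure rule, freeing a slot when an Ack or Nack arrives, and skipping paused queues without closing the stream.

```python
import collections
from dataclasses import dataclass, field
from typing import Deque, Dict, List, Optional, Set


@dataclass
class StreamSession:
    """Hypothetical server-side state for one open StreamJobs stream."""
    worker_id: str
    queues: List[str]
    max_concurrent: int
    in_flight: Set[str] = field(default_factory=set)  # sent but not yet acked/nacked


class Dispatcher:
    def __init__(self) -> None:
        self.ready: Dict[str, Deque[str]] = collections.defaultdict(collections.deque)
        self.paused: Set[str] = set()

    def enqueue(self, queue: str, job_id: str) -> None:
        self.ready[queue].append(job_id)

    def next_job(self, session: StreamSession) -> Optional[str]:
        """Pick the next job to push on the stream, or None if nothing may be sent."""
        # Backpressure: never exceed max_concurrent unacknowledged jobs.
        if len(session.in_flight) >= session.max_concurrent:
            return None
        # Assumption: queues are checked in the order the worker listed them.
        for queue in session.queues:
            if queue in self.paused:
                continue  # Paused queue: send nothing from it, but keep the stream open.
            if self.ready[queue]:
                job_id = self.ready[queue].popleft()
                session.in_flight.add(job_id)  # The job is now active.
                return job_id
        return None

    def settle(self, session: StreamSession, job_id: str) -> None:
        """Called when the worker's separate Ack or Nack unary RPC arrives."""
        session.in_flight.discard(job_id)
```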

StreamEvents delivers real-time lifecycle events for monitoring and dashboards. Events are delivered on a best-effort basis and may be dropped under backpressure.

Supported event types include:

| Event type | Level | Description |
|---|---|---|
| job.enqueued | 0 | Job entered the available state |
| job.started | 0 | Worker began executing |
| job.completed | 0 | Handler succeeded |
| job.failed | 0 | Handler failed (per attempt) |
| job.dead | 1 | Job moved to dead letter queue |
| job.retrying | 1 | Job scheduled for retry |
| job.cancelled | 1 | Job was cancelled |
| workflow.started | 3 | Workflow began |
| workflow.completed | 3 | All workflow steps completed |
| workflow.failed | 3 | A workflow step failed terminally |
| stream.keepalive | 0 | Heartbeat to keep the connection alive |

Filters in StreamEventsRequest combine with AND logic: an event is delivered only if it matches every filter that is set. You can filter by queues, event types, a specific job ID, or a specific workflow ID.
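The following is a minimal Python sketch of that matching rule. The `Event` and `EventFilter` classes are hypothetical stand-ins for the protobuf messages. The sketch makes two assumptions that the text above does not state: an unset filter matches everything, and a list filter (queues, event types) matches if the event's value is any entry in the list.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class Event:
    type: str
    queue: Optional[str] = None
    job_id: Optional[str] = None
    workflow_id: Optional[str] = None


@dataclass
class EventFilter:
    queues: Sequence[str] = ()
    event_types: Sequence[str] = ()
    job_id: Optional[str] = None
    workflow_id: Optional[str] = None

    def matches(self, event: Event) -> bool:
        # AND across filters: every filter that is set must match.
        if self.queues and event.queue not in self.queues:
            return False
        if self.event_types and event.type not in self.event_types:
            return False
        if self.job_id is not None and event.job_id != self.job_id:
            return False
        if self.workflow_id is not None and event.workflow_id != self.workflow_id:
            return False
        return True


f = EventFilter(queues=["email"], event_types=["job.completed", "job.failed"])
print(f.matches(Event("job.completed", queue="email", job_id="j1")))
```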

| gRPC status | OJS error code(s) | When used |
|---|---|---|
| OK (0) | (success) | RPC completed successfully |
| INVALID_ARGUMENT (3) | invalid_payload, invalid_request, schema_validation | Invalid request data |
| NOT_FOUND (5) | not_found | Job, queue, workflow, or cron schedule not found |
| ALREADY_EXISTS (6) | duplicate | Unique constraint violated and the conflict action is reject |
| RESOURCE_EXHAUSTED (8) | rate_limited | Rate limit exceeded |
| FAILED_PRECONDITION (9) | queue_paused, unsupported | Operation not valid in current state |
| UNIMPLEMENTED (12) | unsupported | RPC not implemented at this conformance level |
| INTERNAL (13) | backend_error | Server or backend failure |
| UNAVAILABLE (14) | (backend down) | Backend temporarily unreachable |

Error details use google.rpc.ErrorInfo with domain set to "openjobspec.org" and reason containing the OJS error code string.
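The mapping can be sketched in Python as a lookup that yields a status code plus an ErrorInfo detail. The sketch represents that detail as the JSON form of a `google.rpc.Status`, with the ErrorInfo packed as an `Any`. A real Python server would more likely build the protobuf messages, for example with the grpcio-status package. `to_grpc_error` is a hypothetical helper. The code `unsupported` appears in two rows of the table. The sketch resolves it by whether the RPC itself is implemented, which is an assumption, as is falling back to INTERNAL for unknown codes.

```python
from typing import Any, Dict

# Numeric gRPC status codes used in the table above.
OK, INVALID_ARGUMENT, NOT_FOUND, ALREADY_EXISTS = 0, 3, 5, 6
RESOURCE_EXHAUSTED, FAILED_PRECONDITION = 8, 9
UNIMPLEMENTED, INTERNAL, UNAVAILABLE = 12, 13, 14

OJS_TO_GRPC: Dict[str, int] = {
    "invalid_payload": INVALID_ARGUMENT,
    "invalid_request": INVALID_ARGUMENT,
    "schema_validation": INVALID_ARGUMENT,
    "not_found": NOT_FOUND,
    "duplicate": ALREADY_EXISTS,
    "queue_paused": FAILED_PRECONDITION,
    "rate_limited": RESOURCE_EXHAUSTED,
    "backend_error": INTERNAL,
}


def to_grpc_error(ojs_code: str, message: str, rpc_implemented: bool = True) -> Dict[str, Any]:
    """Build a google.rpc.Status (JSON form) carrying an ErrorInfo detail."""
    if ojs_code == "unsupported":
        # Assumption: a missing RPC is UNIMPLEMENTED; an unsupported option on an
        # implemented RPC is FAILED_PRECONDITION.
        code = FAILED_PRECONDITION if rpc_implemented else UNIMPLEMENTED
    else:
        code = OJS_TO_GRPC.get(ojs_code, INTERNAL)  # Unknown codes: assumed INTERNAL.
    return {
        "code": code,
        "message": message,
        "details": [{
            "@type": "type.googleapis.com/google.rpc.ErrorInfo",
            "reason": ojs_code,
            "domain": "openjobspec.org",
        }],
    }


print(to_grpc_error("queue_paused", "queue 'email' is paused"))
```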

gRPC metadata (analogous to HTTP headers) carries cross-cutting concerns:

Request metadata (client to server):

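| Key | Description |
|---|---|
| x-ojs-request-id | Unique request identifier for correlation |
| x-ojs-api-key | API key for authentication |
| traceparent | W3C Trace Context propagation |
| tracestate | W3C Trace Context state |
| x-ojs-idempotency-key | Idempotency key for safe retries |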
x-ojs-idempotency-keyIdempotency key for safe retries

Response metadata (server to client):

| Key | Description |
|---|---|
| x-ojs-request-id | Echoed from the request |
| x-ojs-server-version | OJS implementation version |
| x-ojs-conformance-level | Server’s conformance level |

For rate limiting, the server includes retry-after, x-ojs-ratelimit-limit, x-ojs-ratelimit-remaining, and x-ojs-ratelimit-reset in response metadata.
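A small Python sketch of handling these keys on the client side follows. `build_request_metadata` returns lowercase key/value tuples, which is the shape grpcio stub calls accept through their `metadata=` argument. `parse_rate_limit` collects the rate-limit keys from response metadata. Both helpers are hypothetical. The sketch also assumes two things the text above does not specify: that the rate-limit values are integers, and that a UUIDv4 is an acceptable request ID.

```python
import uuid
from typing import Dict, List, Optional, Sequence, Tuple

Metadata = List[Tuple[str, str]]

RATE_LIMIT_KEYS = (
    "retry-after",
    "x-ojs-ratelimit-limit",
    "x-ojs-ratelimit-remaining",
    "x-ojs-ratelimit-reset",
)


def build_request_metadata(
    api_key: Optional[str] = None,
    idempotency_key: Optional[str] = None,
    traceparent: Optional[str] = None,
    request_id: Optional[str] = None,
) -> Metadata:
    """Client-to-server metadata; gRPC metadata keys are lowercase."""
    # Assumption: any unique string works as a request ID; a UUIDv4 is used here.
    md: Metadata = [("x-ojs-request-id", request_id or str(uuid.uuid4()))]
    if api_key is not None:
        md.append(("x-ojs-api-key", api_key))
    if idempotency_key is not None:
        md.append(("x-ojs-idempotency-key", idempotency_key))
    if traceparent is not None:
        md.append(("traceparent", traceparent))
    return md


def parse_rate_limit(md: Sequence[Tuple[str, str]]) -> Dict[str, int]:
    """Collect rate-limit values from response metadata (assumed integers)."""
    found: Dict[str, int] = {}
    for key, value in md:
        key = key.lower()
        if key in RATE_LIMIT_KEYS:
            found[key] = int(value)
    return found
```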

When an implementation supports both protocols, these invariants must hold:

  • A job enqueued via HTTP must be retrievable via gRPC GetJob, and vice versa.
  • A job enqueued via gRPC must be fetchable by an HTTP worker, and vice versa.
  • Both protocols operate against the same backend state.
  • Events from HTTP operations are visible on gRPC StreamEvents, and vice versa.

Key type mappings between protocols:

| Concept | HTTP/JSON | gRPC/Protobuf |
|---|---|---|
| Timestamps | ISO 8601 string | google.protobuf.Timestamp |
| Durations | Integer milliseconds | google.protobuf.Duration |
| Arguments | JSON array | repeated google.protobuf.Value |
| Metadata | JSON object | google.protobuf.Struct |
| Job state | String enum | JobState enum |
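The following Python sketch shows the timestamp, duration, and state conversions with the standard library only. Durations and timestamps are expressed as the (seconds, nanos) pairs that the protobuf well-known types use, and `duration_json` produces the proto3 JSON string form, such as `"1.500s"`. The helper names are hypothetical. The sketch rejects negative durations, and it assumes HTTP state strings are the lowercase enum suffix (for example `active`).

```python
from datetime import datetime, timezone
from typing import Tuple

NANOS_PER_MS = 1_000_000
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def ms_to_duration(ms: int) -> Tuple[int, int]:
    """HTTP integer milliseconds -> protobuf Duration (seconds, nanos)."""
    if ms < 0:
        raise ValueError("negative durations are not handled in this sketch")
    seconds, rem_ms = divmod(ms, 1000)
    return seconds, rem_ms * NANOS_PER_MS


def duration_to_ms(seconds: int, nanos: int) -> int:
    """Protobuf Duration -> integer milliseconds (sub-ms precision is truncated)."""
    return seconds * 1000 + nanos // NANOS_PER_MS


def duration_json(ms: int) -> str:
    """Proto3 JSON form of a Duration, e.g. 1500 ms -> '1.500s'."""
    seconds, nanos = ms_to_duration(ms)
    if nanos == 0:
        return f"{seconds}s"
    return f"{seconds}.{nanos // NANOS_PER_MS:03d}s"


def iso8601_to_timestamp(value: str) -> Tuple[int, int]:
    """ISO 8601 string -> protobuf Timestamp (seconds, nanos since the Unix epoch)."""
    # datetime.fromisoformat accepts a trailing 'Z' only on Python 3.11+.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    dt = datetime.fromisoformat(value)
    if dt.tzinfo is None:
        raise ValueError("timestamp must include a UTC offset")
    delta = dt - EPOCH
    return delta.days * 86400 + delta.seconds, delta.microseconds * 1000


def state_to_proto(state: str) -> str:
    """HTTP state string -> JobState enum name (assumed naming convention)."""
    return "JOB_STATE_" + state.upper()
```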

Authentication for gRPC supports three mechanisms:

  1. Mutual TLS (mTLS) is recommended for production. The server rejects connections without valid client certificates.
  2. API key via the x-ojs-api-key metadata key, transmitted over TLS-encrypted connections only.
  3. Bearer tokens via the authorization metadata key for JWT or OAuth2 flows.

Missing or invalid credentials return UNAUTHENTICATED (status code 16). Insufficient permissions return PERMISSION_DENIED (status code 7).
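The following Python sketch shows how a server might turn credentials into one of these status codes. The API key store, the permission model (permissions named after RPCs), and the `verify_bearer` callback are all hypothetical. mTLS is enforced during the TLS handshake, so it is not modeled here.

```python
from typing import Callable, Dict, Optional, Set

OK, PERMISSION_DENIED, UNAUTHENTICATED = 0, 7, 16

# Hypothetical credential store: API key -> RPCs the key may call.
API_KEYS: Dict[str, Set[str]] = {"sk_live_abc123": {"Enqueue", "GetJob"}}


def authorize(
    metadata: Dict[str, str],
    rpc: str,
    verify_bearer: Callable[[str], Optional[Set[str]]],
) -> int:
    """Return the gRPC status code for calling `rpc` with these credentials."""
    granted: Optional[Set[str]] = None
    api_key = metadata.get("x-ojs-api-key")
    auth = metadata.get("authorization", "")
    if api_key is not None:
        granted = API_KEYS.get(api_key)
    elif auth.startswith("Bearer "):
        # verify_bearer validates a JWT/OAuth2 token and returns its grants, or None.
        granted = verify_bearer(auth[len("Bearer "):])
    if granted is None:
        return UNAUTHENTICATED  # Missing or invalid credentials.
    if rpc not in granted:
        return PERMISSION_DENIED  # Valid credentials, insufficient permissions.
    return OK
```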

| Level | Required RPCs |
|---|---|
| 0 (Core) | Manifest, Health, Enqueue, Fetch, Ack, Nack, ListQueues |
| 1 (Reliable) | Level 0 + GetJob, CancelJob, Heartbeat, ListDeadLetter, RetryDeadLetter, DeleteDeadLetter |
| 2 (Scheduled) | Level 1 + RegisterCron, UnregisterCron, ListCron |
| 3 (Orchestration) | Level 2 + CreateWorkflow, GetWorkflow, CancelWorkflow |
| 4 (Advanced) | Level 3 + EnqueueBatch, QueueStats, PauseQueue, ResumeQueue |
| Optional | StreamJobs, StreamEvents (available at any level) |
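Because the levels are cumulative, a server can derive its required RPC set from its declared level and answer everything else with UNIMPLEMENTED. The Python sketch below does this. The helper names are hypothetical, and it assumes a server implements exactly its declared level, plus the streaming RPCs when they are enabled.

```python
from typing import Dict, FrozenSet, Set

OK, UNIMPLEMENTED = 0, 12

LEVEL_RPCS: Dict[int, FrozenSet[str]] = {
    0: frozenset({"Manifest", "Health", "Enqueue", "Fetch", "Ack", "Nack", "ListQueues"}),
    1: frozenset({"GetJob", "CancelJob", "Heartbeat",
                  "ListDeadLetter", "RetryDeadLetter", "DeleteDeadLetter"}),
    2: frozenset({"RegisterCron", "UnregisterCron", "ListCron"}),
    3: frozenset({"CreateWorkflow", "GetWorkflow", "CancelWorkflow"}),
    4: frozenset({"EnqueueBatch", "QueueStats", "PauseQueue", "ResumeQueue"}),
}
OPTIONAL_RPCS: FrozenSet[str] = frozenset({"StreamJobs", "StreamEvents"})


def required_rpcs(level: int) -> Set[str]:
    """RPCs a server at `level` must implement (each level includes those below)."""
    if level not in LEVEL_RPCS:
        raise ValueError(f"unknown conformance level {level}")
    required: Set[str] = set()
    for lvl in range(level + 1):
        required |= LEVEL_RPCS[lvl]
    return required


def status_for(rpc: str, level: int, streaming: bool = False) -> int:
    """OK if the server handles `rpc`, else UNIMPLEMENTED (never a missing method)."""
    if rpc in required_rpcs(level):
        return OK
    if streaming and rpc in OPTIONAL_RPCS:
        return OK
    return UNIMPLEMENTED
```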

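Implementations should support gRPC Server Reflection so that tools such as grpcurl can discover services without a local copy of the .proto files. They should also implement the standard gRPC Health Checking Protocol (grpc.health.v1) so that Kubernetes gRPC health probes work against the server.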

Health check:

```shell
grpcurl -plaintext localhost:9090 ojs.v1.OJSService/Health
```

Enqueue a job:

```shell
# args are google.protobuf.Value, so grpcurl's JSON input uses plain JSON values.
grpcurl -plaintext -d '{
  "type": "email.send",
  "args": ["user@example.com", "welcome"],
  "options": {
    "queue": "email",
    "retry": {
      "maxAttempts": 5,
      "initialInterval": "1s",
      "backoffCoefficient": 2.0,
      "jitter": true
    }
  }
}' localhost:9090 ojs.v1.OJSService/Enqueue
```

Fetch and acknowledge:

```shell
# Fetch
grpcurl -plaintext -d '{
  "queues": ["email", "default"],
  "count": 1,
  "workerId": "worker-go-01"
}' localhost:9090 ojs.v1.OJSService/Fetch

# Ack (result is a google.protobuf.Struct, written as a plain JSON object)
grpcurl -plaintext -d '{
  "jobId": "019462a0-b1c2-7def-8abc-123456789012",
  "result": {
    "message_id": "msg_abc123"
  }
}' localhost:9090 ojs.v1.OJSService/Ack
```

Stream events (monitoring):

```shell
grpcurl -plaintext -d '{
  "queues": ["email"],
  "eventTypes": ["job.completed", "job.failed"]
}' localhost:9090 ojs.v1.OJSService/StreamEvents
```

With authentication metadata:

```shell
# -plaintext is for local testing only; send API keys over TLS in production.
grpcurl -plaintext \
  -H 'x-ojs-api-key: sk_live_abc123' \
  -H 'x-ojs-request-id: req_019462a0' \
  -d '{"type": "email.send", "args": ["user@example.com"]}' \
  localhost:9090 ojs.v1.OJSService/Enqueue
```