gRPC Protocol Binding
The gRPC protocol binding maps OJS logical operations to gRPC service methods defined with Protocol Buffers (proto3). gRPC is an optional binding: the HTTP binding remains the required baseline that every networked OJS implementation must support. Implementations that also support gRPC gain binary serialization, HTTP/2 multiplexing, and native server streaming.
When to Use gRPC vs HTTP
| Scenario | Recommended |
|---|---|
| High-throughput job ingestion (10,000+ jobs/sec) | gRPC (binary serialization, HTTP/2 multiplexing) |
| Real-time job assignment to workers | gRPC (StreamJobs eliminates polling) |
| Real-time monitoring dashboards | gRPC (StreamEvents for lifecycle events) |
| Polyglot service meshes | gRPC (type-safe clients from a single .proto) |
| Browser-based clients | HTTP (no gRPC-Web proxy needed) |
| Simple integrations and debugging | HTTP (curl, human-readable JSON) |
| Webhook-driven architectures | HTTP (standard callback mechanism) |
Implementations supporting both protocols should expose them on separate ports (recommended: HTTP on 8080, gRPC on 9090). Both protocols must share the same backend state. A job enqueued via HTTP must be fetchable via gRPC and vice versa.
Logical Operation Mapping
| Operation | gRPC RPC | Request/Response | RPC Kind |
|---|---|---|---|
| PUSH | Enqueue | EnqueueRequest / EnqueueResponse | Unary |
| PUSH (batch) | EnqueueBatch | EnqueueBatchRequest / EnqueueBatchResponse | Unary |
| FETCH | Fetch | FetchRequest / FetchResponse | Unary |
| FETCH (streaming) | StreamJobs | StreamJobsRequest / stream Job | Server streaming |
| ACK | Ack | AckRequest / AckResponse | Unary |
| FAIL | Nack | NackRequest / NackResponse | Unary |
| BEAT | Heartbeat | HeartbeatRequest / HeartbeatResponse | Unary |
| CANCEL | CancelJob | CancelJobRequest / CancelJobResponse | Unary |
| INFO | GetJob | GetJobRequest / GetJobResponse | Unary |
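The mapping above amounts to a lookup table from logical operation to RPC. A minimal Python sketch of that table, as a client library might hold it, follows; the keys PUSH_BATCH and FETCH_STREAM are hypothetical labels for the batch and streaming rows, not names from the specification. The method path follows the standard gRPC form /package.Service/Method.

```python
from enum import Enum
from typing import Dict, NamedTuple


class RpcKind(Enum):
    UNARY = "unary"
    SERVER_STREAMING = "server_streaming"


class RpcBinding(NamedTuple):
    rpc: str
    request: str
    response: str
    kind: RpcKind


# Logical OJS operation -> OJSService method, transcribed from the table above.
# "PUSH_BATCH" and "FETCH_STREAM" are hypothetical labels for the batch and
# streaming variants; the specification does not name them.
OPERATION_TO_RPC: Dict[str, RpcBinding] = {
    "PUSH": RpcBinding("Enqueue", "EnqueueRequest", "EnqueueResponse", RpcKind.UNARY),
    "PUSH_BATCH": RpcBinding("EnqueueBatch", "EnqueueBatchRequest", "EnqueueBatchResponse", RpcKind.UNARY),
    "FETCH": RpcBinding("Fetch", "FetchRequest", "FetchResponse", RpcKind.UNARY),
    "FETCH_STREAM": RpcBinding("StreamJobs", "StreamJobsRequest", "Job", RpcKind.SERVER_STREAMING),
    "ACK": RpcBinding("Ack", "AckRequest", "AckResponse", RpcKind.UNARY),
    "FAIL": RpcBinding("Nack", "NackRequest", "NackResponse", RpcKind.UNARY),
    "BEAT": RpcBinding("Heartbeat", "HeartbeatRequest", "HeartbeatResponse", RpcKind.UNARY),
    "CANCEL": RpcBinding("CancelJob", "CancelJobRequest", "CancelJobResponse", RpcKind.UNARY),
    "INFO": RpcBinding("GetJob", "GetJobRequest", "GetJobResponse", RpcKind.UNARY),
}


def method_path(operation: str) -> str:
    """Full gRPC method path (the HTTP/2 :path) for a logical operation."""
    return f"/ojs.v1.OJSService/{OPERATION_TO_RPC[operation].rpc}"
```

For example, method_path("FAIL") returns "/ojs.v1.OJSService/Nack". Only StreamJobs is server streaming; every other operation is a single request and response.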
Additional RPCs
| Category | RPC | Conformance Level |
|---|---|---|
| System | Manifest, Health | 0 |
| Queues | ListQueues | 0 |
| Queues | QueueStats, PauseQueue, ResumeQueue | 4 |
| Dead Letter | ListDeadLetter, RetryDeadLetter, DeleteDeadLetter | 1 |
| Cron | RegisterCron, UnregisterCron, ListCron | 2 |
| Workflows | CreateWorkflow, GetWorkflow, CancelWorkflow | 3 |
| Streaming | StreamJobs, StreamEvents | Optional (any level) |
Protobuf Service Definition
The full OJSService lives in the ojs.v1 package. Implementations must implement all RPCs corresponding to their declared conformance level. Unsupported methods must return UNIMPLEMENTED rather than being omitted, so clients can detect missing capabilities at runtime.
```protobuf
syntax = "proto3";

package ojs.v1;

import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";

service OJSService {
  // System
  rpc Manifest(ManifestRequest) returns (ManifestResponse);
  rpc Health(HealthRequest) returns (HealthResponse);

  // Jobs
  rpc Enqueue(EnqueueRequest) returns (EnqueueResponse);                 // Level 0
  rpc EnqueueBatch(EnqueueBatchRequest) returns (EnqueueBatchResponse);  // Level 4
  rpc GetJob(GetJobRequest) returns (GetJobResponse);                    // Level 1
  rpc CancelJob(CancelJobRequest) returns (CancelJobResponse);           // Level 1

  // Workers
  rpc Fetch(FetchRequest) returns (FetchResponse);                       // Level 0
  rpc Ack(AckRequest) returns (AckResponse);                             // Level 0
  rpc Nack(NackRequest) returns (NackResponse);                          // Level 0
  rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);           // Level 1

  // Streaming (Optional)
  rpc StreamJobs(StreamJobsRequest) returns (stream Job);
  rpc StreamEvents(StreamEventsRequest) returns (stream Event);

  // Queues
  rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse);        // Level 0
  rpc QueueStats(QueueStatsRequest) returns (QueueStatsResponse);        // Level 4
  rpc PauseQueue(PauseQueueRequest) returns (PauseQueueResponse);        // Level 4
  rpc ResumeQueue(ResumeQueueRequest) returns (ResumeQueueResponse);     // Level 4

  // Dead Letter
  rpc ListDeadLetter(ListDeadLetterRequest) returns (ListDeadLetterResponse);        // Level 1
  rpc RetryDeadLetter(RetryDeadLetterRequest) returns (RetryDeadLetterResponse);     // Level 1
  rpc DeleteDeadLetter(DeleteDeadLetterRequest) returns (DeleteDeadLetterResponse);  // Level 1

  // Cron
  rpc RegisterCron(RegisterCronRequest) returns (RegisterCronResponse);        // Level 2
  rpc UnregisterCron(UnregisterCronRequest) returns (UnregisterCronResponse);  // Level 2
  rpc ListCron(ListCronRequest) returns (ListCronResponse);                    // Level 2

  // Workflows
  rpc CreateWorkflow(CreateWorkflowRequest) returns (CreateWorkflowResponse);  // Level 3
  rpc GetWorkflow(GetWorkflowRequest) returns (GetWorkflowResponse);           // Level 3
  rpc CancelWorkflow(CancelWorkflowRequest) returns (CancelWorkflowResponse);  // Level 3
}
```

Core Message Types
The Job message is the protobuf representation of the OJS job envelope:
```protobuf
message Job {
  string id = 1;                             // UUIDv7
  string type = 2;                           // Dot-namespaced job type
  string queue = 3;                          // Target queue
  repeated google.protobuf.Value args = 4;   // Positional arguments
  google.protobuf.Struct meta = 5;           // Extensible metadata
  JobState state = 6;                        // Current lifecycle state
  int32 priority = 7;
  int32 attempt = 8;                         // 1-indexed once execution begins
  int32 max_attempts = 9;
  RetryPolicy retry_policy = 10;
  UniquePolicy unique_policy = 11;
  google.protobuf.Struct result = 12;
  repeated JobError errors = 13;
  google.protobuf.Timestamp created_at = 14;
  google.protobuf.Timestamp enqueued_at = 15;
  google.protobuf.Timestamp scheduled_at = 16;
  google.protobuf.Timestamp started_at = 17;
  google.protobuf.Timestamp completed_at = 18;
  google.protobuf.Timestamp expires_at = 19;
  google.protobuf.Duration timeout = 20;
  google.protobuf.Duration visibility_timeout = 21;
  repeated string tags = 22;
  string trace_id = 23;
  string workflow_id = 24;
}
```

Arguments use repeated google.protobuf.Value to maintain JSON-compatible type semantics (string, number, bool, null, list, struct).
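To make the "JSON-compatible type semantics" concrete, the sketch below maps Python values onto the google.protobuf.Value oneof (null_value, bool_value, number_value, string_value, list_value, struct_value). It builds plain dicts keyed by those field names rather than real protobuf messages, so it runs without any protobuf library. One consequence worth noting: number_value is a double, so integers above 2^53 lose precision, and large identifiers are safer passed as strings.

```python
import math
from typing import Any, Dict, List


def to_value(x: Any) -> Dict[str, Any]:
    """Map a JSON-compatible Python value onto the google.protobuf.Value oneof.

    Returns a plain dict keyed by the oneof field name; it is an illustration
    of the structure, not a real protobuf message.
    """
    if x is None:
        return {"null_value": 0}  # NullValue.NULL_VALUE
    if isinstance(x, bool):  # test bool before int: bool subclasses int
        return {"bool_value": x}
    if isinstance(x, (int, float)):
        number = float(x)  # number_value is a double
        if not math.isfinite(number):
            raise ValueError("NaN and Infinity have no JSON representation")
        return {"number_value": number}
    if isinstance(x, str):
        return {"string_value": x}
    if isinstance(x, (list, tuple)):
        return {"list_value": {"values": [to_value(v) for v in x]}}
    if isinstance(x, dict):
        if not all(isinstance(k, str) for k in x):
            raise TypeError("Struct keys must be strings")
        return {"struct_value": {"fields": {k: to_value(v) for k, v in x.items()}}}
    raise TypeError(f"not JSON-compatible: {type(x).__name__}")


def to_args(args: List[Any]) -> List[Dict[str, Any]]:
    """Positional job arguments as they would populate Job.args."""
    return [to_value(a) for a in args]
```

For example, to_args(["user@example.com", 3]) yields a string_value followed by a number_value of 3.0.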
JobState
```protobuf
enum JobState {
  JOB_STATE_UNSPECIFIED = 0;
  JOB_STATE_SCHEDULED = 1;
  JOB_STATE_AVAILABLE = 2;
  JOB_STATE_PENDING = 3;
  JOB_STATE_ACTIVE = 4;
  JOB_STATE_COMPLETED = 5;
  JOB_STATE_RETRYABLE = 6;
  JOB_STATE_CANCELLED = 7;
  JOB_STATE_DISCARDED = 8;
}
```

RetryPolicy and UniquePolicy
```protobuf
message RetryPolicy {
  int32 max_attempts = 1;
  google.protobuf.Duration initial_interval = 2;
  double backoff_coefficient = 3;
  google.protobuf.Duration max_interval = 4;
  bool jitter = 5;
  repeated string non_retryable_errors = 6;
}
```
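The RetryPolicy fields suggest a standard exponential backoff. The sketch below shows one plausible reading, under stated assumptions: the delay after attempt n is initial_interval × backoff_coefficient^(n-1), capped at max_interval; no retry happens once attempt reaches max_attempts or the error code is listed in non_retryable_errors; and jitter scales the delay by a random factor in [0.5, 1.0]. The jitter strategy and the dataclass defaults are hypothetical, since the policy message does not define them.

```python
import random
from dataclasses import dataclass, field
from datetime import timedelta
from typing import List, Optional


@dataclass
class RetryPolicy:
    # Field names mirror the proto message; the default values are hypothetical.
    max_attempts: int = 3
    initial_interval: timedelta = timedelta(seconds=1)
    backoff_coefficient: float = 2.0
    max_interval: Optional[timedelta] = None
    jitter: bool = False
    non_retryable_errors: List[str] = field(default_factory=list)


def next_retry_delay(policy: RetryPolicy, attempt: int, error_code: str,
                     rng: Optional[random.Random] = None) -> Optional[timedelta]:
    """Delay before the next attempt, or None if the job should not be retried.

    `attempt` is the 1-indexed attempt that just failed.
    """
    if error_code in policy.non_retryable_errors or attempt >= policy.max_attempts:
        return None
    delay = policy.initial_interval * (policy.backoff_coefficient ** (attempt - 1))
    if policy.max_interval is not None:
        delay = min(delay, policy.max_interval)
    if policy.jitter:
        # Assumed jitter strategy: scale the delay by a random factor in [0.5, 1.0].
        delay = delay * (rng or random).uniform(0.5, 1.0)
    return delay
```

With initial_interval of 1s, a coefficient of 2.0, and max_interval of 5s, failed attempts 1 through 4 wait 1s, 2s, 4s, and 5s (capped).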
```protobuf
message UniquePolicy {
  repeated string key = 1;
  google.protobuf.Duration period = 2;
  UniqueConflictAction on_conflict = 3;
  repeated JobState states = 4;
}
```

JobError
```protobuf
message JobError {
  string code = 1;
  string message = 2;
  bool retryable = 3;
  int32 attempt = 4;
  google.protobuf.Timestamp occurred_at = 5;
  string backtrace = 6;
  google.protobuf.Struct details = 7;
}
```

Streaming
StreamJobs
StreamJobs provides a persistent, server-streaming connection that pushes jobs to a worker as they become available. It is a high-throughput alternative to polling via Fetch.
How it works:
- The worker opens a stream specifying queues, a `worker_id`, and a `max_concurrent` limit.
- The server sends `Job` messages as jobs become available. Each job transitions to `active` and its visibility timeout begins.
- The worker must acknowledge or fail each job via separate `Ack` or `Nack` unary RPCs, not on the stream itself.
- The server must never have more than `max_concurrent` unacknowledged jobs outstanding on the stream, which provides backpressure.
- If the stream disconnects, the server reclaims unacknowledged jobs after their visibility timeout expires.
If a queue is paused while a stream is active, the server stops sending jobs from that queue but does not close the stream.
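The backpressure and pause rules can be modeled without any networking. The toy class below is a sketch, not a server implementation: it tracks the jobs sent on one stream, never lets that set exceed max_concurrent, frees a slot when a unary Ack or Nack arrives, and skips paused queues while leaving the stream open. The class and method names are hypothetical, and visibility-timeout reclamation after a disconnect is deliberately left out.

```python
from collections import deque
from typing import Deque, Dict, List, Set


class StreamJobsSession:
    """Toy model of one StreamJobs stream's flow control (no networking)."""

    def __init__(self, queues: List[str], max_concurrent: int,
                 backlog: Dict[str, Deque[str]], paused: Set[str]) -> None:
        self.queues = list(queues)
        self.max_concurrent = max_concurrent
        self.backlog = backlog  # queue name -> available job IDs (shared server state)
        self.paused = paused    # paused queue names (shared server state)
        self.in_flight: Set[str] = set()  # sent on this stream, not yet acked or nacked

    def pump(self) -> List[str]:
        """Jobs the server would push right now without exceeding max_concurrent."""
        sent: List[str] = []
        for queue in self.queues:
            if queue in self.paused:
                continue  # skip paused queues but keep the stream open
            pending = self.backlog.get(queue)
            while pending and len(self.in_flight) < self.max_concurrent:
                job_id = pending.popleft()
                self.in_flight.add(job_id)  # job becomes active; visibility timeout starts
                sent.append(job_id)
        return sent

    def settle(self, job_id: str) -> None:
        """Record a unary Ack or Nack for a job delivered on this stream."""
        self.in_flight.discard(job_id)
```

With max_concurrent=2 and three jobs waiting, the first pump() sends two jobs, and the third is only sent after settle() is called for one of them.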
StreamEvents
StreamEvents delivers real-time lifecycle events for monitoring and dashboards. Events are delivered on a best-effort basis and may be dropped under backpressure.
Supported event types include:
| Event Type | Level | Description |
|---|---|---|
| job.enqueued | 0 | Job entered the available state |
| job.started | 0 | Worker began executing |
| job.completed | 0 | Handler succeeded |
| job.failed | 0 | Handler failed (per attempt) |
| job.dead | 1 | Job moved to dead letter queue |
| job.retrying | 1 | Job scheduled for retry |
| job.cancelled | 1 | Job was cancelled |
| workflow.started | 3 | Workflow began |
| workflow.completed | 3 | All workflow steps completed |
| workflow.failed | 3 | A workflow step failed terminally |
| stream.keepalive | 0 | Heartbeat to keep the connection alive |
StreamEventsRequest can filter by queues, event types, a specific job ID, or a specific workflow ID. Filters combine with AND logic: an event is delivered only if it matches every filter that is set.
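A sketch of that filter predicate follows. It assumes an unset filter matches everything, a list filter matches if the event's value is any of the listed values, and set filters are ANDed together. It also assumes stream.keepalive events bypass filtering, since they carry no queue or job; the specification does not say so explicitly. The Event and EventFilter types are hypothetical stand-ins for the proto messages.

```python
from dataclasses import dataclass
from typing import FrozenSet, Optional


@dataclass(frozen=True)
class Event:
    type: str
    queue: Optional[str] = None
    job_id: Optional[str] = None
    workflow_id: Optional[str] = None


@dataclass(frozen=True)
class EventFilter:
    queues: FrozenSet[str] = frozenset()       # empty = any queue
    event_types: FrozenSet[str] = frozenset()  # empty = any event type
    job_id: Optional[str] = None
    workflow_id: Optional[str] = None

    def matches(self, event: Event) -> bool:
        if event.type == "stream.keepalive":
            return True  # assumption: keepalives are never filtered out
        if self.queues and event.queue not in self.queues:
            return False
        if self.event_types and event.type not in self.event_types:
            return False
        if self.job_id is not None and event.job_id != self.job_id:
            return False
        if self.workflow_id is not None and event.workflow_id != self.workflow_id:
            return False
        return True  # every filter that is set matched
```

A filter of queues={"email"} and event_types={"job.completed", "job.failed"} passes a job.completed event on email but rejects the same event on default, and rejects job.started on email.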
gRPC Status Code Mapping
| gRPC Status | OJS Error Code(s) | When Used |
|---|---|---|
| OK (0) | (success) | RPC completed successfully |
| INVALID_ARGUMENT (3) | invalid_payload, invalid_request, schema_validation | Invalid request data |
| NOT_FOUND (5) | not_found | Job, queue, workflow, or cron schedule not found |
| ALREADY_EXISTS (6) | duplicate | Unique constraint violated (reject) |
| RESOURCE_EXHAUSTED (8) | rate_limited | Rate limit exceeded |
| FAILED_PRECONDITION (9) | queue_paused, unsupported | Operation not valid in current state |
| UNIMPLEMENTED (12) | unsupported | RPC not implemented at this conformance level |
| INTERNAL (13) | backend_error | Server or backend failure |
| UNAVAILABLE (14) | (backend down) | Backend temporarily unreachable |
Error details use google.rpc.ErrorInfo with domain set to "openjobspec.org" and reason containing the OJS error code string.
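The sketch below turns the table and the ErrorInfo convention into a server-side helper. Because unsupported appears under both FAILED_PRECONDITION and UNIMPLEMENTED, it assumes UNIMPLEMENTED applies when the whole RPC is absent at the server's conformance level, and FAILED_PRECONDITION otherwise. Mapping unknown codes to INTERNAL is also an assumption. The ErrorInfo is returned as a dict with the real field names (reason, domain, metadata) rather than as a protobuf message.

```python
from typing import Any, Dict, Mapping, Optional, Tuple

GRPC_CODES: Dict[str, int] = {
    "OK": 0, "INVALID_ARGUMENT": 3, "NOT_FOUND": 5, "ALREADY_EXISTS": 6,
    "RESOURCE_EXHAUSTED": 8, "FAILED_PRECONDITION": 9, "UNIMPLEMENTED": 12,
    "INTERNAL": 13, "UNAVAILABLE": 14,
}

# OJS error code -> gRPC status name, transcribed from the table above.
OJS_TO_GRPC: Dict[str, str] = {
    "invalid_payload": "INVALID_ARGUMENT",
    "invalid_request": "INVALID_ARGUMENT",
    "schema_validation": "INVALID_ARGUMENT",
    "not_found": "NOT_FOUND",
    "duplicate": "ALREADY_EXISTS",
    "queue_paused": "FAILED_PRECONDITION",
    "unsupported": "FAILED_PRECONDITION",
    "rate_limited": "RESOURCE_EXHAUSTED",
    "backend_error": "INTERNAL",
}


def to_grpc_error(ojs_code: str, *, rpc_implemented: bool = True,
                  metadata: Optional[Mapping[str, str]] = None
                  ) -> Tuple[int, str, Dict[str, Any]]:
    """Return (numeric code, status name, ErrorInfo fields) for an OJS error code."""
    if ojs_code == "unsupported" and not rpc_implemented:
        status = "UNIMPLEMENTED"  # the whole RPC is absent at this conformance level
    else:
        status = OJS_TO_GRPC.get(ojs_code, "INTERNAL")  # assumption: unknown codes -> INTERNAL
    error_info = {
        "reason": ojs_code,                # google.rpc.ErrorInfo.reason
        "domain": "openjobspec.org",       # google.rpc.ErrorInfo.domain
        "metadata": dict(metadata or {}),  # google.rpc.ErrorInfo.metadata (map<string, string>)
    }
    return GRPC_CODES[status], status, error_info
```

In a real server, these ErrorInfo fields would be packed into the details of a google.rpc.Status (in Python, for example, with the grpcio-status package).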
Metadata Propagation
gRPC metadata (analogous to HTTP headers) carries cross-cutting concerns:
Request metadata (client to server):
| Key | Description |
|---|---|
| x-ojs-request-id | Unique request identifier for correlation |
| x-ojs-api-key | API key for authentication |
| traceparent | W3C Trace Context propagation |
| tracestate | W3C Trace Context state |
| x-ojs-idempotency-key | Idempotency key for safe retries |
Response metadata (server to client):
| Key | Description |
|---|---|
| x-ojs-request-id | Echoed from the request |
| x-ojs-server-version | OJS implementation version |
| x-ojs-conformance-level | Server’s conformance level |
For rate limiting, the server includes retry-after, x-ojs-ratelimit-limit, x-ojs-ratelimit-remaining, and x-ojs-ratelimit-reset in response metadata.
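On the client side, request metadata is a list of (key, value) pairs with lowercase keys, which is the shape grpcio accepts as the metadata= argument of a stub call. The sketch below builds that list and extracts the rate-limit keys from response metadata (initial or trailing). The req_ request-ID format is a hypothetical choice, and the sketch parses the rate-limit values as plain integers without interpreting x-ojs-ratelimit-reset, whose unit is not specified here.

```python
import uuid
from typing import Dict, List, Optional, Sequence, Tuple

Metadata = List[Tuple[str, str]]

RATE_LIMIT_KEYS = ("retry-after", "x-ojs-ratelimit-limit",
                   "x-ojs-ratelimit-remaining", "x-ojs-ratelimit-reset")


def request_metadata(api_key: Optional[str] = None,
                     idempotency_key: Optional[str] = None,
                     traceparent: Optional[str] = None) -> Metadata:
    """Build client-to-server metadata; gRPC metadata keys must be lowercase."""
    md: Metadata = [("x-ojs-request-id", f"req_{uuid.uuid4().hex}")]  # ID format is hypothetical
    if api_key:
        md.append(("x-ojs-api-key", api_key))
    if idempotency_key:
        md.append(("x-ojs-idempotency-key", idempotency_key))
    if traceparent:
        md.append(("traceparent", traceparent))
    return md


def rate_limit_info(response_md: Sequence[Tuple[str, str]]) -> Dict[str, int]:
    """Collect the rate-limit keys that are present, parsed as integers."""
    info: Dict[str, int] = {}
    for key, value in response_md:
        key = key.lower()
        if key in RATE_LIMIT_KEYS:
            try:
                info[key] = int(value)
            except ValueError:
                pass  # non-integer values (e.g. an HTTP-date retry-after) are ignored here
    return info
```

Reusing the same x-ojs-idempotency-key when retrying after a RESOURCE_EXHAUSTED response lets the server treat the retry as the same logical request.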
Interoperability with HTTP
When an implementation supports both protocols, these invariants must hold:
- A job enqueued via HTTP must be retrievable via gRPC `GetJob`, and vice versa.
- A job enqueued via gRPC must be fetchable by an HTTP worker, and vice versa.
- Both protocols operate against the same backend state.
- Events from HTTP operations are visible on gRPC `StreamEvents`, and vice versa.
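These invariants follow naturally when both protocol front ends are thin adapters over a single store. The sketch below shows that architecture with an in-memory backend; HttpFrontEnd, GrpcFrontEnd, and their method names are hypothetical, and the counter-based IDs stand in for real UUIDv7 generation. Events from either front end land in one shared log, which is what a StreamEvents implementation would read.

```python
import itertools
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple


@dataclass
class StoredJob:
    id: str
    type: str
    queue: str
    args: List[Any]
    state: str = "available"


class Backend:
    """The single store that both protocol front ends read and write."""

    def __init__(self) -> None:
        self.jobs: Dict[str, StoredJob] = {}
        self.events: List[Tuple[str, str]] = []  # (event type, job id), shared by both protocols
        self._ids = itertools.count(1)  # stand-in for UUIDv7 generation

    def enqueue(self, job_type: str, queue: str, args: List[Any]) -> StoredJob:
        job = StoredJob(f"job-{next(self._ids)}", job_type, queue, list(args))
        self.jobs[job.id] = job
        self.events.append(("job.enqueued", job.id))
        return job

    def fetch(self, queues: List[str]) -> Optional[StoredJob]:
        for job in self.jobs.values():  # dicts keep insertion order, so this is FIFO
            if job.queue in queues and job.state == "available":
                job.state = "active"
                self.events.append(("job.started", job.id))
                return job
        return None


class HttpFrontEnd:
    """Hypothetical JSON handlers."""

    def __init__(self, backend: Backend) -> None:
        self.backend = backend

    def post_job(self, body: Dict[str, Any]) -> Dict[str, Any]:
        job = self.backend.enqueue(body["type"], body.get("queue", "default"), body.get("args", []))
        return {"id": job.id, "state": job.state}

    def fetch(self, queues: List[str]) -> Optional[Dict[str, Any]]:
        job = self.backend.fetch(queues)
        return None if job is None else {"id": job.id, "type": job.type, "state": job.state}


class GrpcFrontEnd:
    """Hypothetical servicer methods mirroring Enqueue and GetJob."""

    def __init__(self, backend: Backend) -> None:
        self.backend = backend

    def enqueue(self, job_type: str, queue: str, args: List[Any]) -> StoredJob:
        return self.backend.enqueue(job_type, queue, args)

    def get_job(self, job_id: str) -> StoredJob:
        return self.backend.jobs[job_id]  # a real server would map a miss to NOT_FOUND
```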
Key type mappings between protocols:
| Concept | HTTP/JSON | gRPC/Protobuf |
|---|---|---|
| Timestamps | ISO 8601 string | google.protobuf.Timestamp |
| Durations | Integer milliseconds | google.protobuf.Duration |
| Arguments | JSON array | repeated google.protobuf.Value |
| Metadata | JSON object | google.protobuf.Struct |
| Job state | String enum | JobState enum |
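The conversions in this table are mechanical. The sketch below covers them under one assumption: HTTP job-state strings are the lowercase suffixes of the JobState enum names (for example available for JOB_STATE_AVAILABLE). Timestamps and durations are returned as (seconds, nanos) tuples, matching the two fields of google.protobuf.Timestamp and google.protobuf.Duration. duration_json produces the proto3 JSON string form used in the grpcurl examples, such as "1s".

```python
from datetime import datetime, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def iso8601_to_timestamp(text: str) -> Tuple[int, int]:
    """ISO 8601 string -> (seconds, nanos), the fields of google.protobuf.Timestamp."""
    dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
    if dt.tzinfo is None:
        raise ValueError("timestamp needs an explicit UTC offset")
    delta = dt - EPOCH  # timedelta keeps 0 <= microseconds < 10**6, as Timestamp.nanos requires
    return delta.days * 86_400 + delta.seconds, delta.microseconds * 1_000


def millis_to_duration(ms: int) -> Tuple[int, int]:
    """Integer milliseconds -> (seconds, nanos); both fields share the sign of ms."""
    sign = -1 if ms < 0 else 1
    seconds, rem_ms = divmod(abs(ms), 1_000)
    return sign * seconds, sign * rem_ms * 1_000_000


def duration_json(ms: int) -> str:
    """proto3 JSON string for a Duration given in milliseconds, e.g. 1500 -> "1.500s"."""
    sign = "-" if ms < 0 else ""
    seconds, rem_ms = divmod(abs(ms), 1_000)
    return f"{sign}{seconds}s" if rem_ms == 0 else f"{sign}{seconds}.{rem_ms:03d}s"


def state_to_enum(state: str) -> str:
    """HTTP state string -> JobState enum name (assumes lowercase HTTP values)."""
    return "JOB_STATE_" + state.upper()


def enum_to_state(name: str) -> str:
    """JobState enum name -> HTTP state string; UNSPECIFIED has no HTTP form."""
    prefix = "JOB_STATE_"
    if not name.startswith(prefix) or name == "JOB_STATE_UNSPECIFIED":
        raise ValueError(f"no HTTP equivalent for {name}")
    return name[len(prefix):].lower()
```

For example, an HTTP retry interval of 1500 milliseconds corresponds to a Duration of (1, 500000000), written as "1.500s" in JSON.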
Authentication
Authentication for gRPC supports three mechanisms:
- Mutual TLS (mTLS) is recommended for production. The server rejects connections without valid client certificates.
- API keys are sent in the `x-ojs-api-key` metadata key, over TLS-encrypted connections only.
- Bearer tokens are sent in the `authorization` metadata key, for JWT or OAuth2 flows.
Missing or invalid credentials return UNAUTHENTICATED (status code 16). Insufficient permissions return PERMISSION_DENIED (status code 7).
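The distinction between the two status codes is easiest to see in a small decision function. The sketch below covers only the API-key path. It assumes the server keeps a table of keys and their scopes (the scope names are hypothetical) and that a key arriving over a non-TLS connection is refused as UNAUTHENTICATED. mTLS is enforced at the transport layer and bearer-token validation is out of scope here.

```python
from typing import Mapping, Set

OK = 0
PERMISSION_DENIED = 7
UNAUTHENTICATED = 16


def authorize_api_key(metadata: Mapping[str, str], *, over_tls: bool,
                      key_scopes: Mapping[str, Set[str]], required_scope: str) -> int:
    """Return the gRPC status code for an API-key-authenticated call."""
    api_key = metadata.get("x-ojs-api-key")
    if api_key is None:
        return UNAUTHENTICATED  # missing credentials
    if not over_tls:
        return UNAUTHENTICATED  # assumption: keys sent without TLS are refused outright
    scopes = key_scopes.get(api_key)  # hypothetical key -> scopes table
    if scopes is None:
        return UNAUTHENTICATED  # unknown key means invalid credentials
    if required_scope not in scopes:
        return PERMISSION_DENIED  # authenticated, but not allowed to perform this call
    return OK
```

A production server would compare keys in constant time (for example with hashed keys and secrets.compare_digest) rather than with a plain dictionary lookup.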
Conformance Levels
| Level | Required RPCs |
|---|---|
| 0 (Core) | Manifest, Health, Enqueue, Fetch, Ack, Nack, ListQueues |
| 1 (Reliable) | Level 0 + GetJob, CancelJob, Heartbeat, ListDeadLetter, RetryDeadLetter, DeleteDeadLetter |
| 2 (Scheduled) | Level 1 + RegisterCron, UnregisterCron, ListCron |
| 3 (Orchestration) | Level 2 + CreateWorkflow, GetWorkflow, CancelWorkflow |
| 4 (Advanced) | Level 3 + EnqueueBatch, QueueStats, PauseQueue, ResumeQueue |
| Optional | StreamJobs, StreamEvents (available at any level) |
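Because each level includes everything below it, the required RPC set for a level is the union of the rows up to that level, and every other method must answer UNIMPLEMENTED. The sketch below encodes this table and that rule. dispatch_status is a hypothetical helper that reports which status a server would return before running any handler logic. Optional streaming RPCs count as served only when the server opts into them.

```python
from typing import Dict, FrozenSet, Iterable

# RPCs introduced at each level, transcribed from the table above.
LEVEL_RPCS: Dict[int, FrozenSet[str]] = {
    0: frozenset({"Manifest", "Health", "Enqueue", "Fetch", "Ack", "Nack", "ListQueues"}),
    1: frozenset({"GetJob", "CancelJob", "Heartbeat",
                  "ListDeadLetter", "RetryDeadLetter", "DeleteDeadLetter"}),
    2: frozenset({"RegisterCron", "UnregisterCron", "ListCron"}),
    3: frozenset({"CreateWorkflow", "GetWorkflow", "CancelWorkflow"}),
    4: frozenset({"EnqueueBatch", "QueueStats", "PauseQueue", "ResumeQueue"}),
}
OPTIONAL_RPCS = frozenset({"StreamJobs", "StreamEvents"})


def required_rpcs(level: int) -> FrozenSet[str]:
    """Levels are cumulative: level N requires everything from levels 0..N."""
    if level not in LEVEL_RPCS:
        raise ValueError(f"unknown conformance level {level}")
    return frozenset().union(*(LEVEL_RPCS[n] for n in range(level + 1)))


def dispatch_status(method: str, level: int, optional_supported: Iterable[str] = ()) -> str:
    """Status a server at `level` gives `method` before any handler logic runs."""
    served = required_rpcs(level) | (OPTIONAL_RPCS & frozenset(optional_supported))
    return "OK" if method in served else "UNIMPLEMENTED"
```

A Level 4 server therefore serves 23 required RPCs, and up to 25 with both streaming RPCs enabled, which matches the full OJSService definition.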
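Implementations should support gRPC Server Reflection for discovery by tools like grpcurl, and should implement the standard gRPC Health Checking Protocol (grpc.health.v1) for Kubernetes compatibility. The grpcurl examples below rely on reflection; against a server without it, pass the .proto file to grpcurl with -proto.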
grpcurl Examples
Health check:
```shell
grpcurl -plaintext localhost:9090 ojs.v1.OJSService/Health
```

Enqueue a job:
```shell
grpcurl -plaintext -d '{
  "type": "email.send",
  "args": ["user@example.com", "welcome"],
  "options": {
    "queue": "email",
    "retry": {
      "maxAttempts": 5,
      "initialInterval": "1s",
      "backoffCoefficient": 2.0,
      "jitter": true
    }
  }
}' localhost:9090 ojs.v1.OJSService/Enqueue
```

grpcurl uses the proto3 JSON mapping, in which each google.protobuf.Value argument is written as a plain JSON value and durations are written as strings such as "1s".

Fetch and acknowledge:
```shell
# Fetch
grpcurl -plaintext -d '{
  "queues": ["email", "default"],
  "count": 1,
  "workerId": "worker-go-01"
}' localhost:9090 ojs.v1.OJSService/Fetch

# Ack
grpcurl -plaintext -d '{
  "jobId": "019462a0-b1c2-7def-8abc-123456789012",
  "result": {"message_id": "msg_abc123"}
}' localhost:9090 ojs.v1.OJSService/Ack
```

Stream events (monitoring):
```shell
grpcurl -plaintext -d '{
  "queues": ["email"],
  "eventTypes": ["job.completed", "job.failed"]
}' localhost:9090 ojs.v1.OJSService/StreamEvents
```

With authentication metadata:
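```shell
# API keys must only be sent over TLS, so -plaintext is omitted here.
# Add -cacert <file> if the server certificate is issued by a private CA.
grpcurl \
  -H 'x-ojs-api-key: sk_live_abc123' \
  -H 'x-ojs-request-id: req_019462a0' \
  -d '{"type": "email.send", "args": ["user@example.com"]}' \
  localhost:9090 ojs.v1.OJSService/Enqueue
```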