Skip to content

Tutorial: Your First Job in Ruby

This tutorial walks you through building a background job system with the Ruby SDK. You will enqueue, process, and monitor jobs using Ruby — zero runtime dependencies, just the standard library.

Terminal window
mkdir ojs-ruby-tutorial && cd ojs-ruby-tutorial

Create a Gemfile:

source "https://rubygems.org"
gem "ojs"

Install dependencies:

Terminal window
bundle install

Create enqueue.rb:

enqueue.rb
require "ojs"
client = OJS::Client.new("http://localhost:8080")
# Enqueue a simple job
job = client.enqueue("email.send", to: "user@example.com", template: "welcome")
puts "Enqueued job #{job.id} in state: #{job.state}"
# Enqueue with retry policy
job2 = client.enqueue("report.generate", { id: 42, format: "pdf" },
queue: "reports",
retry: OJS::RetryPolicy.new(max_attempts: 5, backoff_coefficient: 2.0)
)
puts "Enqueued job #{job2.id} with retries"

Run it:

Terminal window
bundle exec ruby enqueue.rb

Output:

Enqueued job 019461a8-1a2b-7c3d-8e4f-5a6b7c8d9e0f in state: available
Enqueued job 019461a8-2b3c-7d4e-9f50-6a7b8c9d0e1f with retries

Create worker.rb:

worker.rb
require "ojs"
worker = OJS::Worker.new("http://localhost:8080",
queues: %w[default reports],
concurrency: 5
)
# Register a handler for "email.send" jobs
worker.register("email.send") do |ctx|
to = ctx.job.args["to"]
template = ctx.job.args["template"]
puts " Sending '#{template}' email to #{to}"
# Your email logic goes here
sleep(0.2)
{ delivered: true }
end
# Register a handler for "report.generate" jobs
worker.register("report.generate") do |ctx|
report_id = ctx.job.args["id"]
format = ctx.job.args["format"]
puts " Generating #{format.upcase} report #{report_id}"
# Simulate report generation
sleep(1.0)
{ url: "https://reports.example.com/#{report_id}.#{format}" }
end
# Graceful shutdown on SIGINT/SIGTERM
%w[INT TERM].each do |signal|
Signal.trap(signal) do
puts "\nShutting down worker..."
worker.stop
end
end
puts "Worker started, waiting for jobs..."
worker.start

Run the worker:

Terminal window
bundle exec ruby worker.rb

Output:

Worker started, waiting for jobs...
Sending 'welcome' email to user@example.com

Create worker_with_middleware.rb:

worker_with_middleware.rb
require "ojs"
require "logger"
logger = Logger.new($stdout)
worker = OJS::Worker.new("http://localhost:8080",
queues: %w[default],
concurrency: 5
)
# Middleware: timing and logging (yield-based pattern)
worker.use("logging") do |ctx, &nxt|
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
logger.info "[START] #{ctx.job.type} (#{ctx.job.id})"
begin
result = nxt.call
elapsed = ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) * 1000).round
logger.info "[DONE] #{ctx.job.type} took #{elapsed}ms"
result
rescue => e
elapsed = ((Process.clock_gettime(Process::CLOCK_MONOTONIC) - start) * 1000).round
logger.error "[FAIL] #{ctx.job.type} after #{elapsed}ms: #{e.message}"
raise
end
end
# Middleware: error enrichment
worker.use("error_context") do |ctx, &nxt|
begin
nxt.call
rescue => e
raise e.class, "job=#{ctx.job.type} id=#{ctx.job.id} attempt=#{ctx.job.attempt}: #{e.message}"
end
end
worker.register("email.send") do |ctx|
to = ctx.job.args["to"]
puts " Sending email to #{to}"
sleep(0.2)
{ delivered: true }
end
%w[INT TERM].each do |signal|
Signal.trap(signal) { worker.stop }
end
puts "Worker started with middleware, waiting for jobs..."
worker.start
batch.rb
require "ojs"
client = OJS::Client.new("http://localhost:8080")
# Batch enqueue multiple jobs
jobs = client.enqueue_batch([
{ type: "email.send", args: { to: "alice@example.com", template: "welcome" } },
{ type: "email.send", args: { to: "bob@example.com", template: "welcome" } },
{ type: "email.send", args: { to: "carol@example.com", template: "welcome" } },
])
puts "Enqueued #{jobs.length} jobs:"
jobs.each { |j| puts " #{j.id}: #{j.state}" }
# Check status of first job
status = client.get_job(jobs.first.id)
puts "\nFirst job state: #{status.state}"

Create workflows with chain (sequential), group (parallel), and batch primitives:

workflows.rb
require "ojs"
client = OJS::Client.new("http://localhost:8080")
# Chain: sequential execution (A → B → C)
chain = client.workflow(OJS.chain(
OJS::Step.new(type: "order.validate", args: { order_id: "ord_123" }),
OJS::Step.new(type: "payment.charge", args: {}),
OJS::Step.new(type: "notification.send", args: {}),
name: "order-processing"
))
puts "Chain workflow: #{chain.id}"
# Group: parallel execution
group = client.workflow(OJS.group(
OJS::Step.new(type: "export.csv", args: { report_id: "rpt_456" }),
OJS::Step.new(type: "export.pdf", args: { report_id: "rpt_456" }),
name: "multi-export"
))
puts "Group workflow: #{group.id}"
# Batch: parallel with callbacks
batch = client.workflow(OJS.batch(
[
OJS::Step.new(type: "email.send", args: { to: "user1@example.com" }),
OJS::Step.new(type: "email.send", args: { to: "user2@example.com" }),
],
name: "bulk-email",
on_complete: OJS::Step.new(type: "batch.report", args: {}),
on_failure: OJS::Step.new(type: "batch.alert", args: {})
))
puts "Batch workflow: #{batch.id}"

The SDK provides structured error classes inheriting from OJS::Error:

error_handling.rb
require "ojs"
client = OJS::Client.new("http://localhost:8080")
begin
client.enqueue("email.send", to: "user@example.com")
rescue OJS::ConflictError => e
puts "Duplicate job: #{e.existing_job_id}"
rescue OJS::RateLimitError => e
sleep(e.retry_after || 5)
retry
rescue OJS::Error => e
puts "#{e.code}: #{e.message} (retryable: #{e.retryable?})"
end
  • A Ruby client that enqueues jobs to an OJS server
  • A Ruby worker with thread pool concurrency and graceful shutdown
  • Retry policies for automatic failure recovery
  • Yield-based middleware for logging and error enrichment
  • Batch operations for efficient bulk enqueuing
  • Workflows with chain, group, and batch orchestration