
Implement an SDK

OJS SDKs are thin. The server handles retry logic, scheduling, state management, and coordination. Your SDK just needs to wrap the HTTP endpoints, provide a clean API for producers and consumers, and handle graceful shutdown. This guide covers the patterns used across all six official SDKs (Go, TypeScript, Python, Java, Rust, Ruby) so you can build one in your language.

Every OJS SDK has the same structure:

your-ojs-sdk/
  client      # Producer API: enqueue, cancel, getJob, enqueueBatch
  worker      # Consumer API: polling, handler dispatch, ack/nack
  middleware  # Enqueue and execution middleware chains
  workflow    # chain(), group(), batch() helpers
  transport   # HTTP client wrapping OJS endpoints
  job         # Job envelope types/models
  errors      # Structured error types

The two main entry points are the Client (for producers) and the Worker (for consumers). Both use the same transport layer underneath.

Start here. The transport layer is a thin HTTP client that knows about the OJS base path (/ojs/v1) and content type (application/openjobspec+json).

// Pseudocode for the transport layer
class HttpTransport {
  private baseUrl: string;
  private headers: Record<string, string>;

  constructor(config: { url: string; auth?: string; headers?: Record<string, string> }) {
    this.baseUrl = config.url + '/ojs/v1';
    this.headers = {
      'Content-Type': 'application/openjobspec+json',
      'Accept': 'application/openjobspec+json',
      ...(config.auth ? { 'Authorization': config.auth } : {}),
      ...config.headers,
    };
  }

  async request<T>(opts: {
    method: string;
    path: string;
    body?: unknown;
  }): Promise<{ status: number; body: T }> {
    const url = this.baseUrl + opts.path;
    const response = await fetch(url, {
      method: opts.method,
      headers: this.headers,
      body: opts.body ? JSON.stringify(opts.body) : undefined,
    });
    if (!response.ok) {
      // The server returns a structured error object; surface it as a typed error.
      const error = await response.json();
      throw new OJSError(error);
    }
    return {
      status: response.status,
      body: await response.json(),
    };
  }
}

The transport should be injectable for testing. All official SDKs accept a custom transport in their config so users can mock HTTP calls in unit tests.

The client is the simpler of the two main components. It wraps four core operations:

enqueue() sends a POST /ojs/v1/jobs request. This is the most-used method.

# Python SDK example
async def enqueue(
    self,
    job_type: str,
    args: list[Any] | None = None,
    *,
    queue: str = "default",
    priority: int = 0,
    retry: RetryPolicy | None = None,
    delay_until: str | None = None,
) -> Job:
    body = {
        "type": job_type,
        "args": args or [],
    }
    options = {}
    if queue != "default":
        options["queue"] = queue
    if priority:
        options["priority"] = priority
    if retry:
        options["retry"] = retry.to_dict()
    if delay_until:
        options["delay_until"] = delay_until
    if options:
        body["options"] = options
    response = await self._transport.post("/jobs", body=body)
    return Job.from_dict(response["job"])

Key details:

  • args must always be serialized as a JSON array. If your language’s API accepts a single value, wrap it in an array.
  • Options (queue, priority, retry, delay, etc.) go in an options object in the request body.
  • The response contains a job object with server-assigned fields (id, state, timestamps).

enqueueBatch() sends a POST /ojs/v1/jobs/batch request. It accepts an array of job specs and returns an array of created jobs. This is a Level 4 feature, but it is simple to implement:

func (c *Client) EnqueueBatch(ctx context.Context, requests []JobRequest) ([]Job, error) {
    wireJobs := make([]map[string]any, len(requests))
    for i, r := range requests {
        wireJobs[i] = map[string]any{
            "type": r.Type,
            "args": r.Args,
        }
    }
    var resp struct {
        Jobs []Job `json:"jobs"`
    }
    err := c.transport.post(ctx, "/jobs/batch", map[string]any{"jobs": wireJobs}, &resp)
    return resp.Jobs, err
}

getJob() sends a GET /ojs/v1/jobs/:id request and returns the full job envelope.

cancelJob() sends a DELETE /ojs/v1/jobs/:id request and returns the job in the cancelled state.
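The two lookup operations can be sketched in the same style as the Python enqueue example above. The transport's get/delete helpers and the plain-dict return type are assumptions for illustration, not part of the spec:

```python
# Hypothetical sketch of get_job / cancel_job, mirroring the Python
# enqueue example above. The transport helpers are assumed.
class Client:
    def __init__(self, transport):
        self._transport = transport

    async def get_job(self, job_id: str) -> dict:
        # GET /ojs/v1/jobs/:id returns the full job envelope.
        response = await self._transport.get(f"/jobs/{job_id}")
        return response["job"]

    async def cancel_job(self, job_id: str) -> dict:
        # DELETE /ojs/v1/jobs/:id returns the job in cancelled state.
        response = await self._transport.delete(f"/jobs/{job_id}")
        return response["job"]
```

Both methods are thin: the server owns the state machine, so the SDK only forwards the request and unwraps the job object.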

The worker is more involved. It needs to handle polling, handler dispatch, concurrency control, heartbeats, and graceful shutdown.

Workers register handlers by job type. The pattern varies by language:

// Go: method call
worker.Register("email.send", func(ctx ojs.JobContext) error {
    to := ctx.Job.Args[0].(string)
    return sendEmail(to)
})

# Python: decorator
@worker.register("email.send")
async def handle_email(ctx: ojs.JobContext):
    to = ctx.args[0]
    await send_email(to)
    return {"sent": True}

# Ruby: block
worker.register("email.send") do |ctx|
  send_email(ctx.job.args[0])
  { status: "sent" }
end

Internally, the worker maintains a map from job type to handler function (a map[string]handler in Go). When a job arrives, the worker looks up the handler by job.type and calls it.

If no handler is registered for a job type, the worker should NACK the job with retryable: false so it does not get retried endlessly.
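The dispatch logic above can be sketched as follows. The ack/nack callables and the job dict shape are assumptions based on the ACK/NACK payloads described later in this guide:

```python
# Sketch of handler dispatch with the unknown-type NACK rule.
class Dispatcher:
    def __init__(self):
        self._handlers = {}  # job type -> handler function

    def register(self, job_type, handler):
        self._handlers[job_type] = handler

    async def dispatch(self, job, ack, nack):
        handler = self._handlers.get(job["type"])
        if handler is None:
            # Unknown type: NACK as non-retryable so the server
            # does not requeue the job endlessly.
            await nack(job["id"], {
                "code": "unknown_job_type",
                "message": f"no handler registered for {job['type']}",
                "retryable": False,
            })
            return
        result = await handler(job)
        await ack(job["id"], result)
```

The error code name here (unknown_job_type) is illustrative; use whatever convention your SDK documents.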

The worker continuously polls POST /ojs/v1/workers/fetch for available jobs:

private poll(): void {
  if (this.state !== 'running') return;
  if (this.activeJobs.size >= this.config.concurrency) {
    // At capacity. Wait and try again.
    this.pollTimer = setTimeout(() => this.poll(), this.config.pollInterval);
    return;
  }
  this.fetchAndProcess()
    .then((fetched) => {
      // Got jobs? Poll immediately for more. Otherwise, back off.
      const delay = fetched > 0 ? 0 : this.config.pollInterval;
      this.pollTimer = setTimeout(() => this.poll(), delay);
    })
    .catch(() => {
      // On error, back off before retrying.
      this.pollTimer = setTimeout(() => this.poll(), this.config.pollInterval * 2);
    });
}

The fetch request looks like this:

{
  "queues": ["critical", "default", "low"],
  "count": 5,
  "worker_id": "worker_abc123",
  "visibility_timeout_ms": 30000
}

The server returns an array of jobs (possibly empty). The count parameter lets you fetch multiple jobs per request to reduce round trips.

Use your language’s concurrency primitives to limit how many jobs execute simultaneously:

  • Go: A buffered channel or semaphore (the Go SDK uses a chan struct{} as a semaphore).
  • TypeScript/Node: Track active promises in a Map, check .size before fetching.
  • Python: asyncio.Semaphore.
  • Ruby: A thread pool with Queue for work distribution.
  • Java: An ExecutorService with a fixed thread pool.
  • Rust: tokio::sync::Semaphore.
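In Python, the asyncio.Semaphore approach from the list above looks roughly like this. The Limiter wrapper and its peak counter are illustrative, not part of any official SDK:

```python
import asyncio

# Sketch: capping concurrent handler execution with asyncio.Semaphore.
class Limiter:
    def __init__(self, concurrency: int):
        self._sem = asyncio.Semaphore(concurrency)
        self._active = 0
        self.peak = 0  # highest observed concurrency, for illustration

    async def run(self, coro_fn, *args):
        async with self._sem:  # blocks once `concurrency` jobs are active
            self._active += 1
            self.peak = max(self.peak, self._active)
            try:
                return await coro_fn(*args)
            finally:
                self._active -= 1
```

The same shape works for the other languages: acquire before running the handler, release in a finally block so a crashing handler never leaks a slot.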

After the handler completes, the worker sends the result back to the server:

On success (ACK):

POST /ojs/v1/workers/ack
{
  "job_id": "019461a8-1a2b-7c3d-8e4f-5a6b7c8d9e0f",
  "result": { "delivered": true }
}

On failure (NACK):

POST /ojs/v1/workers/nack
{
  "job_id": "019461a8-1a2b-7c3d-8e4f-5a6b7c8d9e0f",
  "error": {
    "code": "handler_error",
    "message": "Connection refused to smtp.example.com:587",
    "retryable": true,
    "details": { "stack": "..." }
  }
}

The server decides what happens next based on the retry policy. Your SDK does not need to track retry counts or calculate backoff. Just report success or failure.
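One practical detail is mapping a handler exception to the NACK error object above. A minimal sketch, assuming a RetryableError marker class as the SDK's convention for flagging retryable failures:

```python
import traceback

class RetryableError(Exception):
    """Hypothetical marker: handler errors that are safe to retry."""

def error_payload(exc: Exception) -> dict:
    # Build the NACK "error" object shown above from an exception.
    # Call this inside the `except` block so format_exc() sees the traceback.
    return {
        "code": "handler_error",
        "message": str(exc),
        "retryable": isinstance(exc, RetryableError),
        "details": {"stack": traceback.format_exc()},
    }
```

The server applies the retry policy; the SDK's only job is to report whether the failure looks retryable.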

The worker sends periodic heartbeats to POST /ojs/v1/workers/heartbeat:

{
  "worker_id": "worker_abc123",
  "state": "running",
  "active_jobs": 3,
  "active_job_ids": ["job-1", "job-2", "job-3"]
}

The heartbeat response can contain a server-directed state change:

{
  "state": "quiet",
  "server_time": "2026-02-12T10:30:00Z"
}

Your worker must handle two directives:

  • quiet: Stop fetching new jobs, but finish processing active ones.
  • terminate: Stop fetching and begin graceful shutdown.

Run the heartbeat loop in a separate thread/goroutine/task from the poll loop.
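A heartbeat loop that applies both directives can be sketched like this. The transport interface, interval, and worker attributes are illustrative assumptions; the payload fields follow the examples above:

```python
import asyncio

# Sketch of a heartbeat loop that honors server-directed state changes.
async def heartbeat_loop(worker, transport, interval: float = 5.0):
    while worker.state != "terminate":
        response = await transport.post("/workers/heartbeat", body={
            "worker_id": worker.id,
            "state": worker.state,
            "active_jobs": len(worker.active_jobs),
            "active_job_ids": list(worker.active_jobs),
        })
        directive = response.get("state")
        if directive == "quiet":
            worker.state = "quiet"       # stop fetching, finish active jobs
        elif directive == "terminate":
            worker.state = "terminate"   # begin graceful shutdown
        await asyncio.sleep(interval)
```

The poll loop reads worker.state before each fetch, so a quiet directive stops new work on the next cycle without touching in-flight jobs.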

When the worker receives a shutdown signal (SIGTERM, SIGINT, or a server terminate directive):

  1. Stop fetching new jobs.
  2. Wait for active jobs to complete (up to a grace period, typically 25 seconds).
  3. If jobs are still running after the grace period, abort them. The server will requeue them via visibility timeout.

// Go example: graceful shutdown
func (w *Worker) Start(ctx context.Context) error {
    // ... start heartbeat and fetch loops ...
    <-ctx.Done() // Wait for cancellation
    w.state.Store(WorkerStateTerminate)

    // Wait for active jobs or grace period
    graceDone := make(chan struct{})
    go func() {
        w.waitForActiveJobs()
        close(graceDone)
    }()
    select {
    case <-graceDone:
        // All jobs finished cleanly
    case <-time.After(w.config.gracePeriod):
        // Grace period expired
    }
    return nil
}

OJS has two middleware chains: enqueue middleware (runs before a job is sent to the server) and execution middleware (wraps job handler invocation on the worker).

Both use the next() pattern, like Express, Koa, or Rack middleware.

// Go middleware signature
type MiddlewareFunc func(ctx JobContext, next HandlerFunc) error

// Building the chain: wrap from inside out
func (c *middlewareChain) then(handler HandlerFunc) HandlerFunc {
    h := handler
    for i := len(c.middleware) - 1; i >= 0; i-- {
        mw := c.middleware[i].fn
        next := h
        h = func(ctx JobContext) error {
            return mw(ctx, next)
        }
    }
    return h
}

The chain wraps the handler like an onion. Each middleware calls next() to pass control to the next layer. If a middleware does not call next(), the handler is never reached.

Example usage:

// TypeScript: logging middleware
worker.use(async (ctx, next) => {
  console.log(`Starting ${ctx.job.type} attempt ${ctx.attempt}`);
  const start = Date.now();
  await next();
  console.log(`Completed ${ctx.job.type} in ${Date.now() - start}ms`);
});

Same pattern, but runs on the client side before the HTTP request:

# Python: enqueue middleware that adds trace context
# Python: enqueue middleware that adds trace context
@client.enqueue_middleware
async def add_trace_id(request, next):
    request.meta = request.meta or {}
    request.meta["trace_id"] = generate_trace_id()
    return await next(request)

The official SDKs support named middleware with insertion and removal operations. This is useful for frameworks that need to control middleware ordering:

// Add named middleware
w.UseNamed("logging", loggingMiddleware)
w.UseNamed("metrics", metricsMiddleware)

// Insert before another
chain.InsertBefore("metrics", "tracing", tracingMiddleware)

// Remove by name
chain.Remove("logging")

Workflow helpers are pure functions that build workflow definitions. They do not make HTTP calls. The definitions are sent to the server via POST /ojs/v1/workflows.

chain() builds sequential execution: the result of step N feeds into step N+1.

import { chain } from '@openjobspec/sdk';

const wf = chain(
  { type: 'data.fetch', args: { url: '...' } },
  { type: 'data.transform', args: { format: 'csv' } },
  { type: 'data.load', args: { dest: 'warehouse' } },
);
await client.workflow(wf);

group() builds parallel execution: all jobs run concurrently with no ordering guarantees.

import { group } from '@openjobspec/sdk';

const wf = group(
  { type: 'export.csv', args: { reportId: 'rpt_456' } },
  { type: 'export.pdf', args: { reportId: 'rpt_456' } },
  { type: 'export.xlsx', args: { reportId: 'rpt_456' } },
);

batch() builds parallel execution with callbacks that fire based on the collective outcome.

import { batch } from '@openjobspec/sdk';

const wf = batch(
  [
    { type: 'email.send', args: ['user1@example.com'] },
    { type: 'email.send', args: ['user2@example.com'] },
  ],
  {
    on_complete: { type: 'batch.report', args: [] },
    on_failure: { type: 'batch.alert', args: [] },
  },
);

OJS uses structured error objects, not plain strings. Your SDK should define an error type that carries this information:

interface OJSError {
  code: string;       // Machine-readable error code
  message: string;    // Human-readable description
  retryable: boolean; // Whether the client should retry
  details?: unknown;  // Additional context
}

Common error codes from the server:

Code                      HTTP Status  Meaning
invalid_payload           400          Malformed or invalid request body
not_found                 404          Job or resource not found
invalid_state_transition  409          Attempted an invalid state transition
duplicate                 409          Unique job constraint violated
internal_error            500          Server-side error

Distinguish between HTTP-level failures (network errors, timeouts) and OJS application errors (the server returned a structured error response). Users need to handle these differently.
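One way to encode that distinction is with two exception classes and a small classifier. The class and function names here are assumptions about how an SDK might structure this, not spec requirements:

```python
# Sketch: separating transport failures from structured OJS errors.
class OJSError(Exception):
    """The server returned a structured OJS error response."""
    def __init__(self, code, message, retryable, details=None):
        super().__init__(message)
        self.code = code
        self.retryable = retryable
        self.details = details

class TransportError(Exception):
    """The request never produced an OJS response (network error, timeout)."""

def classify(exc):
    # Transport errors never carry server intent, so retrying is reasonable;
    # OJS errors come with an explicit retryable flag to honor.
    if isinstance(exc, OJSError):
        return "retry" if exc.retryable else "fail"
    if isinstance(exc, TransportError):
        return "retry"
    raise exc
```

Exposing both types lets users write precise handlers instead of string-matching error messages.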

Inject a mock transport to test client and worker logic without a real server:

// TypeScript: mock transport for unit tests
const mockTransport: Transport = {
  async request(opts) {
    if (opts.path === '/jobs' && opts.method === 'POST') {
      return {
        status: 201,
        body: {
          job: { id: 'test-id', type: 'email.send', state: 'available', args: ['test'] },
        },
      };
    }
    throw new Error(`Unexpected request: ${opts.method} ${opts.path}`);
  },
};

const client = new OJSClient({ url: 'http://unused', transport: mockTransport });
const job = await client.enqueue('email.send', ['test']);
assert(job.id === 'test-id');

Run the official OJS backend in Docker and test your SDK against it:

# Start the test server
docker compose up -d
# Run your SDK's integration tests
OJS_URL=http://localhost:8080 npm test

Integration tests should cover the full lifecycle: enqueue a job, fetch it with a worker, process it, and verify the final state.
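The lifecycle such a test walks through can be sketched against a tiny in-memory stand-in rather than a real server. Everything here (the InMemoryServer, its method names, the job shape) is illustrative scaffolding, not an OJS API:

```python
# Sketch of the enqueue -> fetch -> process -> verify lifecycle.
class InMemoryServer:
    def __init__(self):
        self.jobs = {}
        self._next = 0

    def enqueue(self, job_type, args):
        self._next += 1
        job = {"id": f"job_{self._next}", "type": job_type,
               "args": args, "state": "available"}
        self.jobs[job["id"]] = job
        return job

    def fetch(self):
        # Hand out the first available job, marking it active.
        for job in self.jobs.values():
            if job["state"] == "available":
                job["state"] = "active"
                return job
        return None

    def ack(self, job_id, result):
        self.jobs[job_id].update(state="completed", result=result)

def lifecycle_test(server):
    job = server.enqueue("email.send", ["user@example.com"])
    fetched = server.fetch()
    result = {"sent": True}  # stand-in for the handler's return value
    server.ack(fetched["id"], result)
    return server.jobs[job["id"]]["state"]
```

Against a real OJS backend, the same four steps go through your SDK's Client and Worker instead of direct method calls, and the final assertion reads the state via getJob().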

Verify that middleware executes in the correct order and that next() propagation works:

order = []

@worker.middleware
async def first(ctx, next):
    order.append("first-before")
    result = await next()
    order.append("first-after")
    return result

@worker.middleware
async def second(ctx, next):
    order.append("second-before")
    result = await next()
    order.append("second-after")
    return result

# After processing a job:
assert order == ["first-before", "second-before", "second-after", "first-after"]

Here is a summary of what you need to implement:

  • Transport layer: HTTP client with base path, content type, error parsing
  • Client: enqueue(), enqueueBatch(), getJob(), cancelJob()
  • Worker: Poll loop, handler registration, ACK/NACK, heartbeats
  • Concurrency control: Limit concurrent job execution
  • Graceful shutdown: Signal handling, grace period, drain active jobs
  • Enqueue middleware: next() chain on the client side
  • Execution middleware: next() chain on the worker side
  • Workflow helpers: chain(), group(), batch()
  • Structured errors: Error types with code, message, retryable
  • Unit tests: Mocked transport covering happy and error paths
  • Integration tests: Full lifecycle against a real OJS server