Implement an SDK
OJS SDKs are thin. The server handles retry logic, scheduling, state management, and coordination. Your SDK just needs to wrap the HTTP endpoints, provide a clean API for producers and consumers, and handle graceful shutdown. This guide covers the patterns used across all six official SDKs (Go, TypeScript, Python, Java, Rust, Ruby) so you can build one in your language.
Architecture overview
Every OJS SDK has the same structure:

```
your-ojs-sdk/
  client       # Producer API: enqueue, cancel, getJob, enqueueBatch
  worker       # Consumer API: polling, handler dispatch, ack/nack
  middleware   # Enqueue and execution middleware chains
  workflow     # chain(), group(), batch() helpers
  transport    # HTTP client wrapping OJS endpoints
  job          # Job envelope types/models
  errors       # Structured error types
```

The two main entry points are the Client (for producers) and the Worker (for consumers). Both use the same transport layer underneath.
The transport layer
Start here. The transport layer is a thin HTTP client that knows about the OJS base path (/ojs/v1) and content type (application/openjobspec+json).

```ts
// Pseudocode for the transport layer
class HttpTransport {
  private baseUrl: string;

  constructor(config: { url: string; auth?: string; headers?: Record<string, string> }) {
    this.baseUrl = config.url + '/ojs/v1';
  }

  async request<T>(opts: {
    method: string;
    path: string;
    body?: unknown;
  }): Promise<{ status: number; body: T }> {
    const url = this.baseUrl + opts.path;
    const response = await fetch(url, {
      method: opts.method,
      headers: {
        'Content-Type': 'application/openjobspec+json',
        'Accept': 'application/openjobspec+json',
      },
      body: opts.body ? JSON.stringify(opts.body) : undefined,
    });

    if (!response.ok) {
      const error = await response.json();
      throw new OJSError(error);
    }

    return {
      status: response.status,
      body: await response.json(),
    };
  }
}
```

The transport should be injectable for testing. All official SDKs accept a custom transport in their config so users can mock HTTP calls in unit tests.
Client API (producer side)
The client is the simpler of the two main components. It wraps four core operations:
enqueue(type, args, options?)
Send a POST /ojs/v1/jobs request. This is the most-used method.

```python
# Python SDK example
async def enqueue(
    self,
    job_type: str,
    args: list[Any] | None = None,
    *,
    queue: str = "default",
    priority: int = 0,
    retry: RetryPolicy | None = None,
    delay_until: str | None = None,
) -> Job:
    body = {
        "type": job_type,
        "args": args or [],
    }
    options = {}
    if queue != "default":
        options["queue"] = queue
    if priority:
        options["priority"] = priority
    if retry:
        options["retry"] = retry.to_dict()
    if delay_until:
        options["delay_until"] = delay_until
    if options:
        body["options"] = options

    response = await self._transport.post("/jobs", body=body)
    return Job.from_dict(response["job"])
```

Key details:

- `args` must always be serialized as a JSON array. If your language's API accepts a single value, wrap it in an array.
- Options (queue, priority, retry, delay, etc.) go in an `options` object in the request body.
- The response contains a `job` object with server-assigned fields (id, state, timestamps).
enqueueBatch(jobs)
Send a POST /ojs/v1/jobs/batch. Accepts an array of job specs and returns an array of created jobs. This is a Level 4 feature, but it is simple to implement:

```go
func (c *Client) EnqueueBatch(ctx context.Context, requests []JobRequest) ([]Job, error) {
    wireJobs := make([]map[string]any, len(requests))
    for i, r := range requests {
        wireJobs[i] = map[string]any{
            "type": r.Type,
            "args": r.Args,
        }
    }
    var resp struct {
        Jobs []Job `json:"jobs"`
    }
    err := c.transport.post(ctx, "/jobs/batch", map[string]any{"jobs": wireJobs}, &resp)
    return resp.Jobs, err
}
```

getJob(id)

Send a GET /ojs/v1/jobs/:id. Returns the full job envelope.

cancelJob(id)

Send a DELETE /ojs/v1/jobs/:id. Returns the job in cancelled state.
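Both lookups are thin wrappers over the transport. The sketch below is one way to write them in Python; `FakeTransport`, its `request` method, and the assumption that the response wraps the envelope in a `job` key (as the enqueue response does) are illustrative, not taken from an official SDK.

```python
import asyncio
from typing import Any


class FakeTransport:
    """Stand-in for the HTTP transport: records requests, returns canned bodies."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str]] = []

    async def request(self, method: str, path: str) -> dict[str, Any]:
        self.calls.append((method, path))
        # Assumed response shape: the job envelope wrapped in a "job" key.
        state = "cancelled" if method == "DELETE" else "available"
        return {"job": {"id": path.rsplit("/", 1)[-1], "state": state}}


class Client:
    def __init__(self, transport: FakeTransport) -> None:
        self._transport = transport

    async def get_job(self, job_id: str) -> dict[str, Any]:
        # GET /ojs/v1/jobs/:id (the transport prepends the base path)
        response = await self._transport.request("GET", f"/jobs/{job_id}")
        return response["job"]

    async def cancel_job(self, job_id: str) -> dict[str, Any]:
        # DELETE /ojs/v1/jobs/:id
        response = await self._transport.request("DELETE", f"/jobs/{job_id}")
        return response["job"]
```

Because the client only talks to the transport, the same class works unchanged against a real HTTP transport.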
Worker API (consumer side)
The worker is more involved. It needs to handle polling, handler dispatch, concurrency control, heartbeats, and graceful shutdown.
Handler registration
Workers register handlers by job type. The pattern varies by language:

```go
// Go: method call
worker.Register("email.send", func(ctx ojs.JobContext) error {
    to := ctx.Job.Args[0].(string)
    return sendEmail(to)
})
```

```python
# Python: decorator
@worker.register("email.send")
async def handle_email(ctx: ojs.JobContext):
    to = ctx.args[0]
    await send_email(to)
    return {"sent": True}
```

```ruby
# Ruby: block
worker.register("email.send") do |ctx|
  # args is always an array, so read the recipient by position
  send_email(ctx.job.args[0])
  { status: "sent" }
end
```

Internally, the worker maintains a map from job type to handler function (a map[string]handler in Go terms). When a job arrives, the worker looks up the handler by job.type and calls it.
If no handler is registered for a job type, the worker should NACK the job with retryable: false so it does not get retried endlessly.
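A minimal sketch of that lookup in Python follows. The `HandlerRegistry` class and the `handler_not_found` error code are hypothetical names chosen for illustration; only the `retryable: false` behaviour comes from the text above.

```python
import asyncio
from typing import Any, Awaitable, Callable

Handler = Callable[[dict[str, Any]], Awaitable[Any]]


class HandlerRegistry:
    def __init__(self) -> None:
        self._handlers: dict[str, Handler] = {}

    def register(self, job_type: str) -> Callable[[Handler], Handler]:
        """Decorator that stores the handler under its job type."""
        def decorator(fn: Handler) -> Handler:
            self._handlers[job_type] = fn
            return fn
        return decorator

    async def dispatch(self, job: dict[str, Any]) -> dict[str, Any]:
        """Run the matching handler and return the ACK or NACK payload."""
        handler = self._handlers.get(job["type"])
        if handler is None:
            # Unknown job type: report a non-retryable failure.
            return {
                "job_id": job["id"],
                "error": {
                    "code": "handler_not_found",  # hypothetical error code
                    "message": f"No handler registered for {job['type']}",
                    "retryable": False,
                },
            }
        result = await handler(job)
        return {"job_id": job["id"], "result": result}
```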
The polling loop
The worker continuously polls POST /ojs/v1/workers/fetch for available jobs:

```ts
private poll(): void {
  if (this.state !== 'running') return;
  if (this.activeJobs.size >= this.config.concurrency) {
    // At capacity. Wait and try again.
    this.pollTimer = setTimeout(() => this.poll(), this.config.pollInterval);
    return;
  }

  this.fetchAndProcess()
    .then((fetched) => {
      // Got jobs? Poll immediately for more. Otherwise, back off.
      const delay = fetched > 0 ? 0 : this.config.pollInterval;
      this.pollTimer = setTimeout(() => this.poll(), delay);
    })
    .catch(() => {
      // On error, back off before retrying.
      this.pollTimer = setTimeout(() => this.poll(), this.config.pollInterval * 2);
    });
}
```

The fetch request looks like this:

```json
{
  "queues": ["critical", "default", "low"],
  "count": 5,
  "worker_id": "worker_abc123",
  "visibility_timeout_ms": 30000
}
```

The server returns an array of jobs (possibly empty). The count parameter lets you fetch multiple jobs per request to reduce round trips.
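One reasonable way to pick `count` is to ask only for as many jobs as the worker has free slots for, capped at a batch size. The helper below is a sketch under that assumption; `build_fetch_request` and `max_batch` are hypothetical names, and the field names mirror the request body shown above.

```python
from typing import Any, Optional


def build_fetch_request(
    worker_id: str,
    queues: list[str],
    concurrency: int,
    active_jobs: int,
    max_batch: int = 5,
    visibility_timeout_ms: int = 30000,
) -> Optional[dict[str, Any]]:
    """Build the fetch body, or return None when the worker is at capacity."""
    free_slots = concurrency - active_jobs
    if free_slots <= 0:
        return None  # Caller should wait for pollInterval and try again.
    return {
        "queues": queues,
        # Never request more jobs than there are free execution slots.
        "count": min(free_slots, max_batch),
        "worker_id": worker_id,
        "visibility_timeout_ms": visibility_timeout_ms,
    }
```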
Concurrency control
Use your language's concurrency primitives to limit how many jobs execute simultaneously:

- Go: A buffered channel or semaphore (the Go SDK uses a `chan struct{}` as a semaphore).
- TypeScript/Node: Track active promises in a Map, check `.size` before fetching.
- Python: `asyncio.Semaphore`.
- Ruby: A thread pool with `Queue` for work distribution.
- Java: An `ExecutorService` with a fixed thread pool.
- Rust: `tokio::sync::Semaphore`.
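As an illustration of the Python entry, here is a small sketch that caps concurrent handler execution with `asyncio.Semaphore`. The `run_with_limit` helper is a hypothetical name, not part of any official SDK.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_limit(
    jobs: list[Any],
    handler: Callable[[Any], Awaitable[Any]],
    concurrency: int,
) -> list[Any]:
    """Run handler over jobs with at most `concurrency` running at once."""
    semaphore = asyncio.Semaphore(concurrency)

    async def run_one(job: Any) -> Any:
        # Blocks here while `concurrency` handlers are already executing.
        async with semaphore:
            return await handler(job)

    # gather preserves input order in its result list.
    return await asyncio.gather(*(run_one(job) for job in jobs))
```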
ACK and NACK
After the handler completes, the worker sends the result back to the server:

On success (ACK):

```
POST /ojs/v1/workers/ack
{
  "job_id": "019461a8-1a2b-7c3d-8e4f-5a6b7c8d9e0f",
  "result": { "delivered": true }
}
```

On failure (NACK):

```
POST /ojs/v1/workers/nack
{
  "job_id": "019461a8-1a2b-7c3d-8e4f-5a6b7c8d9e0f",
  "error": {
    "code": "handler_error",
    "message": "Connection refused to smtp.example.com:587",
    "retryable": true,
    "details": { "stack": "..." }
  }
}
```

The server decides what happens next based on the retry policy. Your SDK does not need to track retry counts or calculate backoff. Just report success or failure.
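The mapping from handler outcome to report can be sketched in a few lines of Python. The helper names (`build_ack`, `build_nack`, `report_for`) are hypothetical, and treating every exception as retryable by default is an assumption; the payload fields follow the ACK and NACK bodies above.

```python
import traceback
from typing import Any, Callable


def build_ack(job_id: str, result: Any) -> dict[str, Any]:
    return {"job_id": job_id, "result": result}


def build_nack(job_id: str, exc: BaseException, retryable: bool = True) -> dict[str, Any]:
    stack = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    return {
        "job_id": job_id,
        "error": {
            "code": "handler_error",
            "message": str(exc),
            "retryable": retryable,
            "details": {"stack": stack},
        },
    }


def report_for(
    job: dict[str, Any],
    handler: Callable[[dict[str, Any]], Any],
) -> tuple[str, dict[str, Any]]:
    """Run the handler and return (endpoint path, request body) to send."""
    try:
        result = handler(job)
    except Exception as exc:
        # No retry bookkeeping here: the server applies the retry policy.
        return "/workers/nack", build_nack(job["id"], exc)
    return "/workers/ack", build_ack(job["id"], result)
```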
Heartbeats
The worker sends periodic heartbeats to POST /ojs/v1/workers/heartbeat:

```json
{
  "worker_id": "worker_abc123",
  "state": "running",
  "active_jobs": 3,
  "active_job_ids": ["job-1", "job-2", "job-3"]
}
```

The heartbeat response can contain a server-directed state change:

```json
{
  "state": "quiet",
  "server_time": "2026-02-12T10:30:00Z"
}
```

Your worker must handle two directives:

- `quiet`: Stop fetching new jobs, but finish processing active ones.
- `terminate`: Stop fetching and begin graceful shutdown.
Run the heartbeat loop in a separate thread/goroutine/task from the poll loop.
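The directive handling itself can be a small pure function that the heartbeat loop calls and the poll loop consults. This Python sketch assumes a three-state worker and treats `terminate` as final; `WorkerState`, `apply_directive`, and `should_fetch` are hypothetical names, and any directive other than `quiet` or `terminate` is ignored here.

```python
from enum import Enum
from typing import Any


class WorkerState(str, Enum):
    RUNNING = "running"
    QUIET = "quiet"
    TERMINATE = "terminate"


def apply_directive(current: WorkerState, response: dict[str, Any]) -> WorkerState:
    """Return the worker state after applying a heartbeat response."""
    directive = response.get("state")
    if directive == "terminate":
        return WorkerState.TERMINATE
    if directive == "quiet" and current is WorkerState.RUNNING:
        return WorkerState.QUIET
    # No directive, an unrecognised one, or a worker already shutting down.
    return current


def should_fetch(state: WorkerState) -> bool:
    """The poll loop only fetches while the worker is running."""
    return state is WorkerState.RUNNING
```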
Graceful shutdown
When the worker receives a shutdown signal (SIGTERM, SIGINT, or a server terminate directive):
- Stop fetching new jobs.
- Wait for active jobs to complete (up to a grace period, typically 25 seconds).
- If jobs are still running after the grace period, abort them. The server will requeue them via visibility timeout.
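For an asyncio-based worker, the wait-then-abort part of these steps might look like the sketch below. It assumes active jobs are tracked as `asyncio.Task` objects; `drain` is a hypothetical helper name.

```python
import asyncio


async def drain(active: set[asyncio.Task], grace_period: float) -> int:
    """Wait for active job tasks, then cancel any that outlive the grace period.

    Returns the number of tasks that had to be aborted.
    """
    if not active:
        return 0  # asyncio.wait() rejects an empty set.
    _done, pending = await asyncio.wait(active, timeout=grace_period)
    for task in pending:
        task.cancel()
    if pending:
        # Let the cancellations finish; the server requeues these jobs
        # once their visibility timeout expires.
        await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)
```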
```go
// Go example: graceful shutdown
func (w *Worker) Start(ctx context.Context) error {
    // ... start heartbeat and fetch loops ...

    <-ctx.Done() // Wait for cancellation

    w.state.Store(WorkerStateTerminate)

    // Wait for active jobs or grace period
    graceDone := make(chan struct{})
    go func() {
        w.waitForActiveJobs()
        close(graceDone)
    }()

    select {
    case <-graceDone:
        // All jobs finished cleanly
    case <-time.After(w.config.gracePeriod):
        // Grace period expired
    }

    return nil
}
```

Middleware chain implementation
OJS has two middleware chains: enqueue middleware (runs before a job is sent to the server) and execution middleware (wraps job handler invocation on the worker).
Both use the next() pattern, like Express, Koa, or Rack middleware.
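A compact Python sketch of the next() pattern is shown here for orientation. The `compose` function is a hypothetical helper: it runs middleware in registration order and reaches the handler only if every layer calls `next()`.

```python
import asyncio
from typing import Any, Awaitable, Callable

Next = Callable[[], Awaitable[Any]]
Middleware = Callable[[Any, Next], Awaitable[Any]]
Handler = Callable[[Any], Awaitable[Any]]


def compose(middleware: list[Middleware], handler: Handler) -> Handler:
    """Wrap handler so that middleware[0] is the outermost layer."""

    async def run(ctx: Any) -> Any:
        async def call(index: int) -> Any:
            if index == len(middleware):
                return await handler(ctx)
            # next() for this layer continues with the following layer.
            return await middleware[index](ctx, lambda: call(index + 1))

        return await call(0)

    return run
```

A middleware that returns without awaiting `next()` short-circuits the chain, which is how a rate limiter or deduplication layer would skip the handler.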
Execution middleware
```go
// Go middleware signature
type MiddlewareFunc func(ctx JobContext, next HandlerFunc) error

// Building the chain: wrap from inside out
func (c *middlewareChain) then(handler HandlerFunc) HandlerFunc {
    h := handler
    for i := len(c.middleware) - 1; i >= 0; i-- {
        mw := c.middleware[i].fn
        next := h
        h = func(ctx JobContext) error {
            return mw(ctx, next)
        }
    }
    return h
}
```

The chain wraps the handler like an onion. Each middleware calls next() to pass control to the next layer. If a middleware does not call next(), the handler is never reached.
Example usage:
```ts
// TypeScript: logging middleware
worker.use(async (ctx, next) => {
  console.log(`Starting ${ctx.job.type} attempt ${ctx.attempt}`);
  const start = Date.now();
  await next();
  console.log(`Completed ${ctx.job.type} in ${Date.now() - start}ms`);
});
```

Enqueue middleware

Same pattern, but runs on the client side before the HTTP request:

```python
# Python: enqueue middleware that adds trace context
@client.enqueue_middleware
async def add_trace_id(request, next):
    request.meta = request.meta or {}
    request.meta["trace_id"] = generate_trace_id()
    return await next(request)
```

Named middleware and ordering
The official SDKs support named middleware with insertion and removal operations. This is useful for frameworks that need to control middleware ordering:

```go
// Add named middleware
w.UseNamed("logging", loggingMiddleware)
w.UseNamed("metrics", metricsMiddleware)

// Insert before another
chain.InsertBefore("metrics", "tracing", tracingMiddleware)

// Remove by name
chain.Remove("logging")
```

Workflow helpers

Workflow helpers are pure functions that build workflow definitions. They do not make HTTP calls. The definitions are sent to the server via POST /ojs/v1/workflows.
chain(steps…)
Sequential execution. The result of step N feeds step N+1.

```ts
import { chain } from '@openjobspec/sdk';

const wf = chain(
  { type: 'data.fetch', args: { url: '...' } },
  { type: 'data.transform', args: { format: 'csv' } },
  { type: 'data.load', args: { dest: 'warehouse' } },
);

await client.workflow(wf);
```

group(jobs…)
Parallel execution. All jobs run concurrently with no ordering guarantees.

```ts
import { group } from '@openjobspec/sdk';

const wf = group(
  { type: 'export.csv', args: { reportId: 'rpt_456' } },
  { type: 'export.pdf', args: { reportId: 'rpt_456' } },
  { type: 'export.xlsx', args: { reportId: 'rpt_456' } },
);
```

batch(jobs, callbacks)
Parallel execution with callbacks that fire based on collective outcome.

```ts
import { batch } from '@openjobspec/sdk';

const wf = batch(
  [
    { type: 'email.send', args: ['user1@example.com'] },
    { type: 'email.send', args: ['user2@example.com'] },
  ],
  {
    on_complete: { type: 'batch.report', args: [] },
    on_failure: { type: 'batch.alert', args: [] },
  },
);
```

Error handling

Structured errors

OJS uses structured error objects, not plain strings. Your SDK should define an error type that carries this information:

```ts
interface OJSError {
  code: string;        // Machine-readable error code
  message: string;     // Human-readable description
  retryable: boolean;  // Whether the client should retry
  details?: unknown;   // Additional context
}
```

Common error codes from the server:
| Code | HTTP Status | Meaning |
|---|---|---|
| `invalid_payload` | 400 | Malformed or invalid request body |
| `not_found` | 404 | Job or resource not found |
| `invalid_state_transition` | 409 | Attempted an invalid state transition |
| `duplicate` | 409 | Unique job constraint violated |
| `internal_error` | 500 | Server-side error |
Transport errors vs application errors
Distinguish between HTTP-level failures (network errors, timeouts) and OJS application errors (the server returned a structured error response). Users need to handle these differently.
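One way to make that distinction visible to users is a small exception hierarchy, sketched below in Python. The class names and `error_from_response` are hypothetical, and because this guide does not pin down whether the error body is flat or nested under an `error` key, the parser accepts either.

```python
from typing import Any, Optional


class OJSError(Exception):
    """Base class for everything the SDK raises."""


class OJSTransportError(OJSError):
    """No OJS response was received (DNS failure, timeout, connection reset)."""


class OJSApplicationError(OJSError):
    """The server answered with a structured OJS error."""

    def __init__(
        self,
        status: int,
        code: str,
        message: str,
        retryable: bool,
        details: Optional[Any] = None,
    ) -> None:
        super().__init__(f"{code}: {message}")
        self.status = status
        self.code = code
        self.message = message
        self.retryable = retryable
        self.details = details


def error_from_response(status: int, body: dict[str, Any]) -> OJSApplicationError:
    """Build an application error from a non-2xx response body."""
    payload = body.get("error", body)  # Assumption: flat or nested under "error".
    return OJSApplicationError(
        status=status,
        code=payload.get("code", "unknown"),
        message=payload.get("message", ""),
        retryable=bool(payload.get("retryable", False)),
        details=payload.get("details"),
    )
```

Callers can then catch `OJSTransportError` to retry the request itself, and inspect `code` and `retryable` on `OJSApplicationError` to decide what to do about the job.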
Testing strategy
Unit tests with mocked transport

Inject a mock transport to test client and worker logic without a real server:

```ts
// TypeScript: mock transport for unit tests
const mockTransport: Transport = {
  async request(opts) {
    if (opts.path === '/jobs' && opts.method === 'POST') {
      return {
        status: 201,
        body: {
          job: { id: 'test-id', type: 'email.send', state: 'available', args: ['test'] },
        },
      };
    }
    throw new Error(`Unexpected request: ${opts.method} ${opts.path}`);
  },
};

const client = new OJSClient({ url: 'http://unused', transport: mockTransport });
const job = await client.enqueue('email.send', ['test']);
assert(job.id === 'test-id');
```

Integration tests against a real server
Run the official OJS backend in Docker and test your SDK against it:

```sh
# Start the test server
docker compose up -d

# Run your SDK's integration tests
OJS_URL=http://localhost:8080 npm test
```

Integration tests should cover the full lifecycle: enqueue a job, fetch it with a worker, process it, and verify the final state.
Test the middleware chain
Verify that middleware executes in the correct order and that next() propagation works:

```python
order = []

@worker.middleware
async def first(ctx, next):
    order.append("first-before")
    result = await next()
    order.append("first-after")
    return result

@worker.middleware
async def second(ctx, next):
    order.append("second-before")
    result = await next()
    order.append("second-after")
    return result

# After processing a job:
assert order == ["first-before", "second-before", "second-after", "first-after"]
```

Checklist for a complete SDK

Here is a summary of what you need to implement:
- Transport layer: HTTP client with base path, content type, error parsing
- Client: `enqueue()`, `enqueueBatch()`, `getJob()`, `cancelJob()`
- Worker: Poll loop, handler registration, ACK/NACK, heartbeats
- Concurrency control: Limit concurrent job execution
- Graceful shutdown: Signal handling, grace period, drain active jobs
- Enqueue middleware: `next()` chain on the client side
- Execution middleware: `next()` chain on the worker side
- Workflow helpers: `chain()`, `group()`, `batch()`
- Structured errors: Error types with code, message, retryable
- Unit tests: Mocked transport covering happy and error paths
- Integration tests: Full lifecycle against a real OJS server