Skip to content

Tutorial: Your First Job in Python

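This tutorial walks you through building a background job system with the Python SDK using asyncio. You will enqueue, process, and monitor jobs using modern async Python. The examples assume an OJS server is already running and reachable at http://localhost:8080; adjust the URL in each script if your server listens elsewhere.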

Terminal window
mkdir ojs-python-tutorial && cd ojs-python-tutorial
python -m venv .venv
source .venv/bin/activate # Windows: .venv\Scripts\activate
pip install openjobspec

Create enqueue.py:

enqueue.py
import asyncio
from ojs import OJSClient
async def main():
    """Enqueue two example jobs and print what the server returned for each.

    The first job uses only a type and positional args; the second targets
    the ``reports`` queue and attaches a retry policy. The client is closed
    in a ``finally`` block so the connection is released even when one of
    the enqueue calls raises (for example, because the server is down).
    """
    client = OJSClient(url="http://localhost:8080")
    try:
        # Enqueue a simple job
        job = await client.enqueue(
            "email.send",
            args=["user@example.com", "welcome"],
        )
        print(f"Enqueued job {job.id} in state: {job.state}")

        # Enqueue with retry policy
        job2 = await client.enqueue(
            "report.generate",
            args=["rpt_001", "pdf"],
            queue="reports",
            retry={"max_attempts": 5, "backoff": "exponential"},
        )
        print(f"Enqueued job {job2.id} with retries")
    finally:
        # Always release the client, including on the error path.
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())

Run it:

Terminal window
python enqueue.py

Output:

Enqueued job 019461a8-1a2b-7c3d-8e4f-5a6b7c8d9e0f in state: available
Enqueued job 019461a8-2b3c-7d4e-9f50-6a7b8c9d0e1f with retries

Create worker.py:

worker.py
import asyncio
import signal
from ojs import OJSWorker
# Module-level worker shared by the handler decorators and main() below.
# It connects to the local server and subscribes to both queues used in
# this tutorial ("default" for emails, "reports" for report generation).
worker = OJSWorker(
    url="http://localhost:8080",
    queues=["default", "reports"],
    concurrency=5,  # presumably the max number of jobs run at once; see SDK docs
)
@worker.handle("email.send")
async def handle_email(ctx):
    """Handle an ``email.send`` job.

    Reads the recipient address and the template name from the first two
    positional job arguments, simulates the send with a short sleep, and
    returns a small result payload.
    """
    to = ctx.args[0]
    template = ctx.args[1]
    print(f" Sending '{template}' email to {to}")

    # Simulate sending email
    await asyncio.sleep(0.2)

    outcome = {"delivered": True}
    return outcome
@worker.handle("report.generate")
async def handle_report(ctx):
    """Handle a ``report.generate`` job.

    Reads the report id and the output format from the first two positional
    job arguments, simulates the generation with a one-second sleep, and
    returns the URL where the finished report would live.
    """
    report_id = ctx.args[0]
    fmt = ctx.args[1]
    print(f" Generating {fmt.upper()} report {report_id}")

    # Simulate report generation
    await asyncio.sleep(1.0)

    report_url = f"https://reports.example.com/{report_id}.{fmt}"
    return {"url": report_url}
async def main():
    """Run the worker until it is asked to stop.

    SIGINT and SIGTERM are wired to ``worker.stop()`` so in-flight jobs get
    a chance to finish. On platforms whose event loop does not support
    signal handlers (notably Windows), registration is skipped and Ctrl+C
    falls back to the default ``KeyboardInterrupt`` behaviour.
    """
    loop = asyncio.get_running_loop()

    # The event loop keeps only weak references to tasks, so hold the stop
    # task here until it completes; otherwise it may be garbage-collected
    # before the shutdown finishes.
    stop_tasks = set()

    def request_stop():
        task = asyncio.create_task(worker.stop())
        stop_tasks.add(task)
        task.add_done_callback(stop_tasks.discard)

    # Graceful shutdown
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, request_stop)
        except NotImplementedError:
            # add_signal_handler is Unix-only; keep the worker usable on
            # Windows instead of crashing at startup.
            break

    print("Worker started, waiting for jobs...")
    await worker.start()


if __name__ == "__main__":
    asyncio.run(main())

Run the worker:

Terminal window
python worker.py

Create worker_with_middleware.py:

worker_with_middleware.py
import asyncio
import signal
import time
import logging
from ojs import OJSWorker
# Send INFO-and-above records to the root handler so the middleware's log
# lines are visible when the script runs.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ojs")

# Module-level worker shared by the middleware and handler decorators and
# by main() below; this example only consumes the "default" queue.
worker = OJSWorker(
    url="http://localhost:8080",
    queues=["default"],
    concurrency=5,  # presumably the max number of jobs run at once; see SDK docs
)
# Middleware: timing and logging
@worker.middleware
async def log_jobs(ctx, next_handler):
    """Log the start, outcome, and duration of the wrapped job.

    Emits a ``[START]`` line, awaits ``next_handler(ctx)``, then emits either
    ``[DONE]`` with the elapsed milliseconds or ``[FAIL]`` with the elapsed
    milliseconds and the exception text. Failures are re-raised unchanged
    and the handler's return value is passed through.
    """
    began = time.monotonic()

    def millis_since_start():
        # monotonic() is immune to wall-clock adjustments, so the
        # difference is a reliable duration.
        return (time.monotonic() - began) * 1000

    logger.info(f"[START] {ctx.job.type} ({ctx.job.id})")
    try:
        outcome = await next_handler(ctx)
        elapsed = millis_since_start()
        logger.info(f"[DONE] {ctx.job.type} took {elapsed:.0f}ms")
        return outcome
    except Exception as exc:
        elapsed = millis_since_start()
        logger.error(f"[FAIL] {ctx.job.type} after {elapsed:.0f}ms: {exc}")
        raise
# Middleware: error enrichment
@worker.middleware
async def enrich_errors(ctx, next_handler):
    """Prefix any handler exception with the job's type, id, and attempt.

    On failure, a new exception of the same class is raised whose message
    starts with ``job=<type> id=<id> attempt=<n>``, chained to the original
    via ``from``. If the exception class cannot be constructed from a single
    message string (its ``__init__`` needs other arguments), the original
    exception is re-raised instead so the real failure is never replaced by
    a ``TypeError`` coming from this middleware.
    """
    try:
        return await next_handler(ctx)
    except Exception as exc:
        # Add context to the error for debugging
        context = f"job={ctx.job.type} id={ctx.job.id} attempt={ctx.job.attempt}"
        try:
            enriched = type(exc)(f"{context}: {exc}")
        except Exception:
            # Non-standard constructor signature: cannot rebuild this type.
            enriched = None

        if enriched is None:
            # Attach the context as a note where the runtime supports it
            # (Python 3.11+), then propagate the original error untouched.
            if hasattr(exc, "add_note"):
                exc.add_note(context)
            raise

        raise enriched from exc
@worker.handle("email.send")
async def handle_email(ctx):
    """Handle an ``email.send`` job.

    Reads the recipient address and the template name from the first two
    positional job arguments, simulates the send with a short sleep, and
    returns a small result payload.
    """
    to = ctx.args[0]
    template = ctx.args[1]
    print(f" Sending '{template}' email to {to}")

    # Stand-in for the real delivery call.
    await asyncio.sleep(0.2)

    outcome = {"delivered": True}
    return outcome
async def main():
    """Run the middleware-enabled worker until it is asked to stop.

    SIGINT and SIGTERM are wired to ``worker.stop()`` so in-flight jobs get
    a chance to finish. On platforms whose event loop does not support
    signal handlers (notably Windows), registration is skipped and Ctrl+C
    falls back to the default ``KeyboardInterrupt`` behaviour.
    """
    loop = asyncio.get_running_loop()

    # The event loop keeps only weak references to tasks, so hold the stop
    # task here until it completes; otherwise it may be garbage-collected
    # before the shutdown finishes.
    stop_tasks = set()

    def request_stop():
        task = asyncio.create_task(worker.stop())
        stop_tasks.add(task)
        task.add_done_callback(stop_tasks.discard)

    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            loop.add_signal_handler(sig, request_stop)
        except NotImplementedError:
            # add_signal_handler is Unix-only; keep the worker usable on
            # Windows instead of crashing at startup.
            break

    print("Worker started with middleware, waiting for jobs...")
    await worker.start()


if __name__ == "__main__":
    asyncio.run(main())

Create batch.py:

batch.py
import asyncio
from ojs import OJSClient
async def main():
    """Enqueue three ``email.send`` jobs in one batch call and inspect the first.

    Prints the id and state of every job returned by the batch call, then
    fetches the first job again by id and prints its current state. The
    client is closed in a ``finally`` block so it is released even when a
    request fails.
    """
    client = OJSClient(url="http://localhost:8080")
    try:
        # Batch enqueue multiple jobs
        jobs = await client.enqueue_batch([
            {"type": "email.send", "args": ["alice@example.com", "welcome"]},
            {"type": "email.send", "args": ["bob@example.com", "welcome"]},
            {"type": "email.send", "args": ["carol@example.com", "welcome"]},
        ])
        print(f"Enqueued {len(jobs)} jobs:")
        for job in jobs:
            print(f" {job.id}: {job.state}")

        # Check status of first job
        status = await client.get_job(jobs[0].id)
        print(f"\nFirst job state: {status.state}")
    finally:
        # Always release the client, including on the error path.
        await client.close()


if __name__ == "__main__":
    asyncio.run(main())

In this tutorial you built:

  • An async Python client for enqueuing jobs
  • An async worker with decorator-based handler registration
  • Custom middleware for logging and error enrichment
  • Batch operations for efficient bulk enqueuing