Tutorial: Your First Job in Python
This tutorial walks you through building a background job system with the Python SDK on top of asyncio. You will enqueue, process, and monitor jobs using modern async Python.
Prerequisites
- Docker and Docker Compose
- Python 3.11 or later
- A running OJS server (see Quickstart Step 1)
Step 1: Set up the project
```sh
mkdir ojs-python-tutorial && cd ojs-python-tutorial
python -m venv .venv
source .venv/bin/activate   # Windows: .venv\Scripts\activate
pip install openjobspec
```
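To double-check that the SDK landed in your virtual environment before moving on, you can ask pip for the package metadata:

```sh
pip show openjobspec
```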
Step 2: Enqueue a job
Create enqueue.py:
```python
import asyncio

from ojs import OJSClient


async def main():
    client = OJSClient(url="http://localhost:8080")

    # Enqueue a simple job
    job = await client.enqueue(
        "email.send",
        args=["user@example.com", "welcome"],
    )
    print(f"Enqueued job {job.id} in state: {job.state}")

    # Enqueue with a retry policy: up to 5 attempts, exponential backoff
    job2 = await client.enqueue(
        "report.generate",
        args=["rpt_001", "pdf"],
        queue="reports",
        retry={"max_attempts": 5, "backoff": "exponential"},
    )
    print(f"Enqueued job {job2.id} with retries")

    await client.close()


asyncio.run(main())
```

Run it:
```sh
python enqueue.py
```

Output:
```
Enqueued job 019461a8-1a2b-7c3d-8e4f-5a6b7c8d9e0f in state: available
Enqueued job 019461a8-2b3c-7d4e-9f50-6a7b8c9d0e1f with retries
```
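Both jobs start out available; nothing runs them until a worker picks them up in Step 3. If you ever need to block until a specific job finishes, you can poll it from the client with get_job (the same call used in Step 5). Here is a minimal sketch; the terminal state names ("completed", "discarded") are assumptions, so check them against your server's job lifecycle:

```python
import asyncio

from ojs import OJSClient


async def wait_for_job(client: OJSClient, job_id: str, timeout: float = 30.0):
    # Poll until the job reaches a terminal state or the timeout expires.
    # "completed" and "discarded" are assumed terminal state names.
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        job = await client.get_job(job_id)
        if job.state in ("completed", "discarded"):
            return job
        await asyncio.sleep(0.5)
    raise TimeoutError(f"job {job_id} did not finish within {timeout:.0f}s")
```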
Step 3: Build a worker
Create worker.py:
```python
import asyncio
import signal

from ojs import OJSWorker

worker = OJSWorker(
    url="http://localhost:8080",
    queues=["default", "reports"],
    concurrency=5,
)


@worker.handle("email.send")
async def handle_email(ctx):
    to, template = ctx.args[0], ctx.args[1]
    print(f"  Sending '{template}' email to {to}")

    # Simulate sending email
    await asyncio.sleep(0.2)
    return {"delivered": True}


@worker.handle("report.generate")
async def handle_report(ctx):
    report_id, fmt = ctx.args[0], ctx.args[1]
    print(f"  Generating {fmt.upper()} report {report_id}")

    # Simulate report generation
    await asyncio.sleep(1.0)
    return {"url": f"https://reports.example.com/{report_id}.{fmt}"}


async def main():
    # Graceful shutdown: stop the worker on Ctrl-C or SIGTERM
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, lambda: asyncio.create_task(worker.stop()))

    print("Worker started, waiting for jobs...")
    await worker.start()


asyncio.run(main())
```

Run the worker:
```sh
python worker.py
```
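If the two jobs from Step 2 are still queued, the worker picks them up immediately. Based on the print statements in the handlers, the output should look roughly like:

```
Worker started, waiting for jobs...
  Sending 'welcome' email to user@example.com
  Generating PDF report rpt_001
```

Press Ctrl-C to stop; the signal handler in main() shuts the worker down gracefully.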
Step 4: Add middleware
Create worker_with_middleware.py:
```python
import asyncio
import signal
import time
import logging

from ojs import OJSWorker

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ojs")

worker = OJSWorker(
    url="http://localhost:8080",
    queues=["default"],
    concurrency=5,
)


# Middleware: timing and logging
@worker.middleware
async def log_jobs(ctx, next_handler):
    start = time.monotonic()
    logger.info(f"[START] {ctx.job.type} ({ctx.job.id})")

    try:
        result = await next_handler(ctx)
        elapsed = (time.monotonic() - start) * 1000
        logger.info(f"[DONE] {ctx.job.type} took {elapsed:.0f}ms")
        return result
    except Exception as exc:
        elapsed = (time.monotonic() - start) * 1000
        logger.error(f"[FAIL] {ctx.job.type} after {elapsed:.0f}ms: {exc}")
        raise


# Middleware: error enrichment
@worker.middleware
async def enrich_errors(ctx, next_handler):
    try:
        return await next_handler(ctx)
    except Exception as exc:
        # Add job context to the error for debugging. Note: this assumes
        # the exception type accepts a single message argument, which
        # holds for most built-in exceptions.
        raise type(exc)(
            f"job={ctx.job.type} id={ctx.job.id} attempt={ctx.job.attempt}: {exc}"
        ) from exc


@worker.handle("email.send")
async def handle_email(ctx):
    to, template = ctx.args[0], ctx.args[1]
    print(f"  Sending '{template}' email to {to}")
    await asyncio.sleep(0.2)
    return {"delivered": True}


async def main():
    # Graceful shutdown: stop the worker on Ctrl-C or SIGTERM
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, lambda: asyncio.create_task(worker.stop()))

    print("Worker started with middleware, waiting for jobs...")
    await worker.start()


asyncio.run(main())
```
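It helps to know how multiple middleware compose. A common convention for decorator-registered middleware, assumed here rather than confirmed by this tutorial, is onion-style wrapping: the first-registered middleware is the outermost layer, so log_jobs would log the message already enriched by enrich_errors. The sketch below illustrates the general pattern, not OJSWorker's actual internals:

```python
# Hypothetical sketch of onion-style middleware composition.
# Illustrates the pattern only -- this is not OJSWorker's real code.
def compose(middlewares, handler):
    async def call(ctx, i=0):
        if i == len(middlewares):
            return await handler(ctx)
        # Each middleware receives a next_handler that advances the chain.
        return await middlewares[i](ctx, lambda c: call(c, i + 1))
    return call
```

With compose([log_jobs, enrich_errors], handler), log_jobs runs first, awaits enrich_errors via its next_handler, and enrich_errors awaits the real handler.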
Step 5: Batch operations

```python
import asyncio

from ojs import OJSClient


async def main():
    client = OJSClient(url="http://localhost:8080")

    # Batch enqueue multiple jobs in a single call
    jobs = await client.enqueue_batch([
        {"type": "email.send", "args": ["alice@example.com", "welcome"]},
        {"type": "email.send", "args": ["bob@example.com", "welcome"]},
        {"type": "email.send", "args": ["carol@example.com", "welcome"]},
    ])

    print(f"Enqueued {len(jobs)} jobs:")
    for job in jobs:
        print(f"  {job.id}: {job.state}")

    # Check status of the first job
    status = await client.get_job(jobs[0].id)
    print(f"\nFirst job state: {status.state}")

    await client.close()


asyncio.run(main())
```
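Checking jobs one at a time is fine for three jobs, but for a larger batch you can fan the status lookups out concurrently with asyncio.gather, using only the get_job call already shown:

```python
import asyncio

from ojs import OJSClient


async def batch_states(client: OJSClient, jobs):
    # Fetch every job's current state concurrently instead of serially.
    return await asyncio.gather(*(client.get_job(j.id) for j in jobs))
```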
What you built
- An async Python client for enqueuing jobs
- An async worker with decorator-based handler registration
- Custom middleware for logging and error enrichment
- Batch operations for efficient bulk enqueuing
Next steps
- Add workflow orchestration with chain, group, and batch
- Explore scheduled jobs for delayed and cron execution (see the sketch after this list)
- Use unique jobs for deduplication (also sketched below)
- Read the Python SDK API docs for the full reference
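As a taste of the scheduling and deduplication features above, here is a rough sketch of what such calls could look like. The scheduled_at and unique parameter names are hypothetical placeholders, not confirmed SDK API; the Python SDK API docs have the real signatures:

```python
import asyncio
from datetime import datetime, timedelta, timezone

from ojs import OJSClient


async def main():
    client = OJSClient(url="http://localhost:8080")

    # Hypothetical: run this job one hour from now
    # ("scheduled_at" is an assumed parameter name).
    await client.enqueue(
        "report.generate",
        args=["rpt_002", "pdf"],
        scheduled_at=datetime.now(timezone.utc) + timedelta(hours=1),
    )

    # Hypothetical: suppress duplicate enqueues of the same job
    # ("unique" is an assumed parameter name).
    await client.enqueue(
        "email.send",
        args=["user@example.com", "welcome"],
        unique=True,
    )

    await client.close()


asyncio.run(main())
```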