# Migrate from Celery
Celery has been the go-to background task system for Python since 2009. It is battle-tested and feature-rich. It is also Python-only, historically defaulted to pickle serialization (a security risk; JSON has been the default since Celery 4.0, but pickle remains a supported option), and has a notoriously complex configuration surface. If you want to move to a polyglot, standardized system, or just want a simpler architecture, this guide walks you through the migration from Celery to OJS.
## Concept mapping

| Celery | OJS | Notes |
|---|---|---|
| `@app.task` decorator | Handler registered with `@worker.register()` | OJS handlers are decorated async functions |
| `task.delay(args)` / `task.apply_async()` | `client.enqueue(type, args)` | OJS uses explicit type strings, not task references |
| `Celery(broker=..., backend=...)` | OJS server URL | OJS has a single server, not separate broker + backend |
| `CELERY_BROKER_URL` (Redis/RabbitMQ) | `OJS_URL` (OJS server) | One URL instead of two |
| `CELERY_RESULT_BACKEND` | Built into OJS job envelope | Results are stored on the job, no separate backend |
| `celery worker` CLI | `await worker.start()` | OJS workers are embedded in your app |
| `task.retry()` | Server-side retry policy | No client-side retry logic needed |
| `chord(group, callback)` | `batch(jobs, callbacks)` | Same fan-out/fan-in pattern |
| `chain(task1.s(), task2.s())` | `chain(step1, step2)` | Sequential pipelines |
| `group(task1.s(), task2.s())` | `group(job1, job2)` | Parallel execution |
| Canvas signatures (`.s()`, `.si()`) | Plain job specs (type + args) | No special signature objects |
| `celery beat` | OJS cron registration | Server-side cron, no separate scheduler process |
| `task.AsyncResult(id)` | `client.get_job(id)` | Job status includes state, result, and error history |
| Task routes | Queue assignment per enqueue | OJS routes by explicit queue, not pattern matching |
| `pickle` serializer | JSON only | OJS mandates JSON for security and interoperability |
## Code comparison

### Defining and calling tasks

Celery:

```python
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@app.task(bind=True, max_retries=5, default_retry_delay=60)
def send_email(self, to, template):
    try:
        result = EmailService.send(to=to, template=template)
        return {"message_id": result.id}
    except SmtpError as exc:
        raise self.retry(exc=exc)

# Calling the task
send_email.delay("user@example.com", "welcome")

# With options
send_email.apply_async(
    args=["user@example.com", "welcome"],
    queue="email",
    countdown=300,  # delay 5 minutes
)
```

OJS:

```python
import ojs

client = ojs.Client("http://localhost:8080")
worker = ojs.Worker("http://localhost:8080", queues=["email", "default"])

@worker.register("email.send")
async def handle_email(ctx: ojs.JobContext):
    to = ctx.args[0]
    template = ctx.args[1]
    # No manual retry logic needed. If this raises, the server retries
    # based on the retry policy set at enqueue time.
    result = await EmailService.send(to=to, template=template)
    return {"message_id": result.id}

# Calling
await client.enqueue("email.send", ["user@example.com", "welcome"],
    queue="email",
    retry=ojs.RetryPolicy(
        max_attempts=5,
        initial_interval="PT1M",
        backoff_coefficient=2.0,
    ),
)

# With delay
await client.enqueue("email.send", ["user@example.com", "welcome"],
    queue="email",
    delay_until="2026-02-12T11:00:00Z",
)
```
### Checking results

Celery:

```python
result = send_email.delay("user@example.com", "welcome")

# Check status
print(result.state)      # PENDING, STARTED, SUCCESS, FAILURE, etc.
print(result.result)     # The return value (if SUCCESS)
print(result.traceback)  # Traceback string (if FAILURE)
```

OJS:

```python
job = await client.enqueue("email.send", ["user@example.com", "welcome"])

# Check status later
job = await client.get_job(job.id)
print(job.state)   # available, active, completed, retryable, discarded, etc.
print(job.result)  # The return value (if completed)
print(job.errors)  # Structured error history for every failed attempt
```
### Workflows

Celery (Canvas):

```python
from celery import chain, group, chord

# Sequential pipeline
pipeline = chain(
    fetch_data.s(url="..."),
    transform_data.s(format="csv"),
    load_data.s(dest="warehouse"),
)
pipeline.apply_async()

# Parallel fan-out
parallel = group(
    export_csv.s(report_id=456),
    export_pdf.s(report_id=456),
    export_xlsx.s(report_id=456),
)
parallel.apply_async()

# Fan-out with callback (chord)
chord(
    group(process_chunk.s(chunk_id=i) for i in range(10)),
    merge_results.s(),
).apply_async()
```

OJS:

```python
from ojs import chain, group, batch

# Sequential pipeline
await client.workflow(chain(
    {"type": "data.fetch", "args": [{"url": "..."}]},
    {"type": "data.transform", "args": [{"format": "csv"}]},
    {"type": "data.load", "args": [{"dest": "warehouse"}]},
))

# Parallel fan-out
await client.workflow(group(
    {"type": "export.csv", "args": [456]},
    {"type": "export.pdf", "args": [456]},
    {"type": "export.xlsx", "args": [456]},
))

# Fan-out with callbacks (like chord)
await client.workflow(batch(
    [{"type": "process.chunk", "args": [i]} for i in range(10)],
    callbacks={
        "on_success": {"type": "merge.results", "args": []},
        "on_failure": {"type": "alert.operator", "args": []},
        "on_complete": {"type": "cleanup.temp", "args": []},
    },
))
```

OJS workflow definitions are plain data (dicts/objects), not special signature objects. No `.s()`, `.si()`, or `.signature()` calls needed.
## Key differences

### Serialization: pickle vs. JSON

Celery historically used pickle as its default serializer, and pickle remains a supported option that many deployments still enable. Pickle can serialize arbitrary Python objects, but it is a well-known security risk: deserializing untrusted pickle data allows arbitrary code execution. The Celery documentation itself warns about this.

OJS mandates JSON-native types only: strings, numbers, booleans, nulls, arrays, and objects. No pickle, no marshal, no language-specific serialization. This is a deliberate constraint for security and cross-language compatibility.

If your Celery tasks pass complex Python objects as arguments (like Django model instances or dataclass instances), you need to convert them to simple types. This is the hardest part of the migration, but it is also the most valuable: jobs with simple, serializable arguments are easier to debug, retry, and inspect.

Before (Celery with implicit pickle):

```python
@app.task
def process_order(order):
    # order is a pickled Python object. Fragile and risky.
    order.charge()
```

After (OJS with explicit data):

```python
@worker.register("order.process")
async def handle_order(ctx: ojs.JobContext):
    order_id = ctx.args[0]
    order = await Order.get(order_id)  # Fetch from database
    await order.charge()
    return {"order_id": order_id, "status": "charged"}
```
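While porting call sites, it helps to fail fast when something pickle-shaped is still being passed. A small guard you can run before each enqueue (our own helper, not part of the OJS SDK):

```python
import json

def assert_json_safe(args):
    """Raise early if job arguments are not JSON-native, rather than
    failing later inside the SDK or on the server."""
    try:
        round_tripped = json.loads(json.dumps(args))
    except TypeError as exc:
        raise TypeError(f"job args are not JSON-serializable: {exc}") from exc
    if round_tripped != args:
        # e.g. tuples become lists, non-string dict keys become strings
        raise ValueError("job args change under JSON round-trip")
    return args
```

Running this in tests (or behind a debug flag in development) catches leftover model instances, datetimes, and tuples before they reach the server.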
### Architecture: broker + backend vs. single server

Celery requires two pieces of infrastructure:

- A message broker (Redis, RabbitMQ, SQS) for job delivery.
- A result backend (Redis, PostgreSQL, Django ORM, Memcached) for storing results.

These are configured separately, can use different technologies, and add operational complexity.

OJS has a single server that handles everything: job storage, delivery, retry logic, result storage, and coordination. One URL, one process, one thing to monitor.

```
Celery: App -> Broker (Redis/RabbitMQ) + Result Backend (Redis/PostgreSQL)
OJS:    App -> OJS Server (HTTP) -> Storage (Redis or PostgreSQL)
```
### Retry logic: client-side vs. server-side

In Celery, retry logic lives in the task code:

```python
@app.task(bind=True, max_retries=3)
def unreliable_task(self, data):
    try:
        do_work(data)
    except TransientError as exc:
        raise self.retry(exc=exc, countdown=60)
```

You have to manually call `self.retry()`, decide when to retry, and set the countdown. If you forget the retry call, the task fails permanently.

In OJS, retry logic is server-side. Your handler just raises an exception, and the server decides what to do based on the retry policy:

```python
@worker.register("work.unreliable")
async def handle_work(ctx: ojs.JobContext):
    # Just do the work. If it throws, the server retries automatically
    # based on the retry policy set at enqueue time.
    await do_work(ctx.args[0])
```

The retry policy (max attempts, backoff, jitter, non-retryable errors) is set at enqueue time, not in the handler code. This separates "what the job does" from "how failures are handled."
### Language support: Python-only vs. polyglot

Celery is Python-only. Your tasks, workers, and clients all need to be Python.

OJS is language-agnostic. A Python service can enqueue a job that a Go worker processes, or a TypeScript frontend can enqueue a job that a Rust worker handles. All six official SDKs (Go, TypeScript, Python, Java, Rust, Ruby) are interoperable.

### Configuration complexity

Celery has over 100 configuration settings, spread across broker settings, result backend settings, worker settings, task settings, beat settings, and more. The interplay between settings can be surprising.

OJS has a simpler model: one server URL, plus options at enqueue time. Worker configuration is minimal (queues, concurrency, poll interval).
## Step-by-step migration

### Step 1: Set up the OJS server

```yaml
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  ojs-server:
    image: ghcr.io/openjobspec/ojs-backend-redis:latest
    ports:
      - "8080:8080"
    environment:
      REDIS_URL: redis://redis:6379
    depends_on:
      - redis
```

```shell
docker compose up -d
curl http://localhost:8080/ojs/v1/health
# {"status":"ok"}
```

If you prefer PostgreSQL for stronger durability guarantees:

```yaml
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: ojs
      POSTGRES_PASSWORD: secret
    ports:
      - "5432:5432"

  ojs-server:
    image: ghcr.io/openjobspec/ojs-backend-postgres:latest
    ports:
      - "8080:8080"
    environment:
      DATABASE_URL: postgres://postgres:secret@postgres:5432/ojs?sslmode=disable
    depends_on:
      - postgres
```
### Step 2: Install the Python SDK

```shell
pip install ojs
# or with dev dependencies
pip install "ojs[dev]"
```

### Step 3: Create OJS client and worker
Section titled “Step 3: Create OJS client and worker”import ojs
client = ojs.Client(os.environ.get("OJS_URL", "http://localhost:8080"))worker = ojs.Worker( os.environ.get("OJS_URL", "http://localhost:8080"), queues=["default", "email", "reports"], concurrency=10,)Step 4: Convert task definitions to OJS handlers
### Step 4: Convert task definitions to OJS handlers

Migrate one task at a time. For each Celery task:

- Choose a dot-namespaced type name (e.g., `email.send`, `report.generate`).
- Convert the function to an async handler.
- Replace `self.retry()` calls with simple exception raising (the server handles retry).
- Replace pickle-serialized arguments with JSON-native types.

Before (Celery):

```python
@app.task(bind=True, max_retries=3)
def generate_report(self, user_id, report_type, filters=None):
    try:
        user = User.objects.get(id=user_id)
        data = ReportEngine.generate(user, report_type, filters or {})
        result = save_to_s3(data)
        return {"url": result.url, "size": result.size}
    except DatabaseError as exc:
        raise self.retry(exc=exc, countdown=30)
    except ReportError:
        # Non-retryable. Let it fail permanently.
        raise
```

After (OJS):

```python
@worker.register("report.generate")
async def handle_report(ctx: ojs.JobContext):
    user_id = ctx.args[0]
    report_type = ctx.args[1]
    filters = ctx.args[2] if len(ctx.args) > 2 else {}

    # DatabaseError will be caught by the worker and reported as a retryable error.
    # ReportError will also be reported. You control retry behavior at enqueue time
    # using non_retryable_errors in the retry policy.
    user = await User.objects.aget(id=user_id)
    data = await ReportEngine.generate(user, report_type, filters)
    result = await save_to_s3(data)
    return {"url": result.url, "size": result.size}
```

Enqueue call:

```python
# Before (Celery)
generate_report.apply_async(args=[user.id, "monthly"], kwargs={"filters": {"year": 2026}})

# After (OJS)
await client.enqueue("report.generate", [user.id, "monthly", {"year": 2026}],
    queue="reports",
    retry=ojs.RetryPolicy(
        max_attempts=3,
        initial_interval="PT30S",
        backoff_coefficient=2.0,
        non_retryable_errors=["ReportError"],
    ),
)
```
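Celery separates `args` and `kwargs`, while OJS takes a single JSON array of arguments. One workable convention (ours, not mandated by the spec) is to pass positional arguments first and append the kwargs dict as the final argument, as the enqueue call above does:

```python
def celery_to_ojs_args(args=None, kwargs=None):
    """Flatten a Celery (args, kwargs) pair into one OJS args array:
    positional arguments first, then the kwargs dict last if non-empty."""
    ojs_args = list(args or [])
    if kwargs:
        ojs_args.append(dict(kwargs))
    return ojs_args

# celery_to_ojs_args([42, "monthly"], {"year": 2026})
# -> [42, "monthly", {"year": 2026}]
```

Whatever convention you pick, apply it consistently: the handler's `ctx.args` indexing must match the enqueue side.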
### Step 5: Convert Celery Beat schedules

Before (Celery Beat):

```python
app.conf.beat_schedule = {
    'cleanup-every-night': {
        'task': 'tasks.cleanup',
        'schedule': crontab(hour=3, minute=0),
    },
    'send-digest-weekly': {
        'task': 'tasks.send_digest',
        'schedule': crontab(hour=9, minute=0, day_of_week='monday'),
    },
}
```

After (OJS cron, via HTTP or SDK):

```python
# Register cron jobs once (at deploy time or startup)
await client.register_cron_job(ojs.CronJobRequest(
    name="cleanup-nightly",
    cron="0 3 * * *",
    timezone="UTC",
    type="maintenance.cleanup",
    args=[],
))

await client.register_cron_job(ojs.CronJobRequest(
    name="digest-weekly",
    cron="0 9 * * 1",
    timezone="America/New_York",
    type="email.weekly_digest",
    args=[],
))
```

OJS cron jobs are registered on the server. You do not need a separate `celery beat` process; the OJS server handles scheduling internally.
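If you have many beat entries, the `minute`, `hour`, and `day_of_week` fields of the common `crontab()` form translate mechanically into five-field cron expressions. A small helper covering just those three fields (a sketch; extend it if you also use `day_of_month` or `month_of_year`):

```python
# Celery accepts day-of-week names; standard cron uses 0-6 with Sunday = 0.
_DOW = {"sunday": 0, "monday": 1, "tuesday": 2, "wednesday": 3,
        "thursday": 4, "friday": 5, "saturday": 6}

def crontab_to_cron(minute=0, hour="*", day_of_week="*"):
    """Translate Celery crontab(minute=, hour=, day_of_week=) fields
    into a standard "minute hour dom month dow" cron expression."""
    dow = _DOW.get(str(day_of_week).lower(), day_of_week)
    return f"{minute} {hour} * * {dow}"

# crontab_to_cron(minute=0, hour=3)                       -> "0 3 * * *"
# crontab_to_cron(minute=0, hour=9, day_of_week="monday") -> "0 9 * * 1"
```

The two example outputs match the cron strings used in the registrations above.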
### Step 6: Add middleware

Celery signals:

```python
from celery.signals import task_prerun, task_postrun

@task_prerun.connect
def on_task_start(sender, task_id, **kwargs):
    print(f"Starting {sender.name}")

@task_postrun.connect
def on_task_end(sender, task_id, **kwargs):
    print(f"Finished {sender.name}")
```

OJS execution middleware:

```python
import time

@worker.middleware
async def logging_mw(ctx: ojs.JobContext, next):
    print(f"Starting {ctx.job.type} attempt {ctx.attempt}")
    start = time.time()
    result = await next()
    print(f"Finished {ctx.job.type} in {time.time() - start:.2f}s")
    return result
```

OJS middleware is more powerful than Celery signals: middleware can modify behavior (short-circuit execution, transform results, add error handling), while signals are purely observational.
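To illustrate the short-circuiting point, here is a sketch of a middleware that returns a result without ever invoking the handler, which is something a Celery signal cannot do. The `already_processed` predicate is hypothetical; supply your own async check (for example, a database lookup by job id):

```python
def make_dedupe_middleware(already_processed):
    """Build a middleware that skips jobs a supplied async predicate
    reports as already done. The handler never runs for those jobs."""
    async def dedupe_mw(ctx, next):
        if await already_processed(ctx.job.id):
            # Short-circuit: next() is never called, so the handler is skipped.
            return {"skipped": True}
        return await next()
    return dedupe_mw
```

Registration would presumably mirror the decorator form above, e.g. `worker.middleware(make_dedupe_middleware(my_async_check))`, assuming the decorator can also be applied as a plain call.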
### Step 7: Run both systems during migration

Keep Celery workers running alongside OJS workers. Migrate one task type at a time:

- Create the OJS handler.
- Update enqueue calls (from `task.delay()` to `client.enqueue()`).
- Verify jobs process correctly.
- Remove the Celery task definition.
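During this dual-running phase it helps to centralize the decision of which system each task goes to. A sketch of a feature-flag dispatcher (the `MIGRATED_TYPES` set and the function itself are our own convention, not part of either library):

```python
MIGRATED_TYPES = {"email.send"}  # grow this set as each handler is ported

async def enqueue_dual(ojs_client, celery_task, job_type, args, **ojs_options):
    """Route a task to OJS once its handler is ported; otherwise
    fall back to the existing Celery task."""
    if job_type in MIGRATED_TYPES:
        return await ojs_client.enqueue(job_type, args, **ojs_options)
    return celery_task.delay(*args)
```

Flipping a type over then means adding one entry to `MIGRATED_TYPES` instead of hunting down every call site.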
### Step 8: Cutover

Once all tasks are migrated:

- Remove `celery`, `celery[redis]`, and related packages from `requirements.txt`.
- Remove `celeryconfig.py` and task modules.
- Shut down Celery workers and beat processes.
- Remove broker and result backend configuration.
## What you gain

- Security. No more pickle deserialization. OJS mandates JSON, which eliminates the entire class of deserialization vulnerabilities that plague Celery deployments.
- Language interoperability. A Go microservice can process jobs enqueued by your Python app. Useful for gradual rewrites or polyglot architectures.
- Simpler architecture. One server replaces the broker, result backend, and beat scheduler. One URL to configure, one process to monitor.
- Standardized retry policies. Exponential backoff with jitter, non-retryable error classification, and per-job configuration. No more `self.retry()` boilerplate in every task.
- Structured error history. Every failed attempt records a structured error with type, message, and traceback. Celery only keeps the last error.
- Conformance testing. The OJS conformance suite provides verified behavior guarantees. Celery's behavior is defined by its implementation, which can change between versions.
- Backend choice. Start with Redis for speed, or switch to PostgreSQL for stronger durability guarantees. Your application code stays the same.