Migrate from Celery

Celery has been the go-to background task system for Python since 2009. It is battle-tested and feature-rich. It is also Python-only, uses pickle by default (a security risk), and has a notoriously complex configuration surface. If you are looking to move to a polyglot, standardized system, or just want a simpler architecture, this guide walks you through the migration from Celery to OJS.

| Celery | OJS | Notes |
| --- | --- | --- |
| @app.task decorator | Handler registered with @worker.register() | OJS handlers are decorated async functions |
| task.delay(args) / task.apply_async() | client.enqueue(type, args) | OJS uses explicit type strings, not task references |
| Celery(broker=..., backend=...) | OJS server URL | OJS has a single server, not separate broker + backend |
| CELERY_BROKER_URL (Redis/RabbitMQ) | OJS_URL (OJS server) | One URL instead of two |
| CELERY_RESULT_BACKEND | Built into OJS job envelope | Results are stored on the job, no separate backend |
| celery worker CLI | await worker.start() | OJS workers are embedded in your app |
| task.retry() | Server-side retry policy | No client-side retry logic needed |
| chord(group, callback) | batch(jobs, callbacks) | Same fan-out/fan-in pattern |
| chain(task1.s(), task2.s()) | chain(step1, step2) | Sequential pipelines |
| group(task1.s(), task2.s()) | group(job1, job2) | Parallel execution |
| Canvas signatures (.s(), .si()) | Plain job specs (type + args) | No special signature objects |
| celery beat | OJS cron registration | Server-side cron, no separate scheduler process |
| task.AsyncResult(id) | client.get_job(id) | Job status includes state, result, and error history |
| Task routes | Queue assignment per enqueue | OJS routes by explicit queue, not pattern matching |
| pickle serializer | JSON only | OJS mandates JSON for security and interoperability |

Here is a typical task definition and call, side by side.

Celery:

tasks.py
from celery import Celery

app = Celery('myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@app.task(bind=True, max_retries=5, default_retry_delay=60)
def send_email(self, to, template):
    try:
        result = EmailService.send(to=to, template=template)
        return {"message_id": result.id}
    except SmtpError as exc:
        raise self.retry(exc=exc)

# Calling the task
send_email.delay("user@example.com", "welcome")

# With options
send_email.apply_async(
    args=["user@example.com", "welcome"],
    queue="email",
    countdown=300,  # delay 5 minutes
)

OJS:

worker.py
import ojs

client = ojs.Client("http://localhost:8080")
worker = ojs.Worker("http://localhost:8080", queues=["email", "default"])

@worker.register("email.send")
async def handle_email(ctx: ojs.JobContext):
    to = ctx.args[0]
    template = ctx.args[1]
    result = await EmailService.send(to=to, template=template)
    return {"message_id": result.id}

# No manual retry logic needed. The server handles retries
# based on the retry policy set at enqueue time.

# Calling
await client.enqueue("email.send", ["user@example.com", "welcome"],
    queue="email",
    retry=ojs.RetryPolicy(
        max_attempts=5,
        initial_interval="PT1M",
        backoff_coefficient=2.0,
    ),
)

# With delay
await client.enqueue("email.send", ["user@example.com", "welcome"],
    queue="email",
    delay_until="2026-02-12T11:00:00Z",
)

Checking job status and results also maps directly.

Celery:

result = send_email.delay("user@example.com", "welcome")
# Check status
print(result.state) # PENDING, STARTED, SUCCESS, FAILURE, etc.
print(result.result) # The return value (if SUCCESS)
print(result.traceback) # Traceback string (if FAILURE)

OJS:

job = await client.enqueue("email.send", ["user@example.com", "welcome"])
# Check status later
job = await client.get_job(job.id)
print(job.state) # available, active, completed, retryable, discarded, etc.
print(job.result) # The return value (if completed)
print(job.errors) # Structured error history for every failed attempt
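Celery's result.get() blocks until the task completes. If you want the same blocking behavior with OJS, a small polling loop over client.get_job gets you there. A minimal sketch (the helper name and poll interval are illustrative; the terminal states are the ones listed above):

import asyncio

async def wait_for_job(client: ojs.Client, job_id: str, poll_interval: float = 1.0):
    # Poll until the job reaches a terminal state, then return it.
    while True:
        job = await client.get_job(job_id)
        if job.state in ("completed", "discarded"):
            return job
        await asyncio.sleep(poll_interval)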

Workflow composition carries over as well.

Celery (Canvas):

from celery import chain, group, chord

# Sequential pipeline
pipeline = chain(
    fetch_data.s(url="..."),
    transform_data.s(format="csv"),
    load_data.s(dest="warehouse"),
)
pipeline.apply_async()

# Parallel fan-out
parallel = group(
    export_csv.s(report_id=456),
    export_pdf.s(report_id=456),
    export_xlsx.s(report_id=456),
)
parallel.apply_async()

# Fan-out with callback (chord)
chord(
    group(
        process_chunk.s(chunk_id=i) for i in range(10)
    ),
    merge_results.s()
).apply_async()

OJS:

from ojs import chain, group, batch

# Sequential pipeline
await client.workflow(
    chain(
        {"type": "data.fetch", "args": [{"url": "..."}]},
        {"type": "data.transform", "args": [{"format": "csv"}]},
        {"type": "data.load", "args": [{"dest": "warehouse"}]},
    )
)

# Parallel fan-out
await client.workflow(
    group(
        {"type": "export.csv", "args": [456]},
        {"type": "export.pdf", "args": [456]},
        {"type": "export.xlsx", "args": [456]},
    )
)

# Fan-out with callbacks (like chord)
await client.workflow(
    batch(
        [{"type": "process.chunk", "args": [i]} for i in range(10)],
        callbacks={
            "on_success": {"type": "merge.results", "args": []},
            "on_failure": {"type": "alert.operator", "args": []},
            "on_complete": {"type": "cleanup.temp", "args": []},
        },
    )
)

OJS workflow definitions are plain data (dicts/objects), not special signature objects. No .s(), .si(), or .signature() calls needed.

Celery uses pickle as its default serializer. Pickle can serialize arbitrary Python objects, but it is a well-known security risk. Deserializing untrusted pickle data allows arbitrary code execution. The Celery documentation itself warns about this.
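To see why, here is a minimal illustration: pickle lets any object decide what code runs when it is deserialized, so loading untrusted bytes is equivalent to executing them.

import pickle

class Evil:
    def __reduce__(self):
        # Called during unpickling: returns a callable and its arguments.
        import os
        return (os.system, ("echo arbitrary code execution",))

payload = pickle.dumps(Evil())
pickle.loads(payload)  # runs the shell command as a side effect of deserialization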

OJS mandates JSON-native types only: strings, numbers, booleans, nulls, arrays, and objects. No pickle, no marshal, no language-specific serialization. This is a deliberate constraint for security and cross-language compatibility.

If your Celery tasks pass complex Python objects as arguments (like Django model instances or dataclass instances), you need to convert them to simple types. This is the hardest part of the migration, but it is also the most valuable. Jobs with simple, serializable arguments are easier to debug, retry, and inspect.

Before (Celery with implicit pickle):

@app.task
def process_order(order):
    # order is a pickled Python object. Fragile and risky.
    order.charge()

After (OJS with explicit data):

@worker.register("order.process")
async def handle_order(ctx: ojs.JobContext):
order_id = ctx.args[0]
order = await Order.get(order_id) # Fetch from database
await order.charge()
return {"order_id": order_id, "status": "charged"}

Architecture: broker + backend vs. single server

Celery requires two pieces of infrastructure:

  1. A message broker (Redis, RabbitMQ, SQS) for job delivery.
  2. A result backend (Redis, PostgreSQL, Django ORM, Memcached) for storing results.

These are configured separately, can use different technologies, and add operational complexity.

OJS has a single server that handles everything: job storage, delivery, retry logic, result storage, and coordination. One URL, one process, one thing to monitor.

Celery: App -> Broker (Redis/RabbitMQ) + Result Backend (Redis/PostgreSQL)
OJS: App -> OJS Server (HTTP) -> Storage (Redis or PostgreSQL)

In Celery, retry logic lives in the task code:

@app.task(bind=True, max_retries=3)
def unreliable_task(self, data):
    try:
        do_work(data)
    except TransientError as exc:
        raise self.retry(exc=exc, countdown=60)

You have to manually call self.retry(), decide when to retry, and set the countdown. If you forget the retry call, the task fails permanently.

In OJS, retry logic is server-side. Your handler just raises an exception, and the server decides what to do based on the retry policy:

@worker.register("work.unreliable")
async def handle_work(ctx: ojs.JobContext):
# Just do the work. If it throws, the server retries automatically
# based on the retry policy set at enqueue time.
await do_work(ctx.args[0])

The retry policy (max attempts, backoff, jitter, non-retryable errors) is set at enqueue time, not in the handler code. This separates “what the job does” from “how failures are handled.”
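For example, the same handler can be enqueued with different failure-handling rules. A sketch using the policy fields shown in this guide (the error name is illustrative):

await client.enqueue("work.unreliable", [data],
    retry=ojs.RetryPolicy(
        max_attempts=3,
        initial_interval="PT1M",        # first retry after 1 minute
        backoff_coefficient=2.0,        # then 2 minutes, 4 minutes, ...
        non_retryable_errors=["ValidationError"],  # illustrative: discard immediately
    ),
)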

Language support: Python-only vs. polyglot

Celery is Python-only. Your tasks, workers, and clients all need to be Python.

OJS is language-agnostic. A Python service can enqueue a job that a Go worker processes, or a TypeScript frontend can enqueue a job that a Rust worker handles. All six official SDKs (Go, TypeScript, Python, Java, Rust, Ruby) are interoperable.

Celery has over 100 configuration settings, spread across broker settings, result backend settings, worker settings, task settings, beat settings, and more. The interplay between settings can be surprising.

OJS has a simpler model: one server URL, and options at enqueue time. Worker configuration is minimal (queues, concurrency, poll interval).

Step 1: Run the OJS server

docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  ojs-server:
    image: ghcr.io/openjobspec/ojs-backend-redis:latest
    ports:
      - "8080:8080"
    environment:
      REDIS_URL: redis://redis:6379
    depends_on:
      - redis
Start the stack and check the health endpoint:
docker compose up -d
curl http://localhost:8080/ojs/v1/health
# {"status":"ok"}

If you prefer PostgreSQL for stronger durability guarantees:

services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: ojs
      POSTGRES_PASSWORD: secret
    ports:
      - "5432:5432"
  ojs-server:
    image: ghcr.io/openjobspec/ojs-backend-postgres:latest
    ports:
      - "8080:8080"
    environment:
      DATABASE_URL: postgres://postgres:secret@postgres:5432/ojs?sslmode=disable
    depends_on:
      - postgres
Step 2: Install the Python SDK
pip install ojs
# or with dev dependencies
pip install "ojs[dev]"
Step 3: Configure the client and worker

ojs_config.py
import os

import ojs

client = ojs.Client(os.environ.get("OJS_URL", "http://localhost:8080"))
worker = ojs.Worker(
    os.environ.get("OJS_URL", "http://localhost:8080"),
    queues=["default", "email", "reports"],
    concurrency=10,
)
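As the comparison table notes, the celery worker CLI is replaced by await worker.start(): the worker is embedded in your own process. A minimal entry point might look like this (assuming the ojs_config module above):

import asyncio

from ojs_config import worker

async def main():
    # Runs until stopped, polling the configured queues and dispatching handlers.
    await worker.start()

asyncio.run(main())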

Step 4: Convert task definitions to OJS handlers

Migrate one task at a time. For each Celery task:

  1. Choose a dot-namespaced type name (e.g., email.send, report.generate).
  2. Convert the function to an async handler.
  3. Replace self.retry() calls with simple exception raising (the server handles retry).
  4. Replace pickle-serialized arguments with JSON-native types.

Before (Celery):

@app.task(bind=True, max_retries=3)
def generate_report(self, user_id, report_type, filters=None):
    try:
        user = User.objects.get(id=user_id)
        data = ReportEngine.generate(user, report_type, filters or {})
        result = save_to_s3(data)
        return {"url": result.url, "size": result.size}
    except DatabaseError as exc:
        raise self.retry(exc=exc, countdown=30)
    except ReportError:
        # Non-retryable. Let it fail permanently.
        raise

After (OJS):

@worker.register("report.generate")
async def handle_report(ctx: ojs.JobContext):
user_id = ctx.args[0]
report_type = ctx.args[1]
filters = ctx.args[2] if len(ctx.args) > 2 else {}
user = await User.objects.aget(id=user_id)
data = await ReportEngine.generate(user, report_type, filters)
result = await save_to_s3(data)
return {"url": result.url, "size": result.size}
# DatabaseError will be caught by the worker and reported as a retryable error.
# ReportError will also be reported. You control retry behavior at enqueue time
# using non_retryable_errors in the retry policy.

Enqueue call:

# Before (Celery)
generate_report.apply_async(args=[user.id, "monthly"], kwargs={"filters": {"year": 2026}})

# After (OJS)
await client.enqueue("report.generate", [user.id, "monthly", {"year": 2026}],
    queue="reports",
    retry=ojs.RetryPolicy(
        max_attempts=3,
        initial_interval="PT30S",
        backoff_coefficient=2.0,
        non_retryable_errors=["ReportError"],
    ),
)

Step 5: Migrate periodic tasks

Before (Celery Beat):

celeryconfig.py
from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-every-night': {
        'task': 'tasks.cleanup',
        'schedule': crontab(hour=3, minute=0),
    },
    'send-digest-weekly': {
        'task': 'tasks.send_digest',
        'schedule': crontab(hour=9, minute=0, day_of_week='monday'),
    },
}

After (OJS cron, via HTTP or SDK):

# Register cron jobs once (at deploy time or startup)
await client.register_cron_job(ojs.CronJobRequest(
name="cleanup-nightly",
cron="0 3 * * *",
timezone="UTC",
type="maintenance.cleanup",
args=[],
))
await client.register_cron_job(ojs.CronJobRequest(
name="digest-weekly",
cron="0 9 * * 1",
timezone="America/New_York",
type="email.weekly_digest",
args=[],
))

OJS cron jobs are registered on the server. You do not need a separate celery beat process. The OJS server handles scheduling internally.

Step 6: Replace signals with middleware

Celery signals:

from celery.signals import task_prerun, task_postrun

@task_prerun.connect
def on_task_start(sender, task_id, **kwargs):
    print(f"Starting {sender.name}")

@task_postrun.connect
def on_task_end(sender, task_id, **kwargs):
    print(f"Finished {sender.name}")

OJS execution middleware:

import time

@worker.middleware
async def logging_mw(ctx: ojs.JobContext, next):
    print(f"Starting {ctx.job.type} attempt {ctx.attempt}")
    start = time.time()
    result = await next()
    print(f"Finished {ctx.job.type} in {time.time() - start:.2f}s")
    return result

OJS middleware is more powerful than Celery signals because middleware can modify behavior (short-circuit execution, transform results, add error handling) while signals are purely observational.
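For instance, a single middleware can do all three. A sketch (the job-type prefix, exception type, and result shape are illustrative):

@worker.middleware
async def control_mw(ctx: ojs.JobContext, next):
    if ctx.job.type.startswith("deprecated."):
        # Short-circuit: skip the handler entirely and return a canned result.
        return {"skipped": True}
    try:
        result = await next()
    except ConnectionError as exc:
        # Add error handling: attach job context before re-raising.
        raise RuntimeError(f"{ctx.job.type} failed on attempt {ctx.attempt}") from exc
    # Transform the result before it is reported back to the server.
    return {"data": result}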

Step 7: Migrate incrementally

Keep Celery workers running alongside OJS workers. Migrate one task type at a time:

  1. Create the OJS handler.
  2. Update enqueue calls (from task.delay() to client.enqueue()); a flag-based cutover is sketched after this list.
  3. Verify jobs process correctly.
  4. Remove the Celery task definition.
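One low-risk way to handle step 2 is to put the cutover behind a flag, so a task type can be switched (and rolled back) without a deploy. A sketch, assuming an environment-variable flag and the email task from earlier:

import os

USE_OJS_EMAIL = os.environ.get("USE_OJS_EMAIL") == "1"  # illustrative flag

async def enqueue_send_email(to: str, template: str):
    # Route new jobs to OJS or to the legacy Celery task.
    if USE_OJS_EMAIL:
        await client.enqueue("email.send", [to, template], queue="email")
    else:
        send_email.delay(to, template)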

Once all tasks are migrated:

  1. Remove celery, celery[redis], and related packages from requirements.txt.
  2. Remove celeryconfig.py and task modules.
  3. Shut down Celery workers and beat processes.
  4. Remove broker and result backend configuration.

What you gain:

  • Security. No more pickle deserialization. OJS mandates JSON, which eliminates the entire class of deserialization vulnerabilities that plague Celery deployments.
  • Language interoperability. A Go microservice can process jobs enqueued by your Python app. Useful for gradual rewrites or polyglot architectures.
  • Simpler architecture. One server replaces the broker + result backend + beat scheduler. One URL to configure, one process to monitor.
  • Standardized retry policies. Exponential backoff with jitter, non-retryable error classification, and per-job configuration. No more self.retry() boilerplate in every task.
  • Structured error history. Every failed attempt records a structured error with type, message, and traceback. Celery only keeps the last error.
  • Conformance testing. The OJS conformance suite provides verified behavior guarantees. Celery’s behavior is defined by implementation, which can change between versions.
  • Backend choice. Start with Redis for speed, switch to PostgreSQL for stronger durability guarantees. Your application code stays the same.