
Migrating Go Background Jobs to OJS

Many Go services start with goroutines and channels for background work, then outgrow them. This guide shows how to migrate to OJS for reliability, observability, and cross-service interop.

// ❌ Job is lost if the process crashes
go func() {
	if err := sendWelcomeEmail(userID); err != nil {
		log.Printf("failed to send email: %v", err)
	}
}()

Problems: No retry on failure, no visibility, lost on crash, no backpressure.

// ❌ Sends block once the buffer is full; queued jobs are lost if the process restarts
jobs := make(chan Job, 100)

// Workers
for i := 0; i < 10; i++ {
	go func() {
		for job := range jobs {
			process(job)
		}
	}()
}

// Enqueue
jobs <- Job{Type: "email", UserID: userID}

Problems: No persistence, no retry, no monitoring, no cross-service access.
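
To make the backpressure problem concrete, here is a minimal sketch of the usual workaround: a non-blocking send that reports whether the job was accepted. The helper name tryEnqueue and the string type of UserID are assumptions made for illustration; they do not come from the guide or from OJS.

```go
package main

import "fmt"

// Job mirrors the struct used in the channel example above.
// The field types are assumed for this sketch.
type Job struct {
	Type   string
	UserID string
}

// tryEnqueue is a hypothetical helper: it attempts a non-blocking send and
// reports whether the job was accepted. A plain `jobs <- job` would instead
// block the caller until a worker frees a slot.
func tryEnqueue(jobs chan<- Job, job Job) bool {
	select {
	case jobs <- job:
		return true
	default:
		// Buffer full: the job is dropped unless the caller handles it.
		return false
	}
}

func main() {
	jobs := make(chan Job, 2) // small buffer, no workers draining it

	for i := 1; i <= 3; i++ {
		ok := tryEnqueue(jobs, Job{Type: "email", UserID: fmt.Sprintf("user-%d", i)})
		fmt.Printf("job %d accepted: %v\n", i, ok)
	}
	fmt.Println("buffered jobs:", len(jobs))
}
```

With a buffer of 2 and no workers, the third call is rejected. Either way the caller has to choose between blocking and dropping work, and whatever is buffered still disappears on restart.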

// ❌ Custom implementation = custom bugs
for {
	rows, _ := db.Query("SELECT * FROM jobs WHERE status='pending' LIMIT 10 FOR UPDATE SKIP LOCKED")
	for rows.Next() {
		var j Job
		rows.Scan(&j.ID, &j.Type, &j.Payload)
		go process(j)
	}
	time.Sleep(time.Second)
}

Problems: Reinventing retry logic, heartbeats, visibility timeout, lifecycle management.


Step 1: Install the SDK

go get github.com/openjobspec/ojs-go-sdk

Step 2: Start a backend

# Zero-dependency option (in-memory, <50ms startup)
cd ojs-backend-lite && make run
# Or with Docker (persistent)
docker run -d -p 8080:8080 ghcr.io/openjobspec/ojs-backend-redis:0.2.0

Step 3: Replace go func() calls with client.Enqueue()

// Before
go sendWelcomeEmail(userID)

// After
client := ojs.NewClient("http://localhost:8080")
client.Enqueue(ctx, "email.welcome", ojs.Args{
	"user_id": userID,
})

Step 4: Replace channel workers with registered handlers

// Before
for job := range jobs {
	switch job.Type {
	case "email":
		sendEmail(job.UserID)
	case "report":
		generateReport(job.ReportID)
	}
}

// After
worker := ojs.NewWorker("http://localhost:8080",
	ojs.WithQueues("default", "email"),
	ojs.WithConcurrency(10),
)

worker.Register("email.welcome", func(ctx ojs.JobContext) error {
	userID := ctx.Job.Args["user_id"].(string)
	return sendWelcomeEmail(userID)
})

worker.Register("report.generate", func(ctx ojs.JobContext) error {
	reportID := ctx.Job.Args["report_id"].(string)
	return generateReport(reportID)
})

// Blocks until context is cancelled
worker.Start(ctx)

Step 5: Add retries, scheduling, and deduplication

// Retry policy
client.Enqueue(ctx, "payment.charge", ojs.Args{"order_id": orderID},
	ojs.WithRetry(ojs.RetryPolicy{
		MaxAttempts: 5,
		Backoff:     "exponential",
	}),
)

// Scheduled jobs
client.Enqueue(ctx, "reminder.send", ojs.Args{"user_id": userID},
	ojs.WithDelay(24 * time.Hour),
)

// Unique jobs (deduplicate)
client.Enqueue(ctx, "cache.warm", ojs.Args{"key": cacheKey},
	ojs.WithUnique(ojs.UniquePolicy{Period: 5 * time.Minute}),
)

| Concern | Goroutines | OJS |
| --- | --- | --- |
| Persistence | ❌ Lost on crash | ✅ Stored in backend |
| Retries | ❌ Manual | ✅ Configurable policy |
| Monitoring | ❌ None | ✅ Prometheus + Grafana |
| Backpressure | ❌ Channel overflow | ✅ Queue-based |
| Cross-service | ❌ In-process only | ✅ Any language/service |
| Visibility | ❌ None | ✅ Admin UI + CLI |
| Scheduling | time.AfterFunc | ✅ Cron + delay_until |
| Graceful shutdown | ❌ Manual | ✅ Built-in drain |

  • Install ojs-go-sdk
  • Start a backend (Lite for dev, Redis/Postgres for prod)
  • Replace go func() calls with client.Enqueue()
  • Replace channel workers with worker.Register()
  • Add retry policies to critical jobs
  • Add OpenTelemetry middleware for tracing
  • Set up Prometheus + Grafana dashboards
  • Test with ojs doctor health check
