Batteries

Background Jobs

Grit uses asynq -- a Redis-backed job queue library for Go -- to handle background processing. Send emails, generate thumbnails, clean up expired tokens, or run any async task with automatic retries and priority queues.

Architecture

The job system has two components: a client that enqueues jobs and a worker that processes them. Both connect to the same Redis instance. The worker runs as a goroutine inside the API server -- no separate process needed.

job-architecture.txt
┌──────────────────┐     ┌─────────┐     ┌──────────────────┐
│  API Handler     │     │  Redis  │     │  Worker          │
│                  │     │         │     │                  │
│  client.Enqueue  │────>│  Queue  │────>│  handleEmailSend │
│  SendEmail(...)  │     │         │     │  handleImage...  │
│                  │     │         │     │  handleCleanup   │
└──────────────────┘     └─────────┘     └──────────────────┘

Job Client

The job client at internal/jobs/client.go provides typed methods for enqueuing each built-in job. It handles JSON serialization of payloads and configures retry policies per job type.

internal/jobs/client.go
// Task type constants
const (
    TypeEmailSend     = "email:send"
    TypeImageProcess  = "image:process"
    TypeTokensCleanup = "tokens:cleanup"
)

// Client wraps asynq.Client for enqueuing background jobs.
type Client struct {
    client *asynq.Client
}

// NewClient creates a new job queue client connected to Redis.
func NewClient(redisURL string) (*Client, error)

// Close shuts down the client connection.
func (c *Client) Close() error

Enqueue Methods

internal/jobs/client.go (enqueue methods)
// EnqueueSendEmail enqueues an email send job.
// Max retries: 3
func (c *Client) EnqueueSendEmail(
    to, subject, template string,
    data map[string]interface{},
) error

// EnqueueProcessImage enqueues an image processing job.
// Max retries: 2
func (c *Client) EnqueueProcessImage(
    uploadID uint,
    key, mimeType string,
) error

// EnqueueTokensCleanup enqueues a token cleanup job.
// Max retries: 1
func (c *Client) EnqueueTokensCleanup() error

Job Payloads

internal/jobs/client.go (payloads)
// EmailPayload holds the data for an email send job.
type EmailPayload struct {
    To       string                 `json:"to"`
    Subject  string                 `json:"subject"`
    Template string                 `json:"template"`
    Data     map[string]interface{} `json:"data"`
}

// ImagePayload holds the data for an image processing job.
type ImagePayload struct {
    UploadID uint   `json:"upload_id"`
    Key      string `json:"key"`
    MimeType string `json:"mime_type"`
}

Worker Setup

The worker is started in main.go alongside the HTTP server. It receives all service dependencies via the WorkerDeps struct, giving handler functions access to the database, mailer, storage, and cache.

internal/jobs/workers.go
// WorkerDeps holds dependencies needed by job handlers.
type WorkerDeps struct {
    DB      *gorm.DB
    Mailer  *mail.Mailer
    Storage *storage.Storage
    Cache   *cache.Cache
}

// StartWorker starts the asynq worker server in a goroutine.
// Returns a stop function and any startup error.
func StartWorker(redisURL string, deps WorkerDeps) (func(), error) {
    redisOpt, err := asynq.ParseRedisURI(redisURL)
    if err != nil {
        return nil, err
    }

    srv := asynq.NewServer(redisOpt, asynq.Config{
        Concurrency: 10,
        Queues: map[string]int{
            "default":  6,
            "critical": 3,
            "low":      1,
        },
    })

    mux := asynq.NewServeMux()
    mux.HandleFunc(TypeEmailSend, handleEmailSend(deps))
    mux.HandleFunc(TypeImageProcess, handleImageProcess(deps))
    mux.HandleFunc(TypeTokensCleanup, handleTokensCleanup(deps))

    go func() {
        if err := srv.Run(mux); err != nil {
            log.Printf("worker stopped: %v", err)
        }
    }()

    return func() { srv.Shutdown() }, nil
}

Starting the Worker in main.go

cmd/server/main.go (excerpt)
// Start the background worker
stopWorker, err := jobs.StartWorker(cfg.RedisURL, jobs.WorkerDeps{
    DB:      db,
    Mailer:  mailer,
    Storage: store,
    Cache:   cacheService,
})
if err != nil {
    log.Fatalf("Failed to start worker: %v", err)
}
defer stopWorker()

Queue Priorities

Jobs are distributed across three priority queues. The worker allocates processing capacity based on these weights: critical gets 30%, default gets 60%, and low gets 10%.

Queue      Weight     Use Case
critical   3 (30%)    Password resets, payment webhooks
default    6 (60%)    Emails, image processing
low        1 (10%)    Cleanup, analytics, reports

Retry Configuration

Each job type has a configured maximum retry count. When a handler returns an error, asynq automatically retries with exponential backoff. After exhausting all retries, the job is moved to the "archived" (failed) state.

retry-config.go
// Email: 3 retries (important to deliver)
task := asynq.NewTask(TypeEmailSend, payload)
_, err := c.client.Enqueue(task, asynq.MaxRetry(3))

// Image: 2 retries (can be re-triggered)
task = asynq.NewTask(TypeImageProcess, payload)
_, err = c.client.Enqueue(task, asynq.MaxRetry(2))

// Cleanup: 1 retry (runs hourly anyway)
task = asynq.NewTask(TypeTokensCleanup, nil)
_, err = c.client.Enqueue(task, asynq.MaxRetry(1))

// Custom: enqueue to a specific queue with custom retry
task = asynq.NewTask("invoice:generate", payload)
_, err = c.client.Enqueue(task,
    asynq.MaxRetry(5),
    asynq.Queue("critical"),
    asynq.Timeout(30*time.Second),
)

Adding Custom Jobs

To add a new job type, define the task type constant, create a payload struct, add an enqueue method to the client, write the handler function, and register it in the worker mux.

custom-job-example.go
// 1. Add task type constant
const TypeInvoiceGenerate = "invoice:generate"

// 2. Define payload
type InvoicePayload struct {
    OrderID   uint   `json:"order_id"`
    UserEmail string `json:"user_email"`
}

// 3. Add enqueue method to Client
func (c *Client) EnqueueGenerateInvoice(orderID uint, email string) error {
    payload, err := json.Marshal(InvoicePayload{
        OrderID:   orderID,
        UserEmail: email,
    })
    if err != nil {
        return err
    }
    task := asynq.NewTask(TypeInvoiceGenerate, payload)
    _, err = c.client.Enqueue(task, asynq.MaxRetry(3))
    return err
}

// 4. Write handler function
func handleInvoiceGenerate(deps WorkerDeps) func(ctx context.Context, task *asynq.Task) error {
    return func(ctx context.Context, task *asynq.Task) error {
        var payload InvoicePayload
        if err := json.Unmarshal(task.Payload(), &payload); err != nil {
            return err
        }
        // Generate PDF, send email, etc.
        return nil
    }
}

// 5. Register in worker mux (in StartWorker)
mux.HandleFunc(TypeInvoiceGenerate, handleInvoiceGenerate(deps))

Admin Jobs Dashboard

The admin panel includes a jobs dashboard that shows queue statistics and allows admins to view, retry, and clear jobs. The dashboard uses the asynq Inspector API under the hood.

Endpoint                        Method   Description
/api/admin/jobs/stats           GET      Queue stats (pending, active, completed, failed)
/api/admin/jobs/:status         GET      List jobs by status (active, pending, completed, failed, retry)
/api/admin/jobs/:id/retry       POST     Retry a failed job
/api/admin/jobs/queue/:queue    DELETE   Clear all completed tasks in a queue

Concurrency: The default worker concurrency is 10. This means up to 10 jobs can be processed simultaneously. Adjust the Concurrency setting in StartWorker() based on your server resources.