# System Design: Task Scheduler
## Problem Statement
Design a task scheduler system that allows users to schedule tasks to be executed once at a specified time in the future or on a recurring basis using specified intervals. The system should efficiently manage and execute thousands of tasks with minimal delay and high reliability.
---
## Phase 1: Requirements (~5 min)
### Functional Requirements
1. **Users should be able to schedule one-time tasks** for execution at a specified future time
2. **Users should be able to schedule recurring tasks** with intervals (every N minutes, daily, weekly, cron expressions)
3. **Users should be able to manage tasks** (add, remove, pause, modify, check status)
4. **System should track task states** (pending, running, completed, failed)
5. **System should retry failed tasks** with exponential backoff (max 3-5 retries)
6. **Users should be notified of task failures** (email, push, dashboard alerts)
### Non-Functional Requirements
| Requirement | Target |
|-------------|--------|
| **Concurrency** | 10,000 concurrent tasks at peak |
| **Throughput** | 100,000 tasks/day (~70 tasks/min) |
| **Latency** | < 1 second from scheduled time to execution start |
| **Precision** | 1-second granularity, millisecond-level accuracy |
| **Availability** | High availability, horizontal scaling |
| **Reliability** | No missed tasks; at-least-once execution (duplicates possible, deduplicated by receivers via an idempotency key) |
### Out of Scope
- Task dependencies (task B runs after task A completes)
- Priority queues for tasks
- Geographic distribution
---
## Phase 2: Core Entities (~2 min)
```mermaid
erDiagram
User {
uuid id PK
string email
json notification_preferences
string api_key
}
Task {
uuid id PK
uuid user_id FK
string name
enum type "ONE_TIME | RECURRING"
json payload "webhook URL, parameters"
timestamp scheduled_at "for one-time"
string cron_expression "for recurring"
timestamp end_date "optional"
int max_executions "optional"
enum status "PENDING | SCHEDULED | QUEUED | RUNNING | COMPLETED | FAILED | PAUSED"
int retry_count
int partition_id
timestamp created_at
timestamp updated_at
}
TaskExecution {
uuid id PK
uuid task_id FK
timestamp scheduled_time
timestamp started_at
timestamp completed_at
enum status "SUCCESS | FAILED | RETRYING"
string error_message
int attempt_number
}
TaskDLQ {
uuid id PK
uuid task_id FK
uuid user_id
string webhook_url
json payload
json attempts "all attempt errors"
string final_error
timestamp created_at
timestamp resolved_at
string resolution "retried | archived"
}
User ||--o{ Task : "creates"
Task ||--o{ TaskExecution : "has"
Task ||--o| TaskDLQ : "fails to"
```
---
## Phase 3: API Design (~5 min)
### Protocol Choice: REST
REST is appropriate here - we have clear CRUD operations on resources (tasks).
### Endpoints
```
POST /v1/tasks
Headers: Authorization: Bearer <token>, Idempotency-Key: <uuid>
Body: {
"name": "Daily Report",
"type": "RECURRING",
"payload": {
"webhook_url": "https://api.example.com/reports",
"method": "POST",
"body": { "report_type": "daily" }
},
"cron_expression": "0 8 * * *",
"end_date": "2026-12-31T00:00:00Z"
}
Response: 201 Created
{
"id": "task-uuid",
"status": "SCHEDULED",
"next_execution": "2026-02-10T08:00:00Z"
}
GET /v1/tasks
Query: ?status=SCHEDULED&page=1&limit=20
Response: { "tasks": [...], "pagination": {...} }
GET /v1/tasks/{task_id}
Response: { "id": "...", "status": "...", "executions": [...] }
PATCH /v1/tasks/{task_id}
Body: { "status": "PAUSED" } or { "cron_expression": "0 9 * * *" }
Response: 200 OK
DELETE /v1/tasks/{task_id}
Response: 204 No Content
GET /v1/tasks/{task_id}/executions
Query: ?status=FAILED&limit=10
Response: { "executions": [...] }
```
### Idempotency
All state-changing operations require an `Idempotency-Key` header to prevent duplicate task creation on network retries.
---
## Phase 4: Capacity Estimation (~3 min)
### Given
- 100,000 tasks/day
- 10,000 concurrent tasks at peak
- Each task record ~1KB
### Calculations
```
Tasks per second (avg): 100,000 / 86,400 ≈ 1.2 tasks/sec
Tasks per second (peak 10x): ~12 tasks/sec
Storage for active tasks: 10,000 × 1KB = 10MB
Storage for 1 year history: 100,000 × 365 × 1KB = 36.5GB
Task executions (assuming 2 executions avg per task):
- 200,000 execution records/day
- 73M records/year × 0.5KB = 36.5GB
Total storage (1 year): ~75GB
```
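The same arithmetic as a small Go program, so the assumptions (1KB per task record, 0.5KB per execution record, 2 executions per task, 10x peak) live in one place. It uses decimal units to match the figures above; all names are illustrative.

```go
package main

import "fmt"

// Inputs from the estimate above (decimal units: 1 MB = 1,000 KB, 1 GB = 1,000,000 KB).
const (
	tasksPerDay       = 100_000
	secondsPerDay     = 86_400
	peakFactor        = 10
	activeTasks       = 10_000
	taskRecordKB      = 1.0
	executionKB       = 0.5
	executionsPerTask = 2
)

// Estimate holds the derived figures.
type Estimate struct {
	AvgTPS, PeakTPS    float64
	ActiveMB           float64
	TaskHistoryGB      float64 // one year of task records
	ExecHistoryGB      float64 // one year of execution records
	ExecRecordsPerYear int
}

func estimate() Estimate {
	avg := float64(tasksPerDay) / secondsPerDay
	execRows := tasksPerDay * executionsPerTask * 365
	return Estimate{
		AvgTPS:             avg,
		PeakTPS:            avg * peakFactor,
		ActiveMB:           activeTasks * taskRecordKB / 1_000,
		TaskHistoryGB:      tasksPerDay * 365 * taskRecordKB / 1_000_000,
		ExecHistoryGB:      float64(execRows) * executionKB / 1_000_000,
		ExecRecordsPerYear: execRows,
	}
}

func main() {
	e := estimate()
	fmt.Printf("avg %.2f tasks/s, peak %.1f tasks/s\n", e.AvgTPS, e.PeakTPS)
	fmt.Printf("active tasks %.0f MB\n", e.ActiveMB)
	fmt.Printf("1 year: tasks %.1f GB + executions %.1f GB (%d rows) = %.1f GB\n",
		e.TaskHistoryGB, e.ExecHistoryGB, e.ExecRecordsPerYear, e.TaskHistoryGB+e.ExecHistoryGB)
}
```

It yields 36.5 GB + 36.5 GB = 73 GB for the first year, which the estimate rounds up to ~75GB.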
### Implications
- Scale is manageable with a single database with read replicas
- Need a composite index matching the scheduler's poll query, e.g. on `(partition_id, status, scheduled_at)`, for time-based lookups
- Consider partitioning execution history by date for cleanup
---
## Phase 5: High-Level Design (~10 min)
### Architecture Overview
```mermaid
flowchart TD
C["Clients - Web UI / Mobile / API"] --> AG["API Gateway - Auth, Rate Limiting, LB"]
AG --> API
subgraph TaskSchedulerService["Task Scheduler Service"]
API["Task API - HTTP"]
S1["Scheduler 1"]
S2["Scheduler N"]
SU["Status Updater - Kafka Consumer"]
tasks[(tasks)]
executions[(task_executions)]
dlq[(task_dlq)]
API --> tasks
API --> executions
S1 --> tasks
S2 --> tasks
SU --> tasks
SU --> executions
SU --> dlq
end
subgraph Redis["Redis - shared infrastructure"]
R1["scheduler:*"]
R2["queue:*"]
R3["worker:*"]
end
S1 --> Redis
S2 --> Redis
subgraph Workers["Worker Pool"]
W1["W1"]
W2["W2"]
WN["WN"]
end
Redis --> W1
Redis --> W2
Redis --> WN
W1 --> WH["External Webhooks"]
W2 --> WH
WN --> WH
W1 --> K["Kafka"]
W2 --> K
WN --> K
K --> SU
K --> NS["Notification Service"]
```
**Key Points:**
- Task API, Schedulers, and Status Updater are **components of the same service** (share PostgreSQL)
- **Workers are separate** - only interact with Redis and Kafka, no database access
- **Status Updater** consumes Kafka events and writes execution results to PostgreSQL
- **Redis coordinates partition assignment** (`scheduler:active` sorted set, scored by each scheduler's last heartbeat)
- Schedulers dynamically discover each other and rebalance partitions
- Fixed 256 partitions, distributed among active schedulers
- Redis only stores near-term tasks (due within the next 60s), not all tasks, which keeps memory low
**Key Flow:**
```mermaid
sequenceDiagram
participant Client
participant TaskAPI
participant PostgreSQL
participant Scheduler
participant Redis
participant Worker
participant Webhook
participant Kafka
participant StatusUpdater
Client->>TaskAPI: POST /v1/tasks
TaskAPI->>PostgreSQL: INSERT (status=SCHEDULED)
loop Every 1 second
Scheduler->>PostgreSQL: SELECT due tasks (my partitions)
PostgreSQL-->>Scheduler: tasks[]
Note over Scheduler,Redis: Order Matters: data first, queue second
Scheduler->>Redis: HSET queue:task_data:{id} (payload)
Scheduler->>Redis: ZADD queue:due_tasks (task_id)
Scheduler->>PostgreSQL: UPDATE status=QUEUED
end
Worker->>Redis: claim via Lua (ZREM from queue:due_tasks + HSET queue:processing, atomic)
Redis-->>Worker: task_id
Worker->>Redis: HGETALL queue:task_data:{id}
Redis-->>Worker: payload
Worker->>Webhook: HTTP POST
Webhook-->>Worker: 200 OK
Worker->>Redis: HDEL queue:processing
Worker->>Redis: DEL queue:task_data:{id}
Worker->>Kafka: publish task.completed
Note over Kafka,StatusUpdater: Async: Status Updater in Task Scheduler Service
Kafka-->>StatusUpdater: consume event
StatusUpdater->>PostgreSQL: INSERT task_executions
StatusUpdater->>PostgreSQL: UPDATE tasks status=COMPLETED
```
### Component Responsibilities
**Task Scheduler Service** (single service, multiple components):
| Component | Responsibilities |
|-----------|------------------|
| **Task API** | CRUD operations, validates cron expressions, writes to PostgreSQL, calculates partition_id |
| **Schedulers** | Register in Redis, heartbeat, dynamic partition assignment, poll DB for due tasks, push to Redis, requeue expired tasks |
| **Status Updater** | Kafka consumer, writes execution results to task_executions table, updates task status |
**Why Status Updater is inside Task Scheduler Service:**
- Workers don't have PostgreSQL access (only Redis + Kafka)
- Task Scheduler Service owns the `tasks` and `task_executions` tables
- Same bounded context - execution results belong to task management
**Status Updater Flow:**
```go
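func (su *StatusUpdater) Consume(ctx context.Context) error {
	for msg := range su.kafka.Messages() { // su.kafka: hypothetical consumer wrapper
		var event ExecutionEvent
		if err := json.Unmarshal(msg.Value, &event); err != nil {
			slog.Warn("skipping malformed event", "err", err)
			continue
		}
		tx, err := su.db.BeginTx(ctx, nil) // execution row + task status commit together
		if err != nil {
			return err
		}
		_, err = tx.ExecContext(ctx, `INSERT INTO task_executions
			(task_id, status, completed_at, error_message, attempt_number)
			VALUES ($1, $2, $3, $4, $5)`,
			event.TaskID, event.Status, event.CompletedAt, event.Error, event.Attempt)
		if err == nil {
			// taskStatusFor (hypothetical) maps SUCCESS to COMPLETED; recurring tasks get a new scheduled_at.
			_, err = tx.ExecContext(ctx, `UPDATE tasks SET status = $1, updated_at = NOW() WHERE id = $2`,
				taskStatusFor(event.Status), event.TaskID)
		}
		if err == nil {
			err = tx.Commit()
		} else {
			tx.Rollback()
		}
		if err != nil {
			return err
		}
		su.kafka.Commit(msg) // commit the offset only after the DB commit (at-least-once)
	}
	return nil
}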
```
**Workers** (separate service, no database access):
- Pull tasks from Redis when due (Lua script)
- Get task payload from Redis hash
- Execute webhooks with timeout
- Handle retries with exponential backoff
- Publish execution events to Kafka
**Notification Service:**
- Consumes execution events from Kafka
- Sends failure notifications via email/push
- Manages user notification preferences
### Redis Data Storage Strategy
**Problem:** Workers need task payload (webhook URL, parameters) but don't access PostgreSQL.
**Solution:** Scheduler pushes both queue entry and task data to Redis.
| Redis Key | Type | Owner | Purpose |
|-----------|------|-------|---------|
| `scheduler:active` | Sorted Set | Scheduler | Scheduler membership (score = last heartbeat time) |
| `queue:due_tasks` | Sorted Set | Shared | Task timing (score = timestamp, member = task_id) |
| `queue:task_data:{id}` | Hash | Shared | Task payload (webhook_url, payload, retry_count) |
| `queue:processing` | Hash | Shared | Visibility timeout tracking (field = task_id, value = deadline) |
| `worker:executed:{id}:{attempt}` | String | Worker | At-least-once delivery (SETNX after success) |
**Key Naming Convention:**
- `scheduler:*` - Used only by Scheduler (internal coordination)
- `queue:*` - Shared contract between Scheduler and Workers
- `worker:*` - Used only by Workers (internal state)
**Order Matters Pattern (Sync Mitigation):**
Write data first, queue second. This ensures a task is never queued without its data.
```go
func (s *Scheduler) pushToRedis(ctx context.Context, task Task) error {
dataKey := "queue:task_data:" + task.ID
if err := s.rdb.HSet(ctx, dataKey, map[string]interface{}{
"webhook_url": task.WebhookURL,
"payload": task.Payload,
"retry_count": task.RetryCount,
}).Err(); err != nil {
return fmt.Errorf("failed to write task data: %w", err)
}
s.rdb.Expire(ctx, dataKey, 24*time.Hour)
if err := s.rdb.ZAdd(ctx, "queue:due_tasks", &redis.Z{
Score: float64(task.ScheduledAt.Unix()),
Member: task.ID,
}).Err(); err != nil {
s.rdb.Del(ctx, dataKey)
return fmt.Errorf("failed to queue task: %w", err)
}
return nil
}
```
**Failure Modes:**
| Failure | State | Impact | Recovery |
|---------|-------|--------|----------|
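| HSET fails | Nothing written | None | Task stays SCHEDULED in PostgreSQL; next poll retries |
| ZADD fails | Data exists, not queued | Orphaned data | Best-effort DEL, else TTL cleanup (24h); next poll retries |
| Crash between HSET and ZADD | Data exists, not queued | Orphaned data | TTL cleanup (24h); next poll retries |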
**Worker Claims with Data:**
```go
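func (w *Worker) claimTask(ctx context.Context) (*Task, error) {
	taskID := w.claimTaskID(ctx) // Lua script: moves one due ID to queue:processing atomically
	if taskID == "" {
		return nil, nil // nothing due
	}
	data, err := w.rdb.HGetAll(ctx, "queue:task_data:"+taskID).Result()
	if err != nil {
		// Transient Redis error: keep the queue:processing entry so the
		// visibility timeout requeues the task instead of dropping it.
		return nil, err
	}
	if len(data) == 0 {
		slog.Warn("task data missing", "task_id", taskID) // already completed or expired
		w.rdb.HDel(ctx, "queue:processing", taskID)
		return nil, nil
	}
	return parseTask(taskID, data), nil
}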
```
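`claimTaskID` is not shown above. One way to implement it, assuming the go-redis client and the key layout from the table, is a Lua script that moves the earliest due ID from `queue:due_tasks` into `queue:processing` in one atomic step, so a crash can never leave a claimed task in neither structure. Running the script needs a Redis server, so the executable part below is an in-memory model of the same state transition, with a mutex standing in for Redis's atomic script execution. `claimScript` and `queueModel` are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// claimScript (hypothetical) is what claimTaskID could pass to EVAL, e.g. via
// go-redis redis.NewScript. KEYS[1] = queue:due_tasks, KEYS[2] = queue:processing,
// ARGV[1] = now, ARGV[2] = visibility deadline (same units as the scores).
const claimScript = `
local ids = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1], 'LIMIT', 0, 1)
if #ids == 0 then return false end
redis.call('ZREM', KEYS[1], ids[1])
redis.call('HSET', KEYS[2], ids[1], ARGV[2])
return ids[1]
`

// queueModel (hypothetical) mirrors the two Redis structures in memory; the
// mutex stands in for Redis executing a Lua script atomically.
type queueModel struct {
	mu         sync.Mutex
	due        map[string]int64 // queue:due_tasks: task ID -> score (due time)
	processing map[string]int64 // queue:processing: task ID -> visibility deadline
}

func newQueueModel() *queueModel {
	return &queueModel{due: map[string]int64{}, processing: map[string]int64{}}
}

// claim moves the earliest task due at or before now into processing with
// deadline now+visibility, like the script. It returns "" when nothing is due.
// Ties are broken by task ID, matching sorted-set ordering.
func (q *queueModel) claim(now, visibility int64) string {
	q.mu.Lock()
	defer q.mu.Unlock()
	best := ""
	for id, score := range q.due {
		if score > now {
			continue
		}
		if best == "" || score < q.due[best] || (score == q.due[best] && id < best) {
			best = id
		}
	}
	if best == "" {
		return ""
	}
	delete(q.due, best)
	q.processing[best] = now + visibility
	return best
}

func main() {
	q := newQueueModel()
	q.due["task-a"] = 100
	q.due["task-b"] = 90
	q.due["task-c"] = 200 // not due yet at now=150
	first, second, third := q.claim(150, 30), q.claim(150, 30), q.claim(150, 30)
	fmt.Printf("%q %q %q\n", first, second, third)
}
```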
### Data Flow: Scheduling a One-Time Task
```mermaid
%%{init: {'flowchart': {'rankSpacing': 30}}}%%
flowchart TD
subgraph Creation["1. Task Creation"]
direction LR
C[Client] -->|POST /v1/tasks| API[Task API]
API -->|INSERT| PG[(PostgreSQL)]
end
subgraph Scheduling["2. Scheduling (every 1s)"]
direction LR
PG2[(PostgreSQL)] -->|poll due tasks| S[Scheduler]
S -->|1.HSET task_data| R[(Redis)]
S -->|2.ZADD queue:due_tasks| R
end
subgraph Execution["3. Execution"]
direction LR
R2[(Redis queue:due_tasks)] -->|claim task_id| W[Worker]
R3[(Redis task_data)] -->|HGETALL| W
W -->|HTTP POST| WH[Webhook]
end
subgraph Completion["4. Completion (async)"]
direction LR
W2[Worker] -->|DEL task_data| R4[(Redis)]
W2 -->|publish event| K[Kafka]
end
subgraph StatusUpdate["5. Status Update (Task Scheduler Service)"]
direction LR
K2[Kafka] -->|consume| SU[Status Updater]
SU -->|INSERT task_executions| PG3[(PostgreSQL)]
SU -->|UPDATE tasks| PG3
end
Creation --> Scheduling --> Execution --> Completion --> StatusUpdate
```
### Data Flow: Recurring Task
```mermaid
%%{init: {'flowchart': {'rankSpacing': 30}}}%%
flowchart TD
subgraph Worker
W[Execute webhook] --> PUB[Publish task.completed to Kafka]
end
subgraph StatusUpdater["Status Updater (Task Scheduler Service)"]
K[Consume from Kafka] --> INSERT[INSERT task_executions]
INSERT --> CRON[Parse cron_expression]
CRON --> CHECK{past end_date or max_executions reached?}
CHECK -->|No| UPDATE[UPDATE tasks<br/>scheduled_at = next_run<br/>status = SCHEDULED]
CHECK -->|Yes| DONE[UPDATE tasks<br/>status = COMPLETED]
end
PUB --> K
```
**Note:** Worker only publishes events. Status Updater handles:
- Recording execution in `task_executions`
- Calculating next run from `cron_expression`
- Updating `scheduled_at` for the next execution
---
## Phase 6: Deep Dives (~15 min)
### 6.1 Resilience & Failure Handling
**What happens if a Scheduler crashes?**
- Scheduler stops renewing its heartbeat; after 30s its `scheduler:active` entry is stale and pruned
- Other schedulers detect change in `scheduler:active` set
- Partitions are automatically rebalanced among remaining schedulers
- Kubernetes restarts the failed scheduler
- New instance registers and triggers another rebalance
- Idempotent operations - re-polling same tasks is safe (status check prevents duplicates)
**Dynamic Partition Assignment (Redis-based):**
```go
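func (s *Scheduler) Run(ctx context.Context) {
	// 1. Register in Redis. scheduler:active is a sorted set scored by last
	//    heartbeat time (set members cannot carry their own TTL).
	s.rdb.ZAdd(ctx, "scheduler:active", &redis.Z{
		Score:  float64(time.Now().Unix()),
		Member: s.podName,
	})
	go s.heartbeat(ctx) // re-ZADD every 10s; prune members older than 30s (ZREMRANGEBYSCORE)
	// 2. Watch for scheduler changes and rebalance
	go s.watchAndRebalance(ctx)
	// 3. Start polling assigned partitions
	s.poll(ctx)
}
func (s *Scheduler) rebalance(schedulers []string) {
	sort.Strings(schedulers)                  // all schedulers must agree on the order
	myIndex := indexOf(schedulers, s.podName) // -1 if this pod is not in the list
	total := len(schedulers)
	// Fixed 256 partitions, distributed round-robin by index
	assigned := []int{}
	for p := 0; myIndex >= 0 && p < 256; p++ {
		if p%total == myIndex {
			assigned = append(assigned, p)
		}
	}
	s.mu.Lock() // s.mu guards s.partitions, which poll reads concurrently
	s.partitions = assigned
	s.mu.Unlock()
}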
```
**Rebalancing Example:**
```mermaid
%%{init: {'flowchart': {'rankSpacing': 40}}}%%
flowchart TD
subgraph Before["Before: 3 schedulers (256 partitions)"]
direction LR
S0["scheduler-0<br/>86 partitions"]
S1["scheduler-1<br/>85 partitions"]
S2["scheduler-2<br/>85 partitions"]
end
Before --> X["scheduler-0 dies<br/>heartbeat expires"]
subgraph After["After: 2 schedulers"]
direction LR
S1b["scheduler-1 index 0<br/>128 partitions"]
S2b["scheduler-2 index 1<br/>128 partitions"]
end
X --> After
```
**What happens if Worker crashes mid-execution?**
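- Task remains in the Redis `queue:processing` hash with its visibility deadline
- Visibility timeout: if not ACK'd within 30s, the scheduler requeues the task (the webhook HTTP timeout must stay below 30s, or a slow but healthy worker's task is requeued and executed twice)
- At-least-once delivery; the webhook receiver must be idempotent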
**Visibility Timeout Flow:**
```mermaid
sequenceDiagram
participant W as Worker
participant R as Redis
participant S as Scheduler
participant WH as Webhook
W->>R: Lua claim (ZRANGEBYSCORE + ZREM + HSET queue:processing, deadline = now+30s)
R-->>W: task_id
W->>WH: Execute webhook
alt Success
WH-->>W: 200 OK
W->>R: HDEL queue:processing
Note over W: Publish event to Kafka
else Worker crashes
Note over R: Task stays in queue:processing
loop Every 5 seconds
S->>R: HGETALL queue:processing
R-->>S: task with timeout
alt timeout_at < now
S->>R: HDEL queue:processing
S->>R: ZADD queue:due_tasks (requeue)
end
end
end
```
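The scheduler's 5-second sweep reduces to: find `queue:processing` entries whose deadline has passed and put them back in `queue:due_tasks`. A minimal in-memory sketch, assuming the hash values are deadlines in the same units as the queue scores. In Redis, each check-and-move would run as one Lua script so a worker's concurrent `HDEL` cannot interleave with the sweep. `requeueExpired` is a hypothetical name.

```go
package main

import (
	"fmt"
	"sort"
)

// requeueExpired (hypothetical) moves every task whose visibility deadline is
// before now from processing back to due, scored at now so it is claimable
// again immediately. It returns the requeued IDs sorted, for stable logging.
func requeueExpired(processing, due map[string]int64, now int64) []string {
	var requeued []string
	for id, deadline := range processing {
		if deadline < now { // never ACK'd: worker presumed dead
			delete(processing, id) // HDEL queue:processing <id>
			due[id] = now          // ZADD queue:due_tasks <now> <id>
			requeued = append(requeued, id)
		}
	}
	sort.Strings(requeued)
	return requeued
}

func main() {
	processing := map[string]int64{"task-1": 100, "task-2": 200}
	due := map[string]int64{}
	fmt.Println(requeueExpired(processing, due, 150), len(processing), due)
}
```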
**Retry Strategy:**
```mermaid
%%{init: {'flowchart': {'rankSpacing': 20}}}%%
flowchart TD
A1[Attempt 1 - Immediate] --> A2[Attempt 2 - 10s]
A2 --> A3[Attempt 3 - 30s]
A3 --> A4[Attempt 4 - 90s]
A4 --> A5[Attempt 5 - 270s]
A5 --> DLQ[FAILED → DLQ → Notify]
```
**Circuit Breaker for External Webhooks:**
- Track failure rate per webhook domain
- If >50% failures in 1 minute, open circuit
- Reject new executions for that domain for 30 seconds
- Half-open: allow 1 test request, close if successful
### 6.2 Consistency & At-Least-Once Execution
**Problem:** How do we ensure webhook delivery without losing tasks?
**Solution: Redis-based At-Least-Once Delivery**
Mark execution in Redis AFTER sending webhook successfully. If crash occurs before marking, we retry (prefer duplicate over loss).
```mermaid
flowchart TD
A[Worker claims task] --> B[Generate execution_id]
B --> C{Check executed key}
C -->|Exists| SKIP[Skip - already completed]
C -->|Missing| D[Send webhook]
D --> E{Success?}
E -->|Yes| F[SETNX to mark complete]
F --> G[Publish to Kafka]
E -->|No| H[Increment retry_count]
H --> I{retries < max?}
I -->|Yes| J[Requeue same attempt]
I -->|No| K[Send to DLQ]
```
**Implementation:**
```go
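func (w *Worker) execute(ctx context.Context, task Task) error {
	// Attempt identifies the scheduled run and stays fixed across retries, so the
	// receiver sees the same X-Idempotency-Key every time this run is retried.
	executionID := fmt.Sprintf("%s:%d", task.ID, task.Attempt)
	exists, err := w.rdb.Exists(ctx, "worker:executed:"+executionID).Result()
	if err != nil {
		return err
	}
	if exists > 0 {
		slog.Info("execution already completed, skipping", "id", executionID)
		return nil
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, task.WebhookURL, bytes.NewReader(task.Payload))
	if err != nil {
		return err
	}
	req.Header.Set("X-Idempotency-Key", executionID)
	resp, err := w.client.Do(req)
	if err != nil {
		return fmt.Errorf("webhook failed: %w", err)
	}
	resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("webhook returned status %d", resp.StatusCode)
	}
	// Mark complete only AFTER the webhook succeeded: a crash before this line
	// means a retry re-sends (duplicate), never a lost delivery.
	w.rdb.SetNX(ctx, "worker:executed:"+executionID, "1", 24*time.Hour)
	w.kafka.Publish("task.completed", ...)
	return nil
}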
```
**How It Works:**
| Event | execution_id | Behavior |
|-------|--------------|----------|
| First attempt | task-123:1 | No key yet: send webhook, mark complete |
| Crash after send, before mark | task-123:1 | Still no key: retry sends a duplicate |
| Webhook fails | task-123:1 | No key set, same attempt retries |
| Success after retry | task-123:1 | Key set, future retries skip |
**Trade-off:**
| Aspect | At-Least-Once (our choice) | At-Most-Once |
|--------|----------------------------|--------------|
| Duplicates | Possible (crash after send) | Prevented |
| Data loss | No | Possible |
| Redis dependency | Required | Required |
| Receiver requirement | Must be idempotent | No requirement |
**Why At-Least-Once?**
- Data loss is unacceptable (missed notifications, lost orders)
- Duplicates can be handled by idempotent receivers (using X-Idempotency-Key)
- Industry standard for critical webhook delivery
### 6.3 Scalability
**Horizontal Scaling:**
| Component | Scaling Strategy |
|-----------|------------------|
| Task Scheduler Service (API + Schedulers + Status Updater) | Scale together as one deployment, Redis coordinates partition rebalancing |
| Workers | Scale independently, add more workers, Redis handles distribution |
| PostgreSQL | Primary for writes, read replicas optional |
| Redis | Cluster mode with partitioned queues if needed |
**Trade-off: Coupled Scaling**
Task API, Schedulers, and Status Updater scale together because they're in the same service:
| Pros | Cons |
|------|------|
| Simpler deployment (one artifact) | Can't scale API independently from Schedulers |
| Shared PostgreSQL connection pool | If API needs 10 replicas but Scheduler needs 3, you get 10 of both |
| Same bounded context | Resource waste if workloads differ significantly |
**Mitigation:** For most workloads, this is acceptable. If API load vastly exceeds scheduling load, consider splitting into separate services later.
**Dynamic Scheduler Scaling:**
```mermaid
%%{init: {'flowchart': {'rankSpacing': 30}}}%%
flowchart LR
subgraph ScaleUp["Scale Up: --replicas=5"]
direction TB
B3["3 schedulers"] --> A5["5 schedulers"]
A5 --> R1["256/5 ≈ 51 each (one gets 52)"]
end
ScaleUp ~~~ ScaleDown
subgraph ScaleDown["Scale Down: --replicas=2"]
direction TB
B5["Stop heartbeat"] --> TTL["Heartbeat stale after 30s"]
TTL --> R2["256/2 = 128 each"]
end
```
**Scheduler Partition Query:**
```go
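func (s *Scheduler) poll(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		s.mu.Lock() // snapshot: rebalance may swap s.partitions concurrently
		partitions := append([]int(nil), s.partitions...)
		s.mu.Unlock()
		horizon := time.Now().Add(60 * time.Second) // only near-term tasks go to Redis
		for _, partition := range partitions {
			rows, err := s.db.QueryContext(ctx, `
				SELECT id, scheduled_at, payload FROM tasks
				WHERE status = 'SCHEDULED' AND scheduled_at < $1 AND partition_id = $2
				ORDER BY scheduled_at LIMIT 100`, horizon, partition)
			if err != nil {
				slog.Error("poll failed", "partition", partition, "err", err)
				continue
			}
			// Scan rows and pushToRedis each task, then mark it QUEUED with
			// UPDATE ... WHERE id = $1 AND status = 'SCHEDULED' so re-polls skip it.
			rows.Close()
		}
	}
}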
```
**Notes:**
- Fixed 256 partitions (never changes, allows scaling to 256 schedulers)
- `partition_id` is pre-calculated on task creation: `hash(user_id) % 256`
- Each scheduler handles `partitions where p % activeSchedulers == myIndex`
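`hash(user_id) % 256` leaves the hash function unspecified. A sketch using 32-bit FNV-1a from Go's standard library (an assumption; any hash works as long as it never changes, because `partition_id` is stored on each row). `partitionID` is a hypothetical helper.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const numPartitions = 256

// partitionID (hypothetical helper) maps a user ID to one of the fixed
// partitions. FNV-1a gives the same result in every process and Go version,
// unlike Go's per-process randomized map hashing.
func partitionID(userID string) int {
	h := fnv.New32a()
	h.Write([]byte(userID)) // hash.Hash's Write never returns an error
	return int(h.Sum32() % numPartitions)
}

func main() {
	for _, u := range []string{"user-0", "user-1", "user-2"} {
		fmt.Println(u, partitionID(u))
	}
}
```

Hashing `user_id` keeps all of one user's tasks in one partition, so a single very active user lands on a single scheduler; hashing the task ID would spread that load, at the cost of per-user locality.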
**Benefits:**
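- No duplicate scheduling in steady state (each partition owned by exactly one scheduler); during a rebalance, the status check on re-poll keeps brief ownership overlaps safe
- Fully dynamic scaling (no manual configuration)
- Automatic rebalancing on failure or scaling
- Near-linear scheduling throughput (more schedulers = more capacity) until the shared PostgreSQL primary becomes the bottleneck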
**Handling Thundering Herd (many tasks at same time):**
```mermaid
%%{init: {'flowchart': {'rankSpacing': 30}}}%%
flowchart TD
subgraph Problem["Problem"]
direction LR
P["1000 tasks at 12:00:00.000"]
end
Problem --> J["Apply Jitter<br/>scheduled_at + random(0-5000ms)"]
subgraph Solution["Result: Spread over 5s"]
direction LR
T1["12:00:00.123"]
T2["12:00:02.456"]
T3["12:00:04.789"]
end
J --> Solution
```
### 6.4 Observability
**Key Metrics:**
| Metric | Description | Alert Threshold |
|--------|-------------|-----------------|
| tasks_scheduled_total | Tasks created | - |
| tasks_executed_total | Successful executions | - |
| tasks_failed_total | Failed executions | >1% failure rate |
| execution_delay_seconds | scheduled_at vs actual start | >5s p99 |
| queue_depth | Tasks waiting in Redis | >1000 |
| worker_utilization | % workers busy | >80% |
**SLOs:**
- 99.9% of tasks execute within 5 seconds of scheduled time
- 99.99% task execution success rate (excluding external failures)
- 99.9% API availability
**Distributed Tracing:**
- Trace ID propagated from API → Scheduler → Worker → Webhook
- Allows debugging end-to-end execution path
**Dead Letter Queue (DLQ):**
Tasks that fail all retries are moved to the DLQ for manual inspection and resolution.
```mermaid
%%{init: {'flowchart': {'rankSpacing': 25}}}%%
flowchart LR
W[Worker - all retries failed] -->|publish| K[Kafka task.failed]
K --> SU[Status Updater]
SU --> T[(UPDATE tasks<br/>status=FAILED)]
SU --> DLQ[(INSERT task_dlq)]
SU --> ALERT[Kafka task.dlq]
ALERT --> NS[Notification Service]
```
**DLQ Table:**
```sql
CREATE TABLE task_dlq (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_id UUID REFERENCES tasks(id),
user_id UUID,
webhook_url TEXT,
payload JSONB,
attempts JSONB,
final_error TEXT,
created_at TIMESTAMP DEFAULT NOW(),
resolved_at TIMESTAMP,
resolution TEXT
);
```
**DLQ Operations:**
| Operation | How |
|-----------|-----|
| **View failed** | `SELECT * FROM task_dlq WHERE resolved_at IS NULL` |
| **Retry** | Reset the task to status=SCHEDULED with retry_count=0 and mark the DLQ entry resolved, in one transaction |
| **Archive** | Set resolution='archived', resolved_at=NOW() |
**Retry from DLQ:**
```go
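func (api *TaskAPI) RetryFromDLQ(ctx context.Context, dlqID string) error {
	tx, err := api.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit has succeeded
	// Resolve only unresolved entries, so the same DLQ row cannot be retried twice.
	res, err := tx.ExecContext(ctx, `UPDATE task_dlq SET resolved_at = NOW(), resolution = 'retried'
		WHERE id = $1 AND resolved_at IS NULL`, dlqID)
	if err != nil {
		return err
	}
	n, err := res.RowsAffected()
	if err != nil {
		return err
	}
	if n == 0 {
		return fmt.Errorf("dlq entry %s not found or already resolved", dlqID)
	}
	if _, err := tx.ExecContext(ctx, `
		UPDATE tasks
		SET status = 'SCHEDULED', retry_count = 0, scheduled_at = NOW()
		WHERE id = (SELECT task_id FROM task_dlq WHERE id = $1)
	`, dlqID); err != nil {
		return err
	}
	return tx.Commit()
}
```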
---
## Trade-off Decisions
| Decision | Alternatives | Trade-off | Justification |
|----------|--------------|-----------|---------------|
| Single service (API + Scheduler + Status Updater) | Separate microservices | Coupled scaling vs simplicity | Same bounded context, shared DB, one deployment; accept scaling together |
| Redis for due queue | Database polling only | Memory cost vs latency | Redis provides O(log N) time-based retrieval, sub-ms latency |
| Redis only stores 60s window | Store all tasks in Redis | Memory vs completeness | Keeps Redis memory low, DB is source of truth |
| Sorted Set + Hash (Order Matters) | Full JSON in set, Lua script, API call | Extra Redis call vs sync safety | Prevents queued-without-data state, TTL handles orphans, simpler than Lua |
| At-least-once delivery | At-most-once | Duplicates vs data loss | Data loss unacceptable; receivers must be idempotent (X-Idempotency-Key) |
| PostgreSQL for tasks | NoSQL (DynamoDB) | Flexibility vs scale | 75GB/year is small, need complex queries on tasks |
| Partitioned schedulers | Single leader | Complexity vs throughput | Polling load spreads across instances; no single-scheduler bottleneck |
| Redis for coordination | ZooKeeper | Simplicity vs robustness | Already have Redis, avoids extra infrastructure |
| Fixed 256 partitions | Dynamic partition count | Simplicity vs flexibility | Over-partition upfront avoids migration, scales to 256 schedulers |
| Kafka for events | Direct DB writes from workers | Complexity vs decoupling | Enables async notifications, workers stay lightweight |
---
## Potential Improvements (if time permits)
1. **Priority queues** - Use multiple Redis sorted sets for different priorities
2. **Task dependencies** - DAG execution engine
3. **Geographic distribution** - Cell-based architecture for multi-region
4. **Batch execution** - Group similar tasks for efficiency
5. **Rate limiting per user** - Prevent single user from overwhelming system