# System Design: Task Scheduler


## Problem Statement


Design a task scheduler system that allows users to schedule tasks to be executed once at a specified time in the future or on a recurring basis using specified intervals. The system should efficiently manage and execute thousands of tasks with minimal delay and high reliability.


---


## Phase 1: Requirements (~5 min)


### Functional Requirements


1. **Users should be able to schedule one-time tasks** for execution at a specified future time

2. **Users should be able to schedule recurring tasks** with intervals (every N minutes, daily, weekly, cron expressions)

3. **Users should be able to manage tasks** (add, remove, pause, modify, check status)

4. **System should track task states** (pending, running, completed, failed)

5. **System should retry failed tasks** with exponential backoff (max 3-5 retries)

6. **Users should be notified of task failures** (email, push, dashboard alerts)


### Non-Functional Requirements


| Requirement | Target |
|-------------|--------|
| **Concurrency** | 10,000 concurrent tasks at peak |
| **Throughput** | 100,000 tasks/day (~70 tasks/min) |
| **Latency** | < 1 second from scheduled time to execution start |
| **Precision** | 1-second granularity, millisecond-level accuracy |
| **Availability** | High availability, horizontal scaling |
| **Reliability** | No missed tasks; at-least-once execution, with idempotency keys so receivers can discard duplicates |


### Out of Scope


- Task dependencies (task B runs after task A completes)

- Priority queues for tasks

- Geographic distribution


---


## Phase 2: Core Entities (~2 min)


```mermaid

erDiagram

User {

uuid id PK

string email

json notification_preferences

string api_key

}

Task {

uuid id PK

uuid user_id FK

string name

enum type "ONE_TIME | RECURRING"

json payload "webhook URL, parameters"

timestamp scheduled_at "for one-time"

string cron_expression "for recurring"

timestamp end_date "optional"

int max_executions "optional"

enum status "PENDING | SCHEDULED | QUEUED | RUNNING | COMPLETED | FAILED | PAUSED"

int retry_count

int partition_id

timestamp created_at

timestamp updated_at

}

TaskExecution {

uuid id PK

uuid task_id FK

timestamp scheduled_time

timestamp started_at

timestamp completed_at

enum status "SUCCESS | FAILED | RETRYING"

string error_message

int attempt_number

}

TaskDLQ {

uuid id PK

uuid task_id FK

uuid user_id

string webhook_url

json payload

json attempts "all attempt errors"

string final_error

timestamp created_at

timestamp resolved_at

string resolution "retried | archived"

}

User ||--o{ Task : "creates"

Task ||--o{ TaskExecution : "has"

Task ||--o| TaskDLQ : "fails to"

```


---


## Phase 3: API Design (~5 min)


### Protocol Choice: REST


REST is appropriate here - we have clear CRUD operations on resources (tasks).


### Endpoints


```

POST /v1/tasks

Headers: Authorization: Bearer <token>, Idempotency-Key: <uuid>

Body: {

"name": "Daily Report",

"type": "RECURRING",

"payload": {

"webhook_url": "https://api.example.com/reports",

"method": "POST",

"body": { "report_type": "daily" }

},

"cron_expression": "0 8 * * *",

"end_date": "2026-12-31T00:00:00Z"

}

Response: 201 Created

{

"id": "task-uuid",

"status": "SCHEDULED",

"next_execution": "2026-02-10T08:00:00Z"

}


GET /v1/tasks

Query: ?status=SCHEDULED&page=1&limit=20

Response: { "tasks": [...], "pagination": {...} }


GET /v1/tasks/{task_id}

Response: { "id": "...", "status": "...", "executions": [...] }


PATCH /v1/tasks/{task_id}

Body: { "status": "PAUSED" } or { "cron_expression": "0 9 * * *" }

Response: 200 OK


DELETE /v1/tasks/{task_id}

Response: 204 No Content


GET /v1/tasks/{task_id}/executions

Query: ?status=FAILED&limit=10

Response: { "executions": [...] }

```


### Idempotency


All state-changing operations require an `Idempotency-Key` header so that network retries do not create duplicate tasks.
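A minimal in-memory sketch of how the Task API could honor `Idempotency-Key`: the first request for a given (user, key) pair runs the create logic, and any retry with the same pair gets the cached response back instead of creating a second task. `IdempotencyStore`, `Response`, and `Do` are hypothetical names; a real deployment would persist keys (for example in PostgreSQL with a unique constraint, or in Redis with a TTL) so that every API replica sees them.

```go
package main

import "sync"

// Response is the cached result of the first request for a key (hypothetical type).
type Response struct {
	Status int
	Body   string
}

// idemKey scopes idempotency keys per user so clients cannot collide.
type idemKey struct{ user, key string }

type entry struct {
	once sync.Once
	resp Response
}

// IdempotencyStore is a hypothetical in-memory store; it never evicts entries.
type IdempotencyStore struct {
	mu      sync.Mutex
	entries map[idemKey]*entry
}

func NewIdempotencyStore() *IdempotencyStore {
	return &IdempotencyStore{entries: make(map[idemKey]*entry)}
}

// Do runs create at most once per (userID, key) and replays its response afterwards.
func (s *IdempotencyStore) Do(userID, key string, create func() Response) Response {
	k := idemKey{userID, key}
	s.mu.Lock()
	e, ok := s.entries[k]
	if !ok {
		e = &entry{}
		s.entries[k] = e
	}
	s.mu.Unlock()
	// sync.Once also makes a concurrent duplicate wait until the first call finishes.
	e.once.Do(func() { e.resp = create() })
	return e.resp
}
```

Keys are usually retained only for a bounded window (24 hours is a common choice) rather than forever as in this sketch.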


---


## Phase 4: Capacity Estimation (~3 min)


### Given

- 100,000 tasks/day

- 10,000 concurrent tasks at peak

- Each task record ~1KB


### Calculations


```

Tasks per second (avg): 100,000 / 86,400 ≈ 1.2 tasks/sec

Tasks per second (peak 10x): ~12 tasks/sec


Storage for active tasks: 10,000 × 1KB = 10MB

Storage for 1 year history: 100,000 × 365 × 1KB = 36.5GB


Task executions (assuming 2 executions avg per task):

- 200,000 execution records/day

- 73M records/year × 0.5KB = 36.5GB


Total storage (1 year): ~75GB

```


### Implications

- Scale is manageable with a single database with read replicas

- Need a composite index on `(partition_id, status, scheduled_at)` to serve the schedulers' due-task query

- Consider partitioning execution history by date for cleanup


---


## Phase 5: High-Level Design (~10 min)


### Architecture Overview


```mermaid

flowchart TD

C["Clients - Web UI / Mobile / API"] --> AG["API Gateway - Auth, Rate Limiting, LB"]

AG --> API

subgraph TaskSchedulerService["Task Scheduler Service"]

API["Task API - HTTP"]

S1["Scheduler 1"]

S2["Scheduler N"]

SU["Status Updater - Kafka Consumer"]

tasks[(tasks)]

executions[(task_executions)]

dlq[(task_dlq)]

API --> tasks

API --> executions

S1 --> tasks

S2 --> tasks

SU --> tasks

SU --> executions

SU --> dlq

end

subgraph Redis["Redis - shared infrastructure"]

R1["scheduler:*"]

R2["queue:*"]

R3["worker:*"]

end

S1 --> Redis

S2 --> Redis

subgraph Workers["Worker Pool"]

W1["W1"]

W2["W2"]

WN["WN"]

end

Redis --> W1

Redis --> W2

Redis --> WN

W1 --> WH["External Webhooks"]

W2 --> WH

WN --> WH

W1 --> K["Kafka"]

W2 --> K

WN --> K

K --> SU

K --> NS["Notification Service"]

```


**Key Points:**

- Task API, Schedulers, and Status Updater are **components of the same service** (share PostgreSQL)

- **Workers are separate** - only interact with Redis and Kafka, no database access

- **Status Updater** consumes Kafka events and writes execution results to PostgreSQL

- **Redis coordinates partition assignment** (`scheduler:active` sorted set, scored by each scheduler's last heartbeat)

- Schedulers dynamically discover each other and rebalance partitions

- Fixed 256 partitions, distributed among active schedulers

- Redis holds only near-term tasks (due within the next 60 seconds), not the full task set, which keeps its memory footprint low


**Key Flow:**


```mermaid

sequenceDiagram

participant Client

participant TaskAPI

participant PostgreSQL

participant Scheduler

participant Redis

participant Worker

participant Webhook

participant Kafka

participant StatusUpdater


Client->>TaskAPI: POST /v1/tasks

TaskAPI->>PostgreSQL: INSERT (status=SCHEDULED)

loop Every 1 second

Scheduler->>PostgreSQL: SELECT due tasks (my partitions)

PostgreSQL-->>Scheduler: tasks[]

Note over Scheduler,Redis: Order Matters: data first, queue second

Scheduler->>Redis: HSET queue:task_data:{id} (payload)

Scheduler->>Redis: ZADD queue:due_tasks (task_id)

Scheduler->>PostgreSQL: UPDATE status=QUEUED

end


Worker->>Redis: atomic Lua claim (ZRANGEBYSCORE + ZREM queue:due_tasks, HSET queue:processing)

Redis-->>Worker: task_id

Worker->>Redis: HGETALL queue:task_data:{id}

Redis-->>Worker: payload

Worker->>Webhook: HTTP POST

Webhook-->>Worker: 200 OK

Worker->>Redis: HDEL queue:processing

Worker->>Redis: DEL queue:task_data:{id}

Worker->>Kafka: publish task.completed

Note over Kafka,StatusUpdater: Async: Status Updater in Task Scheduler Service

Kafka-->>StatusUpdater: consume event

StatusUpdater->>PostgreSQL: INSERT task_executions

StatusUpdater->>PostgreSQL: UPDATE tasks status=COMPLETED

```


### Component Responsibilities


**Task Scheduler Service** (single service, multiple components):


| Component | Responsibilities |
|-----------|------------------|
| **Task API** | CRUD operations, validates cron expressions, writes to PostgreSQL, calculates partition_id |
| **Schedulers** | Register in Redis, heartbeat, dynamic partition assignment, poll DB for due tasks, push to Redis, requeue expired tasks |
| **Status Updater** | Kafka consumer, writes execution results to task_executions table, updates task status |


**Why Status Updater is inside Task Scheduler Service:**

- Workers don't have PostgreSQL access (only Redis + Kafka)

- Task Scheduler Service owns the `tasks` and `task_executions` tables

- Same bounded context - execution results belong to task management


**Status Updater Flow:**

```go

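// Consume applies execution events to PostgreSQL. Returning an error stops the
// loop before the offset is committed, so the event is redelivered on restart.
func (su *StatusUpdater) Consume(ctx context.Context) error {
	for msg := range su.kafka.Messages() {
		var event ExecutionEvent
		if err := json.Unmarshal(msg.Value, &event); err != nil {
			log.Warn("dropping malformed event", "err", err)
			continue
		}
		if err := su.apply(ctx, event); err != nil {
			return fmt.Errorf("apply event for task %s: %w", event.TaskID, err)
		}
		su.kafka.Commit(msg) // hypothetical client call: commit only after the DB write
	}
	return nil
}

// apply records the execution and updates the task in one transaction.
func (su *StatusUpdater) apply(ctx context.Context, e ExecutionEvent) error {
	tx, err := su.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op after a successful Commit
	if _, err := tx.ExecContext(ctx, `
		INSERT INTO task_executions
			(task_id, status, completed_at, error_message, attempt_number)
		VALUES ($1, $2, $3, $4, $5)`,
		e.TaskID, e.Status, e.CompletedAt, e.Error, e.Attempt); err != nil {
		return err
	}
	// Execution statuses (SUCCESS/FAILED) are not task statuses; taskStatusFor is a
	// hypothetical mapper (e.g. SUCCESS -> COMPLETED, or SCHEDULED for recurring tasks).
	if _, err := tx.ExecContext(ctx,
		`UPDATE tasks SET status = $1, updated_at = NOW() WHERE id = $2`,
		taskStatusFor(e), e.TaskID); err != nil {
		return err
	}
	return tx.Commit()
}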
```


**Workers** (separate service, no database access):

- Pull tasks from Redis when due (Lua script)

- Get task payload from Redis hash

- Execute webhooks with an HTTP timeout shorter than the 30s visibility timeout

- Handle retries with exponential backoff

- Publish execution events to Kafka


**Notification Service:**

- Consumes execution events from Kafka

- Sends failure notifications via email/push

- Manages user notification preferences


### Redis Data Storage Strategy


**Problem:** Workers need task payload (webhook URL, parameters) but don't access PostgreSQL.


**Solution:** Scheduler pushes both queue entry and task data to Redis.


| Redis Key | Type | Owner | Purpose |
|-----------|------|-------|---------|
| `scheduler:active` | Sorted Set | Scheduler | Scheduler membership (score = last heartbeat timestamp) |
| `queue:due_tasks` | Sorted Set | Shared | Task timing (score = timestamp, member = task_id) |
| `queue:task_data:{id}` | Hash | Shared | Task payload (webhook_url, payload, retry_count, scheduled_at) |
| `queue:processing` | Hash | Shared | Visibility timeout tracking (task_id → deadline) |
| `worker:executed:{id}:{scheduled_ts}` | String | Worker | Per-occurrence completion marker for at-least-once dedup (SETNX after success) |


**Key Naming Convention:**

- `scheduler:*` - Used only by Scheduler (internal coordination)

- `queue:*` - Shared contract between Scheduler and Workers

- `worker:*` - Used only by Workers (internal state)


**Order Matters Pattern (Sync Mitigation):**


Write data first, queue second. This ensures a task is never queued without its data.


```go

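func (s *Scheduler) pushToRedis(ctx context.Context, task Task) error {
	dataKey := "queue:task_data:" + task.ID
	// 1. Data first: workers must never see a queue entry without its payload.
	if err := s.rdb.HSet(ctx, dataKey, map[string]interface{}{
		"webhook_url":  task.WebhookURL,
		"payload":      task.Payload,
		"retry_count":  task.RetryCount,
		"scheduled_at": task.ScheduledAt.Unix(), // lets workers build a per-occurrence execution ID
	}).Err(); err != nil {
		return fmt.Errorf("failed to write task data: %w", err)
	}
	// The TTL cleans up orphaned data if we crash before the ZADD below.
	if err := s.rdb.Expire(ctx, dataKey, 24*time.Hour).Err(); err != nil {
		s.rdb.Del(ctx, dataKey) // avoid leaving data with no TTL
		return fmt.Errorf("failed to set task data TTL: %w", err)
	}
	// 2. Queue second: score = scheduled time in Unix seconds.
	if err := s.rdb.ZAdd(ctx, "queue:due_tasks", &redis.Z{
		Score:  float64(task.ScheduledAt.Unix()),
		Member: task.ID,
	}).Err(); err != nil {
		s.rdb.Del(ctx, dataKey) // best effort; the TTL covers a failed delete
		return fmt.Errorf("failed to queue task: %w", err)
	}
	return nil
}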
```


**Failure Modes:**


| Failure | State | Impact | Recovery |
|---------|-------|--------|----------|
| HSET fails | Nothing written | None | Retry |
| ZADD fails | Data exists, not queued | Orphaned data | TTL cleanup (24h); task is still SCHEDULED in PostgreSQL, so the next poll re-pushes it |
| Crash between HSET and ZADD | Data exists, not queued | Orphaned data | TTL cleanup; next poll re-pushes the task |


**Worker Claims with Data:**


```go

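func (w *Worker) claimTask(ctx context.Context) (*Task, error) {
	// Atomic Lua claim: moves one due task_id from queue:due_tasks to queue:processing.
	taskID := w.claimTaskID(ctx)
	if taskID == "" {
		return nil, nil // nothing due
	}
	data, err := w.rdb.HGetAll(ctx, "queue:task_data:"+taskID).Result()
	if err != nil {
		// Transient Redis error: keep the claim so the visibility timeout requeues it.
		return nil, err
	}
	if len(data) == 0 {
		// Data expired or was never written: drop the claim instead of executing blindly.
		log.Warn("task data missing", "task_id", taskID)
		w.rdb.HDel(ctx, "queue:processing", taskID)
		return nil, nil
	}
	return parseTask(taskID, data), nil
}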
```


### Data Flow: Scheduling a One-Time Task


```mermaid

%%{init: {'flowchart': {'rankSpacing': 30}}}%%

flowchart TD

subgraph Creation["1. Task Creation"]

direction LR

C[Client] -->|POST /v1/tasks| API[Task API]

API -->|INSERT| PG[(PostgreSQL)]

end

subgraph Scheduling["2. Scheduling (every 1s)"]

direction LR

PG2[(PostgreSQL)] -->|poll due tasks| S[Scheduler]

S -->|1.HSET task_data| R[(Redis)]

S -->|2.ZADD queue:due_tasks| R

end

subgraph Execution["3. Execution"]

direction LR

R2[(Redis queue:due_tasks)] -->|claim task_id| W[Worker]

R3[(Redis task_data)] -->|HGETALL| W

W -->|HTTP POST| WH[Webhook]

end

subgraph Completion["4. Completion (async)"]

direction LR

W2[Worker] -->|DEL task_data| R4[(Redis)]

W2 -->|publish event| K[Kafka]

end

subgraph StatusUpdate["5. Status Update (Task Scheduler Service)"]

direction LR

K2[Kafka] -->|consume| SU[Status Updater]

SU -->|INSERT task_executions| PG3[(PostgreSQL)]

SU -->|UPDATE tasks| PG3

end

Creation --> Scheduling --> Execution --> Completion --> StatusUpdate

```


### Data Flow: Recurring Task


```mermaid

%%{init: {'flowchart': {'rankSpacing': 30}}}%%

flowchart TD

subgraph Worker

W[Execute webhook] --> PUB[Publish task.completed to Kafka]

end

subgraph StatusUpdater["Status Updater (Task Scheduler Service)"]

K[Consume from Kafka] --> INSERT[INSERT task_executions]

INSERT --> CRON[Parse cron_expression]

CRON --> CHECK{past end_date or<br/>max_executions reached?}

CHECK -->|No| UPDATE[UPDATE tasks<br/>scheduled_at = next_run<br/>status = SCHEDULED]

CHECK -->|Yes| DONE[UPDATE tasks<br/>status = COMPLETED]

end

PUB --> K

```


**Note:** Worker only publishes events. Status Updater handles:

- Recording execution in `task_executions`

- Calculating next run from `cron_expression`

- Updating `scheduled_at` for the next execution


---


## Phase 6: Deep Dives (~15 min)


### 6.1 Resilience & Failure Handling


**What happens if a Scheduler crashes?**

- The crashed scheduler stops renewing its heartbeat; after 30s its entry in `scheduler:active` is stale and gets pruned

- Other schedulers detect the membership change

- Partitions are automatically rebalanced among remaining schedulers

- Kubernetes restarts the failed scheduler

- New instance registers and triggers another rebalance

- Idempotent operations - re-polling same tasks is safe (status check prevents duplicates)


**Dynamic Partition Assignment (Redis-based):**

```go

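func (s *Scheduler) Run(ctx context.Context) {
	// 1. Register: ZADD scheduler:active <now> <pod>. A sorted set scored by last
	//    heartbeat is used because members of a plain set cannot expire individually.
	s.rdb.ZAdd(ctx, "scheduler:active", &redis.Z{Score: float64(time.Now().Unix()), Member: s.podName})
	go s.heartbeat(ctx) // re-ZADD every 10s; entries older than 30s are pruned
	// 2. Watch for scheduler changes and rebalance
	go s.watchAndRebalance(ctx)
	// 3. Start polling assigned partitions
	s.poll(ctx)
}

func (s *Scheduler) rebalance(schedulers []string) {
	sort.Strings(schedulers)
	myIndex := indexOf(schedulers, s.podName)
	partitions := []int{}
	// -1 means our own heartbeat lapsed: own nothing until we re-register.
	if myIndex >= 0 {
		for p := 0; p < 256; p++ { // fixed 256 partitions, distributed by index
			if p%len(schedulers) == myIndex {
				partitions = append(partitions, p)
			}
		}
	}
	s.mu.Lock() // s.mu: assumed mutex, since poll() reads s.partitions concurrently
	s.partitions = partitions
	s.mu.Unlock()
}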
```


**Rebalancing Example:**


```mermaid

%%{init: {'flowchart': {'rankSpacing': 40}}}%%

flowchart TD

subgraph Before["Before: 3 schedulers (256 partitions)"]

direction LR

S0["scheduler-0<br/>86 partitions"]

S1["scheduler-1<br/>85 partitions"]

S2["scheduler-2<br/>85 partitions"]

end


Before --> X["scheduler-0 dies<br/>heartbeat expires"]


subgraph After["After: 2 schedulers"]

direction LR

S1b["scheduler-1 index 0<br/>128 partitions"]

S2b["scheduler-2 index 1<br/>128 partitions"]

end


X --> After

```
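The modulo rule in `rebalance()` can be checked directly. This sketch uses a hypothetical `assign` helper that computes the full partition map for a given member list: with 256 partitions and three schedulers, the scheduler at index 0 receives 86 partitions and the other two 85 each, and with two survivors each gets 128.

```go
package main

import "sort"

const numPartitions = 256

// assign (hypothetical helper) mirrors rebalance(): partition p belongs to
// the scheduler whose index in the sorted member list equals p % len(members).
func assign(members []string) map[string][]int {
	sorted := append([]string(nil), members...)
	sort.Strings(sorted)
	out := make(map[string][]int, len(sorted))
	if len(sorted) == 0 {
		return out // no live schedulers: nothing is owned
	}
	for p := 0; p < numPartitions; p++ {
		owner := sorted[p%len(sorted)]
		out[owner] = append(out[owner], p)
	}
	return out
}
```

Note that a membership change reassigns most partitions, not only the departed scheduler's share; this is harmless here because re-pushing a task to Redis is idempotent.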


**What happens if Worker crashes mid-execution?**

- Task remains in Redis `queue:processing` hash with timeout

- Visibility timeout: if not ACK'd in 30s, scheduler requeues the task

- Delivery is at-least-once, so webhook receivers must be idempotent


**Visibility Timeout Flow:**


```mermaid

sequenceDiagram

participant W as Worker

participant R as Redis

participant S as Scheduler

participant WH as Webhook


W->>R: Lua claim (ZRANGEBYSCORE + ZREM + HSET queue:processing, deadline = now + 30s)

R-->>W: task_id

W->>WH: Execute webhook

alt Success

WH-->>W: 200 OK

W->>R: HDEL queue:processing

Note over W: Publish task.completed to Kafka

else Worker crashes

Note over R: Task stays in queue:processing

loop Every 5 seconds

S->>R: HGETALL queue:processing

R-->>S: task with timeout

alt timeout_at < now

S->>R: HDEL queue:processing

S->>R: ZADD queue:due_tasks (requeue only if HDEL returned 1)

end

end

end

```


**Retry Strategy:**


```mermaid

%%{init: {'flowchart': {'rankSpacing': 20}}}%%

flowchart TD

A1[Attempt 1 - Immediate] --> A2[Attempt 2 - 10s]

A2 --> A3[Attempt 3 - 30s]

A3 --> A4[Attempt 4 - 90s]

A4 --> A5[Attempt 5 - 270s]

A5 --> DLQ[FAILED → DLQ → Notify]

```


**Circuit Breaker for External Webhooks:**

- Track failure rate per webhook domain

- If >50% failures in 1 minute, open circuit

- Reject new executions for that domain for 30 seconds

- Half-open: allow 1 test request, close if successful


### 6.2 Consistency & At-Least-Once Execution


**Problem:** How do we ensure webhook delivery without losing tasks?


**Solution: Redis-based At-Least-Once Delivery**


Mark execution in Redis AFTER sending webhook successfully. If crash occurs before marking, we retry (prefer duplicate over loss).


```mermaid

flowchart TD

A[Worker claims task] --> B[Generate execution_id]

B --> C{Executed key exists?}

C -->|Key exists| SKIP[Skip - already completed]

C -->|Key not exists| D[Send webhook]

D --> E{Success?}

E -->|Yes| F[SETNX to mark complete]

F --> G[Publish to Kafka]

E -->|No| H[Increment retry_count]

H --> I{retries < max?}

I -->|Yes| J[Requeue with backoff, same execution_id]

I -->|No| K[Send to DLQ]

```


**Implementation:**


```go

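func (w *Worker) execute(ctx context.Context, task Task) error {
	// One ID per scheduled occurrence: retries of the same run share it, while
	// the next run of a recurring task gets a fresh key instead of being skipped.
	executionID := fmt.Sprintf("%s:%d", task.ID, task.ScheduledAt.Unix())
	doneKey := "worker:executed:" + executionID
	exists, err := w.rdb.Exists(ctx, doneKey).Result()
	if err != nil {
		return err
	}
	if exists > 0 {
		log.Info("execution already completed, skipping", "id", executionID)
		return nil
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, task.WebhookURL, bytes.NewReader(task.Payload))
	if err != nil {
		return fmt.Errorf("build webhook request: %w", err)
	}
	req.Header.Set("X-Idempotency-Key", executionID)
	resp, err := w.client.Do(req) // client timeout should stay below the 30s visibility timeout
	if err != nil {
		return fmt.Errorf("webhook request failed: %w", err)
	}
	resp.Body.Close()
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return fmt.Errorf("webhook returned status %d", resp.StatusCode) // retried, then DLQ
	}
	// Mark only after success: a crash before this line causes a duplicate, never a loss.
	if err := w.rdb.SetNX(ctx, doneKey, "1", 24*time.Hour).Err(); err != nil {
		log.Warn("failed to mark execution", "id", executionID, "err", err)
	}
	w.kafka.Publish("task.completed", ...)
	return nil
}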
```


**How It Works:**


| Event | execution_id (`{task_id}:{scheduled_unix}`) | Behavior |
|-------|--------------|----------|
| First attempt | task-123:1770710400 | Key not set: send webhook, mark complete |
| Crash after send, before mark | task-123:1770710400 | Key not set: retry sends a duplicate |
| Webhook fails | task-123:1770710400 | No key set: same execution_id retries after backoff |
| Success after retry | task-123:1770710400 | Key set: later redeliveries skip |
| Next daily occurrence | task-123:1770796800 | New key: executes normally |


**Trade-off:**


| Aspect | At-Least-Once (our choice) | At-Most-Once |
|--------|----------------------------|--------------|
| Duplicates | Possible (crash after send) | Prevented |
| Data loss | No | Possible |
| Redis dependency | Required | Required |
| Receiver requirement | Must be idempotent | No requirement |


**Why At-Least-Once?**

- Data loss is unacceptable (missed notifications, lost orders)

- Duplicates can be handled by idempotent receivers (using X-Idempotency-Key)

- Common practice for webhook delivery: providers such as Stripe document that endpoints may receive the same event more than once


### 6.3 Scalability


**Horizontal Scaling:**


| Component | Scaling Strategy |
|-----------|------------------|
| Task Scheduler Service (API + Schedulers + Status Updater) | Scale together as one deployment, Redis coordinates partition rebalancing |
| Workers | Scale independently, add more workers, Redis handles distribution |
| PostgreSQL | Primary for writes, read replicas optional |
| Redis | Cluster mode with partitioned queues if needed |


**Trade-off: Coupled Scaling**


Task API, Schedulers, and Status Updater scale together because they're in the same service:


| Pros | Cons |
|------|------|
| Simpler deployment (one artifact) | Can't scale API independently from Schedulers |
| Shared PostgreSQL connection pool | If API needs 10 replicas but Scheduler needs 3, you get 10 of both |
| Same bounded context | Resource waste if workloads differ significantly |


**Mitigation:** For most workloads, this is acceptable. If API load vastly exceeds scheduling load, consider splitting into separate services later.


**Dynamic Scheduler Scaling:**


```mermaid

%%{init: {'flowchart': {'rankSpacing': 30}}}%%

flowchart LR

subgraph ScaleUp["Scale Up: --replicas=5"]

direction TB

B3["3 schedulers"] --> A5["5 schedulers"]

A5 --> R1["256/5 = 51-52 each"]

end


ScaleUp ~~~ ScaleDown


subgraph ScaleDown["Scale Down: --replicas=2"]

direction TB

B5["Stop heartbeat"] --> TTL["TTL expires"]

TTL --> R2["256/2 = 128 each"]

end

```


**Scheduler Partition Query:**

```go

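func (s *Scheduler) poll(ctx context.Context) {
	ticker := time.NewTicker(time.Second) // poll every 1 second
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		s.mu.Lock() // snapshot: rebalance() may replace s.partitions at any time
		partitions := append([]int(nil), s.partitions...)
		s.mu.Unlock()
		horizon := time.Now().Add(60 * time.Second) // 60s look-ahead window
		for _, partition := range partitions {
			rows, err := s.db.QueryContext(ctx, `
				SELECT id, scheduled_at, payload
				FROM tasks
				WHERE status = 'SCHEDULED'
				  AND scheduled_at < $1
				  AND partition_id = $2
				ORDER BY scheduled_at
				LIMIT 100`, horizon, partition)
			if err != nil {
				log.Warn("poll failed", "partition", partition, "err", err)
				continue
			}
			// Push to Redis...
			rows.Close()
		}
	}
}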
```


**Notes:**

- Fixed 256 partitions (never changes, allows scaling to 256 schedulers)

- `partition_id` is pre-calculated on task creation: `hash(user_id) % 256`

- Each scheduler handles `partitions where p % activeSchedulers == myIndex`
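A sketch of how the Task API might compute `partition_id` at creation time. FNV-1a from Go's standard library is an assumed choice of hash and `partitionFor` is a hypothetical helper; any stable hash works, provided every API replica uses the same one, since the value is persisted and never recomputed.

```go
package main

import "hash/fnv"

const numPartitions = 256 // fixed; changing it would require re-partitioning stored tasks

// partitionFor (hypothetical helper) maps a user ID to one of the 256 partitions.
func partitionFor(userID string) int {
	h := fnv.New32a()
	h.Write([]byte(userID)) // hash.Hash writes never return an error
	return int(h.Sum32() % numPartitions)
}
```

Because the input is `user_id`, all of one user's tasks share a partition, so a user with an unusually large number of tasks concentrates that load on a single scheduler.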


**Benefits:**

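- No duplicate scheduling in steady state (each partition owned by exactly one scheduler); during a rebalance two schedulers may briefly poll the same partition, which is harmless because ZADD and HSET are idempotent

- Fully dynamic scaling (no manual configuration)

- Automatic rebalancing on failure or scaling

- Near-linear throughput scaling (more schedulers = more capacity) until PostgreSQL becomes the bottleneck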


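**Handling Thundering Herd (many tasks at same time):**

Jitter trades precision for smoother load: a 5-second spread exceeds the < 1 second latency target, so it should apply only to tasks that can tolerate the delay.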


```mermaid

%%{init: {'flowchart': {'rankSpacing': 30}}}%%

flowchart TD

subgraph Problem["Problem"]

direction LR

P["1000 tasks at 12:00:00.000"]

end


Problem --> J["Apply Jitter<br/>scheduled_at + random(0-5000ms)"]


subgraph Solution["Result: Spread over 5s"]

direction LR

T1["12:00:00.123"]

T2["12:00:02.456"]

T3["12:00:04.789"]

end


J --> Solution

```


### 6.4 Observability


**Key Metrics:**


| Metric | Description | Alert Threshold |
|--------|-------------|-----------------|
| tasks_scheduled_total | Tasks created | - |
| tasks_executed_total | Successful executions | - |
| tasks_failed_total | Failed executions | >1% failure rate |
| execution_delay_seconds | scheduled_at vs actual start | >5s p99 |
| queue_depth | Tasks waiting in Redis | >1000 |
| worker_utilization | % workers busy | >80% |


**SLOs:**

- 99.9% of tasks execute within 5 seconds of scheduled time

- 99.99% task execution success rate (excluding external failures)

- 99.9% API availability


**Distributed Tracing:**

- Trace ID propagated from API → Scheduler → Worker → Webhook

- Allows debugging end-to-end execution path


**Dead Letter Queue (DLQ):**


Tasks that fail all retries are moved to the DLQ for manual inspection and resolution.


```mermaid

%%{init: {'flowchart': {'rankSpacing': 25}}}%%

flowchart LR

W[Worker - all retries failed] -->|publish| K[Kafka task.failed]

K --> SU[Status Updater]

SU --> T[(UPDATE tasks<br/>status=FAILED)]

SU --> DLQ[(INSERT task_dlq)]

SU --> ALERT[Kafka task.dlq]

ALERT --> NS[Notification Service]

```


**DLQ Table:**

```sql

CREATE TABLE task_dlq (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    task_id     UUID REFERENCES tasks(id),
    user_id     UUID,
    webhook_url TEXT,
    payload     JSONB,
    attempts    JSONB,
    final_error TEXT,
    created_at  TIMESTAMP DEFAULT NOW(),
    resolved_at TIMESTAMP,
    resolution  TEXT
);
```


**DLQ Operations:**


| Operation | How |
|-----------|-----|
| **View failed** | `SELECT * FROM task_dlq WHERE resolved_at IS NULL` |
| **Retry** | Reset the task to status=SCHEDULED (retry_count=0, scheduled_at=NOW()), mark the DLQ entry resolved |
| **Archive** | Set resolution='archived', resolved_at=NOW() |


**Retry from DLQ:**

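```go
func (api *TaskAPI) RetryFromDLQ(ctx context.Context, dlqID string) error {
	tx, err := api.db.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit succeeds
	// Resolve first, and only if still unresolved, so a double click cannot retry twice.
	res, err := tx.ExecContext(ctx, `
		UPDATE task_dlq SET resolved_at = NOW(), resolution = 'retried'
		WHERE id = $1 AND resolved_at IS NULL`, dlqID)
	if err != nil {
		return err
	}
	if n, err := res.RowsAffected(); err != nil {
		return err
	} else if n == 0 {
		return fmt.Errorf("dlq entry %s not found or already resolved", dlqID)
	}
	if _, err := tx.ExecContext(ctx, `
		UPDATE tasks
		SET status = 'SCHEDULED', retry_count = 0, scheduled_at = NOW()
		WHERE id = (SELECT task_id FROM task_dlq WHERE id = $1)`, dlqID); err != nil {
		return err
	}
	return tx.Commit()
}
```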


---


## Trade-off Decisions


| Decision | Alternatives | Trade-off | Justification |
|----------|--------------|-----------|---------------|
| Single service (API + Scheduler + Status Updater) | Separate microservices | Coupled scaling vs simplicity | Same bounded context, shared DB, one deployment; accept scaling together |
| Redis for due queue | Database polling only | Memory cost vs latency | Redis sorted sets give O(log N + M) time-range retrieval with sub-ms latency |
| Redis only stores 60s window | Store all tasks in Redis | Memory vs completeness | Keeps Redis memory low; DB is the source of truth |
| Sorted Set + Hash (Order Matters) | Full JSON in set, Lua script, API call | Extra Redis call vs sync safety | Prevents queued-without-data state, TTL handles orphans, simpler than Lua |
| At-least-once delivery | At-most-once | Duplicates vs data loss | Data loss unacceptable; receivers must be idempotent (X-Idempotency-Key) |
| PostgreSQL for tasks | NoSQL (DynamoDB) | Flexibility vs scale | ~75GB/year is small; need complex queries on tasks |
| Partitioned schedulers | Single leader | Complexity vs throughput | Scales horizontally with no single-node bottleneck |
| Redis for coordination | ZooKeeper | Simplicity vs robustness | Already have Redis, avoids extra infrastructure |
| Fixed 256 partitions | Dynamic partition count | Simplicity vs flexibility | Over-partition upfront avoids migration, scales to 256 schedulers |
| Kafka for events | Direct DB writes from workers | Complexity vs decoupling | Enables async notifications, workers stay lightweight |


---


## Potential Improvements (if time permits)


1. **Priority queues** - Use multiple Redis sorted sets for different priorities

2. **Task dependencies** - DAG execution engine

3. **Geographic distribution** - Cell-based architecture for multi-region

4. **Batch execution** - Group similar tasks for efficiency

5. **Rate limiting per user** - Prevent single user from overwhelming system