System requirements


Functional:

  • Users should be able to schedule a task
  • Schedules must be configurable.
  • We assume we are tasked with designing a distributed task scheduler.
  • We assume a task is a binary and that our systems have operators that can execute these binaries.
  • Each task must run exactly once per scheduled execution.
  • Users must be able to delete tasks
  • Users must be able to list tasks



Non-Functional:

  • Since this is a distributed system, we choose AP under the CAP theorem: network partitions are unavoidable, so we trade strong consistency for high availability. We do, however, want the system to be eventually consistent.
  • A slight delay in scheduling and execution is acceptable (say, a few hundred milliseconds); however, every run is guaranteed to happen. This is why an eventual consistency model fits our needs.
  • We want our SLA to allow less than an hour of downtime per year, i.e. 99.99% availability (about 52.6 minutes of downtime per year). We will derive this from the SLIs of all the components in the design.
  • High scalability and fault tolerance.
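The SLA arithmetic can be checked with a small sketch. It assumes that the components on a request path fail independently and that every one of them must be up, so their SLIs multiply; the per-component numbers below are illustrative, not measured.

```python
from math import prod

MINUTES_PER_YEAR = 365 * 24 * 60  # 525,600; leap years ignored


def downtime_minutes_per_year(availability: float) -> float:
    """Minutes per year a service may be down and still meet `availability`."""
    return (1.0 - availability) * MINUTES_PER_YEAR


def serial_availability(component_slis: list[float]) -> float:
    """Availability of a path where every component must be up.

    Assumes independent failures, so the availabilities multiply.
    """
    return prod(component_slis)


print(f"99.99% allows {downtime_minutes_per_year(0.9999):.1f} min/year of downtime")
# Three components at 99.99% each already fall short of 99.99% overall.
print(f"{serial_availability([0.9999] * 3):.4%}")
# Three components at 99.999% each leave headroom for the 99.99% target.
print(f"{serial_availability([0.99999] * 3):.4%}")
```

The first line prints about 52.6 minutes. The point of the last two lines is that each component in a serial chain needs a better SLI than the end-to-end target.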



Capacity estimation

  • Assuming 10 billion tasks per day
  • 10 billion / 86,400 s ≈ 115,700 tasks per second (roughly 120,000)
  • Since every scheduled task is a write, our system is very write-heavy.
  • Assuming 1 KB of information per task: 10 billion × 1 KB = 10^13 bytes = 10 TB per day, i.e. 3.65 PB per year, i.e. 36.5 PB over 10 years
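The estimates above can be reproduced directly. This sketch uses decimal (SI) units throughout (1 KB = 10^3 bytes, 1 TB = 10^12 bytes, 1 PB = 10^15 bytes); binary units such as 2^40 bytes (1 TiB, about 1.1 × 10^12 bytes) would give slightly different figures.

```python
TASKS_PER_DAY = 10_000_000_000  # 10 billion tasks/day (assumption above)
BYTES_PER_TASK = 1_000          # 1 KB per task (assumption above)
SECONDS_PER_DAY = 86_400

tasks_per_second = TASKS_PER_DAY / SECONDS_PER_DAY  # ~115,741 writes/s
bytes_per_day = TASKS_PER_DAY * BYTES_PER_TASK      # 10**13 bytes
tb_per_day = bytes_per_day / 10**12                 # 10 TB
pb_per_year = bytes_per_day * 365 / 10**15          # 3.65 PB
pb_per_decade = pb_per_year * 10                    # 36.5 PB

print(f"{tasks_per_second:,.0f} tasks/s")
print(f"{tb_per_day:.0f} TB/day, {pb_per_year:.2f} PB/year, "
      f"{pb_per_decade:.1f} PB/10 years")
```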



API design

POST: /api/v1/schedule

inputs:

  • user_id, required
  • task_blob, required
  • schedule, required
  • is_recurring, required

outputs:

  • task_id


GET: /api/v1/?task=task_id

inputs:

  • user_id, required
  • task_id, required

outputs:

  • task_id
  • task_status
  • last_executed
  • next_execution


GET: /api/v1/tasks (list)

inputs:

  • user_id, required

outputs:

  • task_id
  • task_status
  • last_executed
  • next_execution


DELETE: /api/v1/?task=task_id

inputs:

  • user_id, required
  • task_id, required

outputs:

  • success or error
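As an illustration of the POST contract, here is a minimal sketch of how the Task Manager might validate the schedule request before persisting it. `ScheduleRequest`, `parse_schedule_request` and `handle_schedule` are hypothetical names; the schedule format is not specified in this design, so it is kept as an opaque string, and `task_blob` is assumed to have already been decoded to bytes by the HTTP layer.

```python
import uuid
from dataclasses import dataclass

REQUIRED_FIELDS = ("user_id", "task_blob", "schedule", "is_recurring")


@dataclass(frozen=True)
class ScheduleRequest:
    """Typed body of POST /api/v1/schedule (field names from the API above)."""
    user_id: str
    task_blob: bytes     # the binary to run; assumed already decoded to bytes
    schedule: str        # schedule format is not specified; kept opaque here
    is_recurring: bool


def parse_schedule_request(body: dict) -> ScheduleRequest:
    """Validate the raw request body; raise ValueError on bad input."""
    missing = [name for name in REQUIRED_FIELDS if name not in body]
    if missing:
        raise ValueError(f"missing required fields: {', '.join(missing)}")
    if not isinstance(body["task_blob"], (bytes, bytearray)):
        raise ValueError("task_blob must be binary")
    if not isinstance(body["is_recurring"], bool):
        raise ValueError("is_recurring must be a boolean")
    return ScheduleRequest(
        user_id=str(body["user_id"]),
        task_blob=bytes(body["task_blob"]),
        schedule=str(body["schedule"]),
        is_recurring=body["is_recurring"],
    )


def handle_schedule(body: dict) -> dict:
    """Hypothetical handler: validate, assign a task_id, return the output."""
    request = parse_schedule_request(body)
    task_id = str(uuid.uuid4())
    # A real Task Manager would now persist `request` under `task_id`
    # in the Task Management table before responding.
    return {"task_id": task_id}
```

Rejecting malformed requests at the Task Manager keeps invalid rows out of the Task Management table, so the scheduler never has to handle them.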



Database design

Since our system is write-heavy and we expect about 10 TB of new data per day, we will use a wide-column database, a class of databases that scales horizontally and generally handles high-load, write-heavy workloads well.


We will index on task_id and task_status, and assume the following schema:


Task table
  • String task_id PRIMARY KEY NOT NULL
  • String schedule NOT NULL
  • Blob task NOT NULL
  • String task_status NOT NULL
  • Timestamp<tz> last_executed NOT NULL
  • Timestamp<tz> next_execution

Task Management table
  • String task_id PRIMARY KEY NOT NULL
  • String schedule NOT NULL
  • Blob task NOT NULL
  • Boolean is_recurring NOT NULL


It is important to emphasise that we need time-zone-aware timestamps to execute each task at the correct time in the user's time zone.
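A minimal sketch of why time-zone awareness matters: naive timestamps are rejected, and aware ones are normalised to UTC so that due-time checks compare instants rather than wall-clock readings. The fixed UTC offsets keep the sketch dependency-free; this is an assumption for illustration, and a real deployment would use IANA zones (for example via Python's `zoneinfo`) so that daylight-saving transitions are handled. `to_utc` and `is_due` are hypothetical helpers.

```python
from datetime import datetime, timedelta, timezone


def to_utc(ts: datetime) -> datetime:
    """Normalise a time-zone-aware timestamp to UTC for storage/comparison.

    Naive timestamps are rejected: without a zone we cannot know which
    09:00 the user meant.
    """
    if ts.tzinfo is None or ts.utcoffset() is None:
        raise ValueError("timestamp must be time-zone aware")
    return ts.astimezone(timezone.utc)


def is_due(next_execution: datetime, now: datetime) -> bool:
    """A task is due once `now` has reached its next_execution instant."""
    return to_utc(now) >= to_utc(next_execution)


# Fixed offsets for illustration only (no DST handling).
IST = timezone(timedelta(hours=5, minutes=30))
PST = timezone(timedelta(hours=-8))

run_at = datetime(2024, 1, 15, 9, 0, tzinfo=IST)  # 09:00 in the user's zone
print(to_utc(run_at).isoformat())  # 2024-01-15T03:30:00+00:00
```

A scheduler node whose local clock reads 19:30 PST on 14 January is at the same instant as 03:30 UTC on 15 January, so `is_due(run_at, now)` is true there even though the wall-clock readings differ.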



High-level design


  • Load Balancer: Our service sits behind a load balancer to ensure robust traffic management.
  • Task Manager: our primary component, which interacts with clients and updates the database and cache.
    • POST requests -> writes the task to the database (Task Management table), from where the Task Scheduler adds it to the queue
    • GET requests -> reads the task status from the cache (falling back to the database on a miss)
    • DELETE requests -> sets mark_for_deletion to true in the database
  • Cache: We cache the list of tasks to serve our list requests. Our cache expiration strategy is simple: we keep data for one day only, i.e. entries expire after 86,400 seconds.
  • Queue: A message queue to which we add tasks. The queue matters here because it lets us handle failures and re-process tasks.
  • Task Scheduler: adds tasks to the message queue when they are due.
  • Execution Service: consumes from the message queue and uses the leader-follower pattern to execute tasks on various nodes. When execution starts, we mark task_status as In_Progress; after execution, we update the status to Success or Error.
    • The leader manages replication and coordinates the tasks.
    • Each follower reads from the database and schedules tasks.
  • Database: Manages the state of each task and remains the single source of truth




Request flows

POST

Client -> Task Manager -> adds the task to the "Task Management" table -> Task Scheduler picks up the task and adds it to the message queue -> Execution Service consumes the task; the leader coordinates with a worker to execute it, then adds/updates task_status in the "Task" table and writes it to the cache.
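The POST flow can be sketched in a single process: a scheduler keeps pending tasks in a min-heap keyed on next_execution and moves due tasks, earliest first, onto a queue; an executor drains the queue and records the status in a "Task" table and the cache. This is a sketch under stated assumptions: a `deque` stands in for the message queue, dicts stand in for the Task table and cache, `Scheduler`, `PendingTask` and `run_executor` are hypothetical names, and leader/follower coordination is left out.

```python
import heapq
from collections import deque
from dataclasses import dataclass, field


@dataclass(order=True)
class PendingTask:
    next_execution: float               # epoch seconds (UTC); the sort key
    task_id: str = field(compare=False)


class Scheduler:
    """Hypothetical Task Scheduler: hands due tasks to the queue, earliest first."""

    def __init__(self, queue: deque):
        self._heap: list[PendingTask] = []  # min-heap ordered by next_execution
        self._queue = queue

    def add(self, task_id: str, next_execution: float) -> None:
        heapq.heappush(self._heap, PendingTask(next_execution, task_id))

    def poll(self, now: float) -> int:
        """Move every task due at `now` onto the queue; return how many moved."""
        moved = 0
        while self._heap and self._heap[0].next_execution <= now:
            self._queue.append(heapq.heappop(self._heap).task_id)
            moved += 1
        return moved


def run_executor(queue: deque, task_table: dict, cache: dict, run) -> None:
    """Drain the queue: mark In_Progress, run the task, record Success/Error."""
    while queue:
        task_id = queue.popleft()
        task_table[task_id] = "In_Progress"
        task_table[task_id] = "Success" if run(task_id) else "Error"
        cache[task_id] = task_table[task_id]  # keep GET/LIST reads fresh


queue: deque = deque()          # stands in for the message queue
scheduler = Scheduler(queue)
scheduler.add("b", 200.0)
scheduler.add("a", 100.0)
scheduler.add("c", 900.0)
moved = scheduler.poll(now=300.0)   # "a" then "b"; "c" is not due yet
task_table: dict = {}               # stands in for the "Task" table
cache: dict = {}
run_executor(queue, task_table, cache, run=lambda task_id: task_id != "b")
print(moved, task_table)  # 2 {'a': 'Success', 'b': 'Error'}
```

Ordering the heap by next_execution means the task that is due soonest is always the next one handed to the queue.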


DELETE

Client -> Task Manager -> tries to delete the task from the "Task Management" table. If the task is already in progress (i.e. it is already in the queue), we return an error.
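The DELETE flow is a check-then-act operation: "is the task in progress?" and "delete it" must happen atomically, or the scheduler could enqueue the task between the check and the delete. The sketch below uses an in-memory store guarded by one lock; `TaskManagementStore` and its `in_progress` flag are hypothetical, and in the real database the same effect would need a conditional (compare-and-set) delete or a row lock. The ownership check against `user_id` is an assumption based on `user_id` being a required DELETE input.

```python
import threading


class TaskManagementStore:
    """In-memory stand-in for the Task Management table (hypothetical)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._tasks: dict[str, dict] = {}  # task_id -> {"user_id", "in_progress"}

    def add(self, task_id: str, user_id: str) -> None:
        with self._lock:
            self._tasks[task_id] = {"user_id": user_id, "in_progress": False}

    def mark_in_progress(self, task_id: str) -> None:
        """Called when the scheduler hands the task to the queue."""
        with self._lock:
            self._tasks[task_id]["in_progress"] = True

    def delete(self, task_id: str, user_id: str) -> str:
        """Delete only if the caller owns the task and it is not yet queued.

        The check and the delete run under the same lock that
        mark_in_progress uses, so the two cannot interleave.
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if task is None or task["user_id"] != user_id:
                return "error: task not found"
            if task["in_progress"]:
                return "error: task already in progress"
            del self._tasks[task_id]
            return "success"
```

Usage: after `add("t1", "u1")`, calling `delete("t1", "u1")` returns `"success"`; if the scheduler had already called `mark_in_progress("t1")`, it would return the in-progress error instead.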


GET

Client -> Task Manager -> reads through the cache and returns the entry on a hit; on a miss, it reads the task from the "Task" table.
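The GET path is a read-through (cache-aside) lookup combined with the one-day expiry used for the cache. In this minimal sketch, `TTLCache` and `get_task_status` are hypothetical names, a dict stands in for the cache service, and an injectable clock lets the example fast-forward one day. Expired entries are evicted lazily on read.

```python
import time

CACHE_TTL_SECONDS = 86_400  # one day, matching the cache expiration strategy


class TTLCache:
    """Tiny in-memory cache whose entries expire `ttl` seconds after insert."""

    def __init__(self, ttl: float, clock=time.monotonic):
        self._ttl = ttl
        self._clock = clock
        self._entries: dict[str, tuple[float, object]] = {}

    def get(self, key: str):
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # lazily evict the expired entry
            return None
        return value

    def put(self, key: str, value) -> None:
        self._entries[key] = (self._clock() + self._ttl, value)


def get_task_status(task_id: str, cache: TTLCache, load_from_db):
    """Serve from the cache on a hit; on a miss, read the Task table and cache it."""
    status = cache.get(task_id)
    if status is None:
        status = load_from_db(task_id)
        if status is not None:
            cache.put(task_id, status)
    return status


# Usage with a fake clock and a fake "Task" table.
now = [0.0]
cache = TTLCache(CACHE_TTL_SECONDS, clock=lambda: now[0])
db_reads = []


def load(task_id):
    db_reads.append(task_id)
    return {"t1": "Success"}.get(task_id)


get_task_status("t1", cache, load)  # miss: reads the database
get_task_status("t1", cache, load)  # hit: no database read
now[0] = CACHE_TTL_SECONDS          # one day later the entry has expired
get_task_status("t1", cache, load)  # miss again: reads the database
```

After these three calls the database has been read twice, once for the initial miss and once after expiry.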


LIST

Client -> Task Manager -> lists the cache. If the cache is empty, we return a message asking the client to try later; otherwise we would put too much load on our database, which is designed for write-heavy use cases.


Detailed component design

  • Task Scheduling
    • Handling Recurring Tasks:
      • Upon receiving a request, the Task Manager sets the is_recurring field in the Task Management table.
      • When the Task Scheduler reads the Task Management table and notices that `is_recurring` is set, it expands the schedule and writes the resulting executions back to the Task table.
    • Adding tasks to the queue
      • The scheduler adds tasks to the message queue sorted by next_execution in chronological order (earliest first), so that the tasks due soonest are executed first.
  • Execution Service
    • We use the Raft algorithm to implement the leader-follower pattern. Every node starts as a follower. If a follower does not hear from a leader before its election timeout expires, it becomes a candidate, votes for itself and requests votes from the other nodes; once a majority grants their votes, the candidate becomes the leader and may hold a lease. Whenever there is no leader, a follower steps up in this way to become the new leader.
    • The Execution Service's sole responsibility is to execute tasks and update their status in the Task table.
  • Conflict Resolution
    • In a distributed system like this, multiple followers may try to execute the same task. To avoid this, a follower acquires a lock on the task's record in the table and releases it when it is done, ensuring that each task is executed atomically by a single node.
    • Timestamp: We use time-zone-aware timestamps so that tasks run at the time users expect, and we keep the clocks of all nodes synchronised via NTP. This is a critical configuration: if synchronisation fails, tasks may execute at unexpected times.
  • Task Status
    • In_Progress - Execution service has read it from the Message queue
    • Success - Execution service has processed the task and task completed successfully
    • Error - Execution service has processed the task and task failed
    • Error Handling
      • The Execution Service retries failed tasks with exponential backoff. If a task still fails after all retries, a task-failed metric is published to our monitoring system.
  • Monitoring, Logging
    • Metrics
      • user_task_fail: notifies users that their task has failed even after retries
      • ntp_errors: reports the clock skews if any
      • leader_ids
      • follower_counts
      • messaged_in_queue: number of messages pending processing
      • latency: round-trip latency
      • throughput: tasks scheduled per second
    • Logging
      • Ideally, we would like to log all RPC calls at INFO level.
      • We log all errors.
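The conflict-resolution, status and error-handling rules above can be combined into one execution-worker sketch. It makes several assumptions: an in-memory `TaskTable` stands in for the database, a hypothetical initial status `Pending` is added because the design does not name one, and the row lock is modelled as an atomic status claim (Pending -> In_Progress) rather than a lock held for the whole run. Retries use exponential backoff with full jitter; the base, cap and retry count are illustrative. `TaskTable`, `backoff_delays` and `execute` are hypothetical names.

```python
import random
import threading
import time


class TaskTable:
    """In-memory stand-in for the Task table (hypothetical)."""

    def __init__(self, statuses: dict[str, str]):
        self._lock = threading.Lock()
        self.status = dict(statuses)  # task_id -> status

    def claim(self, task_id: str) -> bool:
        """Atomically move Pending -> In_Progress; at most one caller wins."""
        with self._lock:
            if self.status.get(task_id) != "Pending":
                return False
            self.status[task_id] = "In_Progress"
            return True

    def finish(self, task_id: str, ok: bool) -> None:
        with self._lock:
            self.status[task_id] = "Success" if ok else "Error"


def backoff_delays(max_retries: int, base: float = 0.5, cap: float = 30.0):
    """Full-jitter backoff: uniform in [0, min(cap, base * 2**attempt)]."""
    for attempt in range(max_retries):
        yield random.uniform(0, min(cap, base * 2 ** attempt))


def execute(task_id, table, run, metrics, max_retries=3, sleep=time.sleep):
    """Claim the task, run it with retries, record the final status."""
    if not table.claim(task_id):
        return None  # another node already claimed (or finished) this task
    ok = run(task_id)
    for delay in backoff_delays(max_retries):
        if ok:
            break
        sleep(delay)
        ok = run(task_id)
    table.finish(task_id, ok)
    if not ok:
        # Still failing after all retries: publish the user_task_fail metric.
        metrics["user_task_fail"] = metrics.get("user_task_fail", 0) + 1
    return table.status[task_id]


# Usage with hypothetical workloads: t1 fails twice then succeeds, t2 always fails.
attempts = {"t1": 0}


def flaky(task_id: str) -> bool:
    if task_id == "t1":
        attempts["t1"] += 1
        return attempts["t1"] >= 3
    return False


def no_sleep(_delay: float) -> None:
    """Skip real waiting in this demo."""


table = TaskTable({"t1": "Pending", "t2": "Pending"})
metrics: dict[str, int] = {}
r1 = execute("t1", table, flaky, metrics, sleep=no_sleep)  # "Success"
r2 = execute("t2", table, flaky, metrics, sleep=no_sleep)  # "Error"
r3 = execute("t1", table, flaky, metrics, sleep=no_sleep)  # None: already done
```

Because a claimed task is no longer `Pending`, a second node that picks up the same message (for example after a queue redelivery) backs off instead of running the binary again.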



Trade offs/Tech choices

  • Raft over ZooKeeper: We implement Raft for simplicity. ZooKeeper would have been robust; however, it would have added extra complexity, since we would need to carefully manage another distributed system.


  • We use the leader-follower pattern. We could use multi-leader or leaderless patterns; however, both allow conflicting writes, which would require additional logic to handle such conflicts.
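To show what choosing Raft involves, here is a sketch of just the vote-granting side of Raft's leader election: the RequestVote receiver rules and the majority check. Election timeouts, log replication, persistence and leases are deliberately left out, and `NodeState`, `handle_request_vote` and `wins_election` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class NodeState:
    current_term: int = 0
    voted_for: Optional[str] = None
    last_log_term: int = 0
    last_log_index: int = 0


def handle_request_vote(node: NodeState, candidate_id: str, term: int,
                        last_log_index: int, last_log_term: int) -> bool:
    """Raft RequestVote receiver rules (vote granting only)."""
    if term < node.current_term:
        return False  # stale candidate from an older term
    if term > node.current_term:
        node.current_term = term  # newer term: adopt it and forget old vote
        node.voted_for = None
    # The candidate's log must be at least as up to date as ours.
    log_ok = (last_log_term > node.last_log_term or
              (last_log_term == node.last_log_term and
               last_log_index >= node.last_log_index))
    if node.voted_for in (None, candidate_id) and log_ok:
        node.voted_for = candidate_id
        return True
    return False


def wins_election(votes_granted: int, cluster_size: int) -> bool:
    """A candidate (counting its own vote) needs a strict majority."""
    return votes_granted > cluster_size // 2


# Five-node cluster: candidate "n1" starts term 1 and votes for itself.
followers = [NodeState() for _ in range(4)]
granted = 1 + sum(handle_request_vote(f, "n1", term=1, last_log_index=0,
                                      last_log_term=0) for f in followers)
# A second candidate in the same term is refused by nodes that already voted.
second = handle_request_vote(followers[0], "n2", term=1, last_log_index=0,
                             last_log_term=0)
```

Each node grants at most one vote per term, so at most one candidate can reach a majority in a given term; this is the property that keeps the leader-follower pattern free of the competing-leader conflicts mentioned above.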


Failure scenarios/bottlenecks

  • Clock skew (e.g. an NTP failure) could cause catastrophic failures, such as tasks running early, late, or more than once. This is why it is important that we manage nodes with a configuration management system such as Ansible, so that all nodes share the same configuration and do not drift.
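Clock skew can be detected the way NTP measures it: from the four timestamps of one request/reply exchange with a reference server, which give the clock offset and the round-trip delay. This sketch flags nodes whose offset exceeds a threshold, which is the kind of signal the ntp_errors metric would report. The 100 ms threshold, the node names and the sample timestamps are illustrative assumptions.

```python
# Hypothetical alert threshold; the acceptable skew depends on how
# precisely tasks must fire.
MAX_SKEW_SECONDS = 0.1


def ntp_offset_and_delay(t0: float, t1: float, t2: float,
                         t3: float) -> tuple[float, float]:
    """Clock offset and round-trip delay from one NTP-style exchange.

    t0: request sent (local clock)    t1: request received (reference clock)
    t2: reply sent (reference clock)  t3: reply received (local clock)
    A positive offset means the local clock is behind the reference.
    """
    offset = ((t1 - t0) + (t2 - t3)) / 2
    delay = (t3 - t0) - (t2 - t1)
    return offset, delay


def skewed_nodes(samples: dict[str, tuple[float, float, float, float]]) -> list[str]:
    """Nodes whose measured offset exceeds the threshold (feeds ntp_errors)."""
    return sorted(
        node for node, (t0, t1, t2, t3) in samples.items()
        if abs(ntp_offset_and_delay(t0, t1, t2, t3)[0]) > MAX_SKEW_SECONDS
    )


# node-a's clock runs 0.5 s behind the reference; node-b is in sync.
samples = {
    "node-a": (99.5, 100.01, 100.011, 99.521),
    "node-b": (100.0, 100.01, 100.011, 100.021),
}
print(skewed_nodes(samples))  # ['node-a']
```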





Future improvements

  • Improving the List Tasks action: it currently lists only the tasks that have been processed (those in the cache); we may want a list of all submitted tasks.
  • Adding priorities to the tasks and scheduling/handling tasks as per the priority.