System requirements
Functional:
- Users can specify tasks to be scheduled. A task can be a piece of Python code or a custom container.
- Tasks are scheduled either to run immediately (as soon as possible) or on a recurring schedule.
- Users can monitor tasks, view task history and output, and optionally be notified when tasks complete.
- Out of scope:
  - Supporting different machine types (GPUs, etc.).
  - Supporting DAGs.
Non-Functional:
- The system is highly available (99.99%)
- Tasks are executed at least once (at-least-once semantics).
- Low scheduling latency when the system is not at full capacity: tasks start within 1 minute of their scheduled time (whether immediate or scheduled).
- The system is highly scalable and supports up to 100k concurrent tasks.
- Tasks are isolated from each other in terms of execution environment, security, etc. Task code should not be able to breach the security boundary assigned to it.
Capacity estimation
- Assume tasks run for 10 minutes on average. By Little's law, 100k concurrent tasks / 10 min = 100k / 600 s ≈ 167 task starts per second; round up to 200 tps.
- We need to store metadata for each execution; assume 1 KB each: 200 tps * 1 KB * 86,400 s ≈ 17 GB/day, call it 20 GB/day. Over one year that is 20 GB * 365 ≈ 7.3 TB, so about 22 TB in 3 years.
- Logs: assume 1 MB per task. 200 tps * 1 MB * 86,400 s ≈ 17 TB/day (call it 20 TB/day), or about 7 PB/year. We should not keep logs forever, and the logs we do keep should be stored in blob storage.
- Compute: assume each task needs 2 vCPUs and 500 MB of memory on average. For 100k concurrent tasks we need 100k * 2 = 200k vCPUs and 100k * 500 MB = 50 TB of memory. CPU appears to be the bottleneck. With a vCPU:logical CPU ratio of 4:1, that is 50k logical CPUs, or roughly 1k servers at ~50 logical CPUs per server.
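The estimates above can be recomputed from the stated assumptions. This is a minimal sketch: it uses decimal units and no intermediate rounding, so its outputs come out slightly below the rounded-up figures (200 tps, 20 GB/day). The ~50 logical CPUs per server is an assumption, not a measured number.

```python
# Back-of-envelope capacity estimate from the assumptions above.
# Decimal units (1 KB = 1000 bytes) keep the arithmetic readable.
KB, MB, GB, TB, PB = 10**3, 10**6, 10**9, 10**12, 10**15
SECONDS_PER_DAY = 86_400

concurrent_tasks = 100_000
avg_task_seconds = 10 * 60

# Little's law: concurrency = arrival rate * duration, so rate = concurrency / duration.
start_rate = concurrent_tasks / avg_task_seconds  # ~167 task starts per second
tps = 200  # rounded up for headroom

metadata_per_day = tps * 1 * KB * SECONDS_PER_DAY  # 1 KB of metadata per execution
logs_per_day = tps * 1 * MB * SECONDS_PER_DAY      # 1 MB of logs per execution

vcpus = concurrent_tasks * 2                # 2 vCPUs per task
memory_bytes = concurrent_tasks * 500 * MB  # 500 MB per task
logical_cpus = vcpus / 4                    # 4:1 vCPU:logical CPU ratio
servers = logical_cpus / 50                 # assumption: ~50 logical CPUs per server

print(f"task starts/s: {start_rate:.0f} (planned as {tps})")
print(f"metadata: {metadata_per_day / GB:.1f} GB/day, "
      f"{metadata_per_day * 365 / TB:.1f} TB/year, "
      f"{metadata_per_day * 365 * 3 / TB:.1f} TB/3 years")
print(f"logs: {logs_per_day / TB:.1f} TB/day, {logs_per_day * 365 / PB:.1f} PB/year")
print(f"compute: {vcpus:,} vCPUs, {memory_bytes / TB:.0f} TB RAM, "
      f"{logical_cpus:,.0f} logical CPUs, ~{servers:,.0f} servers")
```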
API design
// Create task (with optional schedule)
POST /tasks -> Task
{
id, //output
name,
description,
schedule, // optional, crontab format
definition, // oneof url to python code in blob storage, or container image
parameters, // key-> value
createTime,
owner, // FK -> User table
status, // output
}
// List, get, pause, resume, delete
GET /tasks -> Task[]
GET /tasks/
PATCH /tasks/
DELETE /tasks/
// Get execution status
GET /tasks/
GET /tasks/
{
id, //output
taskId, // FK -> Task table
scheduledTime,
startTime,
lastUpdateTime,
endTime,
status,
errorCode,
errorDetails,
logUrl,
}
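The `definition` field is a "oneof": exactly one of a code URL or a container image. A minimal sketch of server-side validation for the create request follows; the field names `code_url` and `container_image` are hypothetical, and the schedule check only counts the five crontab fields rather than fully parsing them. Output-only fields (`id`, `status`, `createTime`) are omitted because the server assigns them.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class TaskDefinition:
    # Hypothetical representation of the "oneof" definition: set exactly one.
    code_url: Optional[str] = None         # URL to Python code in blob storage
    container_image: Optional[str] = None  # custom container image


@dataclass
class CreateTaskRequest:
    name: str
    definition: TaskDefinition
    description: str = ""
    schedule: Optional[str] = None  # optional, crontab format
    parameters: dict[str, str] = field(default_factory=dict)


def validate(req: CreateTaskRequest) -> list[str]:
    """Return a list of validation errors (empty if the request is valid)."""
    errors = []
    if not req.name:
        errors.append("name is required")
    d = req.definition
    # Both None or both set violates the oneof constraint.
    if (d.code_url is None) == (d.container_image is None):
        errors.append("definition must set exactly one of code_url or container_image")
    if req.schedule is not None and len(req.schedule.split()) != 5:
        errors.append("schedule must be a 5-field crontab expression")
    return errors
```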
Database design
Task table:
- id (PK)
- name
- description
- schedule, // optional, crontab format
- definition, // oneof url to python code in blob storage, or container image
- parameters, // key-> value
- createTime,
- owner, // FK -> User table
- status, // created | active | completed | deleted
Execution table:
- id
- taskId, // FK -> Task table
- scheduledTime,
- startTime,
- lastUpdateTime,
- endTime,
- status, // created | running | success | failed | etc.
- errorCode,
- errorDetails,
- logUrl,
Given the metadata size estimate, we choose a relational database sharded by taskId, so a task and all of its executions live on the same shard. Say we create 4 shards; each shard then holds about a quarter of the 3-year metadata estimate.
A KV store like DynamoDB would also work well here.
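Routing by taskId needs a hash that is stable across processes. A minimal sketch, assuming simple modulo placement over the 4 shards (`shard_for` is a hypothetical helper); Python's built-in `hash()` is randomized per process for strings, so it is not usable here.

```python
import hashlib

NUM_SHARDS = 4  # from the estimate above


def shard_for(task_id: str, num_shards: int = NUM_SHARDS) -> int:
    """Map a taskId to a shard index using a stable hash (SHA-256)."""
    digest = hashlib.sha256(task_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


# Executions are routed with their taskId, so they land on the task's shard.
task_shard = shard_for("task-123")
execution_shard = shard_for("task-123")
```

Modulo placement moves most keys when the shard count changes; consistent hashing or a shard directory table would make later resharding cheaper.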
High-level design
API Gateway: responsible for load balancing, SSL termination, rate limiting, etc.
Task Service: responsible for implementing the management endpoints for tasks and executions
Database: stores the metadata for tasks and executions
Task Queue: a queue that reliably and durably stores task executions
Execution Service: responsible for pulling work off the task queue and executing the workload
Container Runtime: runtime (e.g. k8s) for containers
Log: blob storage for logs
Request flows
Tasks with immediate execution:
- Client sends a request to the Task Service to create a task.
- Task Service persists the task definition in the database, then enqueues a message with the taskId into the task queue.
- Execution Service picks up the message, creates a container, and runs the task. It periodically checks the container's execution status and updates the execution status in the Execution table.
- The container writes logs to the provided blob URL (e.g. with GcsFuse-like technology that mounts the bucket as a file system).
- When the container finishes, Execution Service detects it and marks the execution complete.
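The immediate-execution flow can be sketched end to end with in-memory stand-ins. Everything here is hypothetical: a dict plays the database, `queue.Queue` plays the task queue, and `FakeRuntime` plays Kubernetes. Note that the two writes in `create_task` are not atomic; the detailed design below addresses that.

```python
import itertools
import queue
from dataclasses import dataclass


@dataclass
class Execution:
    id: str
    task_id: str
    status: str  # created | running | success | failed
    log_url: str


class FakeRuntime:
    """In-memory stand-in for a container runtime such as Kubernetes."""

    def __init__(self):
        self.containers = {}

    def start(self, execution_id, definition, log_url):
        # A real runtime would start a container that writes logs to log_url.
        self.containers[execution_id] = "running"

    def finish(self, execution_id, ok=True):
        # Test hook: simulate the container exiting.
        self.containers[execution_id] = "success" if ok else "failed"

    def status(self, execution_id):
        return self.containers[execution_id]


class TaskService:
    def __init__(self, db, task_queue):
        self.db, self.queue = db, task_queue
        self._ids = itertools.count(1)

    def create_task(self, definition):
        task_id = f"task-{next(self._ids)}"
        self.db["tasks"][task_id] = definition  # 1. persist the task definition
        self.queue.put(task_id)                 # 2. enqueue (not atomic with step 1)
        return task_id


class ExecutionService:
    def __init__(self, db, task_queue, runtime):
        self.db, self.queue, self.runtime = db, task_queue, runtime
        self._ids = itertools.count(1)

    def poll_once(self):
        """Take one message off the queue and start its container."""
        task_id = self.queue.get_nowait()
        exec_id = f"exec-{next(self._ids)}"
        log_url = f"blob://logs/{exec_id}"  # hypothetical blob location
        self.db["executions"][exec_id] = Execution(exec_id, task_id, "running", log_url)
        self.runtime.start(exec_id, self.db["tasks"][task_id], log_url)
        return exec_id

    def refresh(self, exec_id):
        """Periodic check: copy the container status into the Execution table."""
        self.db["executions"][exec_id].status = self.runtime.status(exec_id)


db = {"tasks": {}, "executions": {}}
task_queue = queue.Queue()
runtime = FakeRuntime()
task_service = TaskService(db, task_queue)
execution_service = ExecutionService(db, task_queue, runtime)

task_id = task_service.create_task({"code_url": "blob://code/hello.py"})
exec_id = execution_service.poll_once()
runtime.finish(exec_id)
execution_service.refresh(exec_id)
print(db["executions"][exec_id])
```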
Scheduled tasks:
- We introduce a scheduling component that calculates a task's next run time from its schedule. Only a small change to the immediate-execution flow is then needed.
- We leverage the delayed-message capability of queueing systems (e.g. SQS or Spanner queue). When enqueuing a task message, we delay it by nextRunTime - currentTime.
- The Execution Service picks up the message at nextRunTime, assuming we have provisioned enough capacity in the queue and the Execution Service.
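Computing the delay needs the next run time of a crontab expression. Below is a minimal sketch under a simplified cron dialect (`*`, `*/n`, `a`, `a-b`, comma lists), found by a brute-force minute scan; unlike real cron it ANDs day-of-month and day-of-week, so a production system would use a full cron library. It also caps the delay, because queues typically limit it: SQS, for example, allows a message delay of at most 15 minutes. When a message arrives before its nextRunTime, the consumer re-enqueues it with the remaining delay. `MAX_QUEUE_DELAY`, `next_run_time`, and `enqueue_delay` are hypothetical names.

```python
from datetime import datetime, timedelta

MAX_QUEUE_DELAY = timedelta(minutes=15)  # e.g. the SQS per-message delay limit


def _field_matches(expr: str, value: int, minimum: int) -> bool:
    """Match one cron field; `minimum` is the field's lowest legal value."""
    for part in expr.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif "-" in part:
            lo, hi = map(int, part.split("-"))
            if lo <= value <= hi:
                return True
        elif int(part) == value:
            return True
    return False


def next_run_time(cron: str, after: datetime) -> datetime:
    """Return the first whole minute strictly after `after` matching `cron`."""
    minute, hour, dom, month, dow = cron.split()
    t = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    for _ in range(366 * 24 * 60):  # scan at most one year of minutes
        if (_field_matches(minute, t.minute, 0)
                and _field_matches(hour, t.hour, 0)
                and _field_matches(dom, t.day, 1)
                and _field_matches(month, t.month, 1)
                and _field_matches(dow, (t.weekday() + 1) % 7, 0)):  # cron: 0 = Sunday
            return t
        t += timedelta(minutes=1)
    raise ValueError(f"no run time within a year for {cron!r}")


def enqueue_delay(next_run: datetime, now: datetime) -> timedelta:
    """Delay to attach to the queued message, capped at the queue's maximum."""
    remaining = max(next_run - now, timedelta(0))
    return min(remaining, MAX_QUEUE_DELAY)
```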
Monitoring:
- Client sends a request to task service to retrieve all executions for a task.
- Client requests metadata for one execution to see details.
- Client reads the logs directly from blob storage to show them to the user.
Detailed component design
Task service and task queue
- When a task is created, we must make sure it is also enqueued in the task queue; otherwise a crash between the database write and the enqueue leaves a task that never runs.
- One way to do this is to leverage Spanner Queue, where the table update and the queue update can be placed in the same transaction. Alternatively, we can use CDC (change data capture): table changes are streamed into another queue, and a separate listener service consumes them and enqueues the task.
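A closely related technique that works on any relational database is the transactional outbox pattern: the Task Service writes an outbox row in the same transaction as the task row, and a relay (or a CDC reader on the outbox table) publishes it to the task queue. This is a minimal sketch of that pattern, not the Spanner Queue approach; `sqlite3` stands in for the sharded database, and the table and function names are hypothetical.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tasks (id TEXT PRIMARY KEY, definition TEXT NOT NULL);
CREATE TABLE outbox (seq INTEGER PRIMARY KEY AUTOINCREMENT,
                     task_id TEXT NOT NULL,
                     published INTEGER NOT NULL DEFAULT 0);
""")


def create_task(task_id: str, definition: dict) -> None:
    # The connection context manager commits both inserts or rolls back both.
    with conn:
        conn.execute("INSERT INTO tasks VALUES (?, ?)", (task_id, json.dumps(definition)))
        conn.execute("INSERT INTO outbox (task_id) VALUES (?)", (task_id,))


def relay_outbox(publish) -> int:
    """Publish unpublished outbox rows in order, then mark them published.

    A crash between publish() and the UPDATE re-publishes the row on the
    next run, so consumers must tolerate duplicates (at-least-once).
    """
    rows = conn.execute(
        "SELECT seq, task_id FROM outbox WHERE published = 0 ORDER BY seq").fetchall()
    for seq, task_id in rows:
        publish(task_id)
        with conn:
            conn.execute("UPDATE outbox SET published = 1 WHERE seq = ?", (seq,))
    return len(rows)
```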
Workloads:
- We should run workloads in containers for their isolation, resource management, and reusability.
- One option is to use Kubernetes: we schedule a pod for each execution.
- Workload definitions should support Python code for a good user experience. We store user code in blob storage and provide a Python base image (users don't need to be aware of it for simple use cases) that loads the user code inside the container and runs it. Alternatively, users can embed Python code in the task definition, and the Execution Service injects it into the container entry point.
- We should also support custom containers, because task configuration can be very complicated.
- To encourage reuse, we can support task templates, so that user can write the templates once, and parameterize it when running tasks.
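For the Kubernetes option, each execution becomes one Pod. The sketch below builds the manifest as a plain dict using standard Pod fields; the runner image, its entrypoint, and the `CODE_URL`/`PARAM_*` environment variables are hypothetical. With the official Kubernetes Python client, such a dict could be passed as the body to `CoreV1Api.create_namespaced_pod`.

```python
def build_pod_manifest(execution_id: str, task: dict) -> dict:
    """Build a Kubernetes Pod manifest (as a plain dict) for one execution.

    `task` uses hypothetical keys: "container_image" for custom containers,
    or "code_url" for Python code run by a hypothetical runner image.
    """
    env = [{"name": f"PARAM_{k.upper()}", "value": str(v)}
           for k, v in task.get("parameters", {}).items()]
    container = {"name": "task"}
    if "container_image" in task:
        container["image"] = task["container_image"]  # use the image's own entrypoint
    else:
        container["image"] = "registry.example.com/python-runner:latest"  # hypothetical
        container["command"] = ["python", "/runner/run.py"]              # hypothetical
        env.append({"name": "CODE_URL", "value": task["code_url"]})
    container["env"] = env
    # Requests/limits follow the per-task assumption from the capacity estimate.
    container["resources"] = {
        "requests": {"cpu": "2", "memory": "500M"},
        "limits": {"cpu": "2", "memory": "500M"},
    }
    # Part of the isolation boundary: no root user, no privilege escalation.
    container["securityContext"] = {"runAsNonRoot": True, "allowPrivilegeEscalation": False}
    return {
        "apiVersion": "v1",
        "kind": "Pod",
        # Deterministic name: a retried create for the same execution fails with
        # AlreadyExists instead of starting a second pod. execution_id is assumed
        # to be lowercase alphanumerics and dashes, as Pod names require.
        "metadata": {"name": f"exec-{execution_id}", "labels": {"app": "task-runner"}},
        "spec": {"restartPolicy": "Never", "containers": [container]},
    }


manifest = build_pod_manifest("42", {"code_url": "blob://code/job.py", "parameters": {"n": 3}})
```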
Scheduling
- We leverage delayed messaging for scheduling. We calculate the next run time of a task, and delay the queued message for an amount of time equal to nextRunTime - now.
- This simplifies the design (compared with a polling-based design).
- If a task runs too long, we should give users options to either stop scheduling new executions, or allow at most N executions of a given task to run in parallel.
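The two overlap options can be expressed as a small policy check made when a scheduled run comes due. This sketch interprets "stop scheduling new executions" as skipping a run while a previous execution is still running; the enum and function names are hypothetical.

```python
from enum import Enum


class OverlapPolicy(Enum):
    SKIP_WHILE_RUNNING = "skip"  # no new execution while a previous one is still running
    MAX_PARALLEL = "limit"       # at most max_parallel executions at the same time


def should_start(policy: OverlapPolicy, running: int, max_parallel: int = 1) -> bool:
    """Decide whether a due run of a task may start a new execution.

    `running` would come from counting the task's executions with status
    "running" in the Execution table.
    """
    if policy is OverlapPolicy.SKIP_WHILE_RUNNING:
        return running == 0
    return running < max_parallel
```

Counting and starting are separate steps, so two concurrent deliveries could both pass the check; a conditional write on the task's row would close that race. A skipped run should still enqueue the task's next run.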
Monitoring execution:
- To implement the polling loop that monitors containers, we leverage delayed messaging as well. Each time we check the container runtime, we update the execution state and enqueue a delayed message to check the container state again in 1 minute.
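The delayed-message monitoring loop can be simulated with an in-memory delayed queue. This is a sketch under stated assumptions: `DelayedQueue` is a hypothetical stand-in for SQS-style delayed delivery, time is a plain number of seconds, and the runtime's status is faked.

```python
import heapq
from dataclasses import dataclass, field

CHECK_INTERVAL_S = 60
TERMINAL_STATUSES = {"success", "failed"}


@dataclass(order=True)
class DelayedMessage:
    visible_at: float
    execution_id: str = field(compare=False)


class DelayedQueue:
    """In-memory stand-in for a queue with delayed delivery."""

    def __init__(self):
        self._heap = []

    def send(self, execution_id, delay_s, now):
        heapq.heappush(self._heap, DelayedMessage(now + delay_s, execution_id))

    def receive(self, now):
        """Return the earliest message whose delay has elapsed, else None."""
        if self._heap and self._heap[0].visible_at <= now:
            return heapq.heappop(self._heap)
        return None


def handle_check(msg, now, runtime_status, executions, q):
    """One iteration of the monitoring loop for a single execution."""
    status = runtime_status(msg.execution_id)
    executions[msg.execution_id] = {"status": status, "lastUpdateTime": now}
    if status not in TERMINAL_STATUSES:
        q.send(msg.execution_id, CHECK_INTERVAL_S, now)  # check again in 1 minute


# Simulated run: the container reports running, running, then success.
observed = iter(["running", "running", "success"])
executions = {}
q = DelayedQueue()
q.send("exec-1", 0, now=0)
for t in range(0, 241, 60):
    msg = q.receive(t)
    if msg is not None:
        handle_check(msg, t, lambda _id: next(observed), executions, q)
print(executions)
```

Under at-least-once delivery a duplicated check message would start a second polling chain; deduplicating check messages by execution id avoids that.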
Trade offs/Tech choices
Scheduling
- Alternatively, we can store the next run time in the database and have a polling loop pick up tasks that are ready to execute. This is a rather wasteful use of the database, and the polling interval is hard to tune: too frequent wastes database capacity, too infrequent adds scheduling latency. If we want tasks to start faster (e.g. within 5 seconds instead of 1 minute), polling is a bad choice.
At least once execution:
- Exactly once delivery is very hard to implement.
- At-least-once delivery is a good compromise. It requires every layer to be idempotent, which means:
  - API services (e.g. Task Service, K8s API) are idempotent when their client supplies the resource id.
  - The workload is idempotent (i.e. executing it multiple times has the same effect as executing it once).
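Client-supplied ids make redelivery harmless: if the id is derived deterministically from the message, a second delivery maps to the same resource and the create becomes a no-op (Kubernetes behaves this way, rejecting a second object with the same name as AlreadyExists). A minimal sketch; the `taskId@scheduledTime` id scheme and `IdempotentStore` are hypothetical.

```python
import threading


def execution_id(task_id: str, scheduled_time: str) -> str:
    # Hypothetical id scheme: a redelivered message for the same run maps to the same id.
    return f"{task_id}@{scheduled_time}"


class IdempotentStore:
    """Create-if-absent keyed by a client-supplied resource id."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = {}
        self.creates = 0

    def create(self, resource_id, make):
        """Return (resource, created). Repeated calls with the same id are no-ops."""
        with self._lock:
            if resource_id in self._items:
                return self._items[resource_id], False
            self._items[resource_id] = make()
            self.creates += 1
            return self._items[resource_id], True


store = IdempotentStore()
eid = execution_id("task-1", "2024-01-01T10:15:00Z")
# The same queue message delivered three times creates only one execution.
results = [store.create(eid, lambda: {"status": "created"})[1] for _ in range(3)]
print(results, store.creates)
```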
Failure scenarios/bottlenecks
Task service and execution service are stateless and can be horizontally scaled for high availability.
The database can be sharded by task id. As mentioned above, 4 shards can support the metadata storage needs for 3 years. We also estimate read/write QPS: assuming most reads/writes come from task monitoring (one status update per running task per minute), we have 100k / 1 min ≈ 1.7k, call it 2k QPS. This is well within the capabilities of a relational database, especially one that is already sharded.
Task queue: the estimate is likewise 100k / 1 min ≈ 2k messages per second. If we use a managed queue like SQS or Spanner queue, we don't need to worry about sharding. Even with Kafka, this is well within the limits of a single partition.
Data will grow and could exceed our system limits, so we implement a 1-year retention policy for all storage layers (database, queue, blob).
Future improvements