System requirements
Functional:
- Schedule simple tasks
- Schedule recurring tasks
- Schedule dependency-based tasks, which must wait for other tasks to complete first, so the jobs form a DAG. Once the jobs are entered, a separate API endpoint links their dependencies (for example, x depends on y); we then build the graph and schedule all the jobs based on the new time mentioned there. A cyclic graph is rejected with 400.
- Tracking status of jobs.
- Listing all historical jobs and scheduled jobs.
- Retries are configurable from the input JSON of the APIs; files are provided as a list of S3 links in the JSON, and the time to schedule is also given there.
- Tasks are scheduled first come, first served if all jobs have the same priority; otherwise they are ordered by a user-provided priority in the range 0 to 100. The priority can apply across the whole system or only at the user level, depending on the configuration JSON provided by the user.
- If a job fails, we retry it up to the max retries provided by the user. If the worker goes down, that does not count as a job failure and the retry count is not incremented.
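The retry rule in the last bullet can be sketched as follows. This is a minimal illustration under assumptions, not the scheduler's actual code: `Job`, `Outcome` and `record_outcome` are hypothetical names, and the status strings are borrowed from the status API.

```python
from dataclasses import dataclass
from enum import Enum


class Outcome(Enum):
    SUCCESS = "success"
    JOB_FAILED = "job_failed"    # the task itself exited with an error
    WORKER_LOST = "worker_lost"  # the worker went down; not the job's fault


@dataclass
class Job:
    job_id: str
    max_retries: int
    retries: int = 0
    status: str = "scheduled"


def record_outcome(job: Job, outcome: Outcome) -> Job:
    """Update status and retry count after one execution attempt."""
    if outcome is Outcome.SUCCESS:
        job.status = "completed"
    elif outcome is Outcome.WORKER_LOST:
        # Reschedule without charging the attempt against max_retries.
        job.status = "scheduled"
    elif job.retries < job.max_retries:
        job.retries += 1
        job.status = "scheduled"
    else:
        job.status = "failed"
    return job
```

With `max_retries=1`, a lost worker leaves `retries` at 0, the first real failure moves it to 1, and the second real failure marks the job as failed.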
Non-Functional:
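- Throughput: 864 million jobs per day, with a peak load of 10k jobs per second (864 million per day is what 10k per second sustained for 86,400 seconds adds up to).
- 1 KB max per task submitted through the API.
- 5 minutes per task in the worst case.
- <= 1 second latency
- Availability of three nines (99.9%) and an eventually consistent system.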
- Fault Tolerant & Reliable system.
- Secure system with authentication, authorization & full data security.
- Monitoring of server load, database load and API endpoint hits; logging of errors; alerting, with automated circuit breaking driven by the alerts.
- Horizontally scalable system with autoscaling configured on Kubernetes.
Capacity estimation
10^4 jobs per second
storage requirement = 10^4 * 1 KB = 10 MB per second = 0.864 TB per day = 315.36 TB per year = about 1.58 PB for 5 years.
So our system is high-throughput with a moderate storage requirement.
In case of unexpected load, we should provision for 2 times the QPS and double the storage requirement.
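The arithmetic above can be checked with a few lines of Python. This is only a worked version of the estimate, assuming decimal units (1 KB = 1,000 bytes, 1 TB = 10^12 bytes) and a 365-day year; the variable names are illustrative.

```python
JOBS_PER_SECOND = 10**4
TASK_SIZE_BYTES = 1_000      # 1 KB max per task (decimal units)
SECONDS_PER_DAY = 86_400
HEADROOM = 2                 # provision 2x for unexpected load

jobs_per_day = JOBS_PER_SECOND * SECONDS_PER_DAY
bytes_per_second = JOBS_PER_SECOND * TASK_SIZE_BYTES
tb_per_day = bytes_per_second * SECONDS_PER_DAY / 10**12
tb_per_year = tb_per_day * 365
pb_five_years = tb_per_year * 5 / 1_000

# Figures to provision for, with the 2x headroom applied.
provisioned_qps = JOBS_PER_SECOND * HEADROOM
provisioned_pb_five_years = pb_five_years * HEADROOM

print(f"{jobs_per_day:,} jobs/day, {tb_per_day:.3f} TB/day, "
      f"{tb_per_year:.2f} TB/year, {pb_five_years:.4f} PB over 5 years")
```

With the 2x headroom this comes to 20k jobs per second and roughly 3.15 PB over 5 years.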
API design
simple tasks api:
POST /v1/tasks/simple/
{
"command" : "python x.py --p1",
"s3_script_path": {"x.py":"www.s3.com/bucket-name/user_name/x.py"},
"max_retries": 5,
"schedule-time": "YYYY-MM-DD Hour: Min: TimeZone",
"schedule":"ASAP",
"priority": 20,
"priority-level": "global",
"idempotency_key":"sfafa324324"
}
returns: 202 Accepted
The request is processed asynchronously.
Provide either schedule-time or schedule.
priority-level can be local or global
400 if command, script, or both schedule-time and schedule are not mentioned
if priority is not mentioned, it defaults to 1
and priority-level defaults to local.
401 for unauthenticated
403 for unauthorized
500 for internal server error
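The validation rules for this endpoint can be sketched as below. This is a hedged illustration, not the real handler: `validate_simple_task` is a hypothetical helper, it treats "either schedule-time or schedule" as "at least one of the two", and it also enforces the 0 to 100 priority range and the mandatory idempotency key stated elsewhere in this document.

```python
from typing import Any

REQUIRED = ("command", "s3_script_path", "idempotency_key")


def validate_simple_task(body: dict[str, Any]) -> tuple[int, dict[str, Any]]:
    """Return (HTTP status, normalized task or error payload)."""
    missing = [key for key in REQUIRED if not body.get(key)]
    if not (body.get("schedule-time") or body.get("schedule")):
        missing.append("schedule-time or schedule")
    if missing:
        return 400, {"error": "missing fields", "fields": missing}

    task = dict(body)
    task.setdefault("priority", 1)            # default priority
    task.setdefault("priority-level", "local")  # default level
    if not 0 <= task["priority"] <= 100:
        return 400, {"error": "priority must be between 0 and 100"}
    if task["priority-level"] not in ("local", "global"):
        return 400, {"error": "priority-level must be local or global"}
    return 202, task
```

A request that passes validation gets 202 and is then processed asynchronously; the returned dictionary is the task with defaults filled in.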
recurring tasks api:
POST /v1/tasks/recurring/
input=
{
"command" : "python x.py --p1",
"s3_script_path": {"x.py":"www.s3.com/bucket-name/user_name/x.py"},
"max_retries": 5,
"schedule-time": "YYYY-MM-DD Hour: Min: TimeZone",
"interval": "YYYY-MM-DD Hour: Min",
"schedule":"ASAP",
"priority": 20,
"priority-level": "global",
"idempotency_key":"sfafa324324"
}
returns: 202 Accepted
{
"taskid": "23421"
}
The request is processed asynchronously.
Provide either schedule-time or schedule.
priority-level can be local or global
400 if command, script, or both schedule-time and schedule are not mentioned
if priority is not mentioned, it defaults to 1
and priority-level defaults to local.
interval is added to the base schedule time each time, and the task is scheduled again after that much time.
401 for unauthenticated
403 for unauthorized
500 for internal server error
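How the interval advances a recurring task can be sketched as follows. This is an assumption-laden example: `next_run` is a hypothetical helper, the interval is modelled as a `timedelta` rather than the string format shown in the request, and occurrences that were missed while the scheduler was behind are skipped rather than replayed.

```python
from datetime import datetime, timedelta, timezone


def next_run(base: datetime, interval: timedelta, now: datetime) -> datetime:
    """Return the first occurrence of base + k * interval that is after now."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    if now < base:
        return base  # the first run has not happened yet
    elapsed_periods = (now - base) // interval  # whole intervals already passed
    return base + (elapsed_periods + 1) * interval


base = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
now = datetime(2024, 1, 1, 2, 30, tzinfo=timezone.utc)
upcoming = next_run(base, timedelta(hours=1), now)
```

Here `upcoming` is 03:00 UTC on the same day, the next hourly slot after 02:30.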
DAG based Jobs:
Create new dag:
POST /v1/tasks/dag/
{
"max_retries": 5,
"priority": 20,
"priority-level": "global",
"idempotency_key":"sfafa324324"
}
returns: 200 OK
priority-level can be local or global
max_retries is 1 by default
400 if command or script is not mentioned
if priority is not mentioned, it defaults to 1
and priority-level defaults to local.
401 for unauthenticated
403 for unauthorized
500 for internal server error
The following API adds a job to a DAG:
POST /v1/tasks/dag/
{
"command" : "python x.py --p1",
"s3_script_path": {"x.py":"www.s3.com/bucket-name/user_name/x.py"},
"max_retries": 5,
"priority": 20,
"priority-level": "global",
"dag_id": "123",
"idempotency_key":"sfafa324324"
}
returns: 200 OK
priority-level can be local or global
max_retries is 1 by default
400 if command, script or dag_id is not mentioned
if priority is not mentioned, it defaults to 1
and priority-level defaults to local.
401 for unauthenticated
403 for unauthorized
500 for internal server error
The following API starts the DAG:
POST /v1/tasks/dag/
{
"command" : "start",
"dag_id": "1234",
"max_retries": 5,
"priority": 20,
"priority-level": "global",
"idempotency_key":"sfafa324324"
}
returns: 202 Accepted
max_retries is 1 by default
400 if command or script or dag_id is not mentioned
if priority is not mentioned, it defaults to 1
and priority-level defaults to local.
401 for unauthenticated
403 for unauthorized
500 for internal server error
The following API creates a dependency between two jobs:
POST /v1/tasks/dag/dependency/
{
"dependent_job_id" : "132",
"dependee_job_id" : "123",
"dag_id" : "1345325",
"idempotency_key":"sfafa324324"
}
returns: 200 OK
{
"id": "1345325"
}
This returns the DAG id.
max retry is 1 by default
400 if dependent_job_id, dependee_job_id or dag_id is not mentioned, or if the dependency would make the graph cyclic
if priority is not mentioned, it defaults to 1
and priority-level defaults to local.
401 for unauthenticated
403 for unauthorized
500 for internal server error
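Rejecting cyclic graphs with 400 when a dependency is added can be sketched as below. This is a minimal in-memory illustration, assuming edges are kept per DAG as "job depends on job"; `DependencyGraph` and its methods are hypothetical names, and a real implementation would read the edges from the dependency table.

```python
from collections import defaultdict


class DependencyGraph:
    """Edges of one DAG: dependent job -> set of jobs it depends on."""

    def __init__(self) -> None:
        self._depends_on: dict[str, set[str]] = defaultdict(set)

    def _reaches(self, start: str, target: str) -> bool:
        """Depth-first search: does start (transitively) depend on target?"""
        stack, seen = [start], set()
        while stack:
            node = stack.pop()
            if node == target:
                return True
            if node in seen:
                continue
            seen.add(node)
            stack.extend(self._depends_on.get(node, ()))
        return False

    def add_dependency(self, dependent: str, dependee: str) -> int:
        """Return the HTTP status: 200 if the edge is added, 400 on a cycle."""
        # The new edge closes a cycle if the dependee already depends on
        # the dependent (this also covers a job depending on itself).
        if self._reaches(dependee, dependent):
            return 400
        self._depends_on[dependent].add(dependee)
        return 200
```

For example, after "b depends on a" and "c depends on b", a request for "a depends on c" returns 400 and the graph is left unchanged.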
GET /v1/tasks/{id}
returns: 200
{
"status": "running",
"retries": "2"
}
status can be scheduled, running, completed, failed, retried
retries is the number of retries already attempted
Paginated API for listing jobs:
GET /v1/tasks/?page_num=2&pagesize=10
returns: 200
[
{
"job_id": "234",
"status": "running",
"retries": "2"
},
{
"job_id": "2324",
"status": "completed",
"retries": "2"
}....
]
All the APIs above carry a JWT in their headers for user authentication and authorization.
For all the POST APIs above, the idempotency key is mandatory; without it the API returns 400 Bad Request.
We can also set a rate limit on the number of requests allowed per minute.
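The mandatory idempotency key can be handled roughly as follows. This is a sketch under assumptions: `IdempotencyStore` is a hypothetical in-memory stand-in for what would be a shared store in practice, keys are scoped per user, and `create` is whatever function actually creates the task.

```python
from typing import Any, Callable

Response = tuple[int, dict[str, Any]]


class IdempotencyStore:
    """Remembers the response given for each (user, idempotency key)."""

    def __init__(self) -> None:
        self._responses: dict[tuple[str, str], Response] = {}

    def handle(
        self,
        user_id: str,
        body: dict[str, Any],
        create: Callable[[dict[str, Any]], Response],
    ) -> Response:
        key = body.get("idempotency_key")
        if not key:
            return 400, {"error": "idempotency_key is required"}
        cache_key = (user_id, key)
        if cache_key not in self._responses:
            # First time we see this key: do the work and remember the result.
            self._responses[cache_key] = create(body)
        # Replays get the stored response; create() is not called again.
        return self._responses[cache_key]
```

A client that retries a POST after a timeout therefore gets the same task id back instead of creating a duplicate job.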
Database design
We will have multiple tables:
- simple_jobs table
- recurring_jobs table
- dag_jobs table, for the main DAG job creation
- dag_job_tasks table, with the same shape as simple_jobs
- dag_job_dependency table, the edges table of the DAG
High-level design
I don't need table joins, but I need a database that can do small, frequent writes for job state updates, read a single entry, and list jobs. Our access pattern is also mostly key-value based.
Based on these requirements I will go for MongoDB as the database, partitioned by user id. Each shard will be replicated asynchronously to 2 read replicas, with the original shard acting as the master, so we will use a master-slave architecture. This makes our database fault tolerant.
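Partitioning by user id means every job of a user lands on the same shard. The idea can be sketched with a stable hash; this is only an illustration, since the database would normally do this routing itself, and `shard_for_user` and the shard count are hypothetical.

```python
import hashlib


def shard_for_user(user_id: str, num_shards: int) -> int:
    """Map a user id to a shard index using a hash that is stable across runs."""
    if num_shards <= 0:
        raise ValueError("num_shards must be positive")
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards
```

Python's built-in `hash()` is avoided here because it is randomized per process for strings, which would route the same user to different shards on different API servers.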
State changes and the DB entries for POST requests must be atomic, so we will perform these writes in transactions.
Apache Airflow will manage our DAGs for us; we simply have to invoke its API with the appropriate request.
Our API servers will serve the API requests and maintain a heap-based priority queue for priority-based tasks without any schedule. The heap contains only the job id and priority, so it does not take much space in memory. In fact, it is better to implement this via a Redis priority queue so that the data lives in a centralized queue.
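The heap described above, combined with the first come, first served rule for equal priorities, can be sketched with Python's `heapq`. This is an in-memory illustration only: `JobQueue` is a hypothetical name, and it assumes a larger number means higher priority, which the requirements do not state explicitly.

```python
import heapq
import itertools


class JobQueue:
    """Priority queue of job ids; ties are broken in arrival order."""

    def __init__(self) -> None:
        self._heap: list[tuple[int, int, str]] = []
        self._arrival = itertools.count()  # monotonically increasing sequence

    def push(self, job_id: str, priority: int = 1) -> None:
        # heapq is a min-heap, so the priority is negated to pop the
        # highest priority first; the arrival number keeps FCFS on ties.
        heapq.heappush(self._heap, (-priority, next(self._arrival), job_id))

    def pop(self) -> str:
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)


queue = JobQueue()
queue.push("a", priority=20)
queue.push("b", priority=50)
queue.push("c", priority=20)
order = [queue.pop() for _ in range(len(queue))]
```

Here `order` is `["b", "a", "c"]`: the priority-50 job runs first, and the two priority-20 jobs keep their arrival order.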
We can cache job lists in the API itself.
Kafka will be used for jobs that need to be scheduled; its workers (consumers) will pick up a task, complete it, update the status in the DB, and acknowledge the message to Kafka.
Kafka only needs a single topic with multiple partitions, and we do not need a partition key, so messages are written to the partitions in round-robin fashion.
The API server will first write to the DB when any new POST request comes in, then write to Kafka, Apache Airflow or Redis depending on the type of the task.
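The write path above (DB first, then the downstream system) can be sketched as below. This is one possible reading, not a confirmed mapping: it assumes DAG jobs go to Airflow, "ASAP" jobs go to the Redis priority queue, and time-scheduled jobs go to Kafka. `route` and `handle_post` are hypothetical names, and plain lists stand in for the real clients.

```python
from typing import Any

Task = dict[str, Any]


def route(task: Task) -> str:
    """Pick the downstream system for a task (assumed mapping)."""
    if "dag_id" in task:
        return "airflow"
    if task.get("schedule") == "ASAP":
        return "redis"
    return "kafka"


def handle_post(task: Task, db: list[Task], sinks: dict[str, list[Task]]) -> str:
    """Persist the task, then hand it to the system chosen by route()."""
    db.append(task)             # 1. write to the DB first
    target = route(task)
    sinks[target].append(task)  # 2. then enqueue downstream
    return target


db: list[Task] = []
sinks: dict[str, list[Task]] = {"airflow": [], "redis": [], "kafka": []}
handle_post({"command": "python x.py --p1", "schedule": "ASAP"}, db, sinks)
```

Writing to the DB before enqueueing means a crash between the two steps leaves a stored job that was never enqueued, which a periodic sweep could detect and enqueue again.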
Request flows
Explain how the request flows from end to end in your high level design. Also you could draw a sequence diagram using the diagramming tool to enhance your explanation...
Detailed component design
Dig deeper into 2-3 components and explain in detail how they work. For example, how well does each component scale? Any relevant algorithm or data structure you like to use for a component? Also you could draw a diagram using the diagramming tool to enhance your design...
Trade offs/Tech choices
Explain any trade offs you have made and why you made certain tech choices...
Failure scenarios/bottlenecks
Try to discuss as many failure scenarios/bottlenecks as possible.
Future improvements
What are some future improvements you would make? How would you mitigate the failure scenario(s) you described above?