System requirements


Functional:

  1. Schedule simple (one-off) tasks.
  2. Schedule recurring tasks.
  3. Schedule dependency-based tasks, where a task runs only after the tasks it depends on have completed, so the tasks form a DAG. Once the jobs are created, a separate API endpoint links dependencies between them (e.g. x depends on y); we then build the graph and schedule all the jobs accordingly. A dependency that would make the graph cyclic is rejected with 400.
  4. Track the status of jobs.
  5. List all historical and scheduled jobs.
  6. Retries are configurable per job in the input JSON of the APIs. Script files are provided as a list of S3 links in the JSON, and the time to schedule is given there as well.
  7. Jobs with the same priority are scheduled first come, first served; otherwise they are scheduled by a user-provided priority in the range 0 to 100. Depending on the configuration JSON provided by the user, this priority applies either across the whole system (global) or only among that user's own jobs (local).
  8. If a job fails, we retry it up to the max retries provided by the user. If the worker goes down, however, that is not a job failure and the retry count is not incremented.



Non-Functional:

  1. Throughput: 10k jobs per second, i.e. 864 million jobs per day (10^4 × 86,400 s).
  2. Payload: at most 1 KB per task submitted through the API.
  3. Execution time: at most 5 minutes per task in the worst case.
  4. Latency: at most 1 second.
  5. Availability: three nines (99.9%); the system is eventually consistent.
  6. Fault-tolerant and reliable.
  7. Secure: authentication, authorization, and full data security.
  8. Monitoring of server load, database load, and API endpoint hits; error logging and alerting, with automated circuit breaking triggered by the alerts.
  9. Horizontally scalable, with autoscaling configured on Kubernetes.




Capacity estimation

10^4 jobs per second

storage requirement = 10^4 jobs/s × 1 KB = 10 MB per second = 0.864 TB per day = 315.36 TB per year ≈ 1.58 PB over 5 years.

So the system is throughput-heavy, while the storage requirement is moderate.


To absorb unexpected load, we provision for 2x the QPS (2 × 10^4 jobs per second) and double the storage (≈ 3.15 PB over 5 years).
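The arithmetic above can be checked with a few lines of Python. This is only a sketch: it uses decimal units (1 KB = 1,000 bytes), treats the 1 KB payload cap as the size of every job, and ignores replication, so with a primary plus 2 read replicas per shard the raw disk footprint would be roughly 3x these figures.

```python
# Back-of-the-envelope capacity check (decimal units: 1 KB = 1,000 bytes).
JOBS_PER_SECOND = 10_000        # 10^4 jobs/s baseline
SECONDS_PER_DAY = 86_400
PAYLOAD_BYTES = 1_000           # 1 KB max per task
HEADROOM = 2                    # provision 2x for unexpected load

jobs_per_day = JOBS_PER_SECOND * SECONDS_PER_DAY            # 864 million
bytes_per_second = JOBS_PER_SECOND * PAYLOAD_BYTES          # 10 MB/s
tb_per_day = bytes_per_second * SECONDS_PER_DAY / 1e12      # 0.864 TB
tb_per_year = tb_per_day * 365                              # 315.36 TB
pb_five_years = tb_per_year * 5 / 1_000                     # ~1.58 PB

print(f"jobs/day:      {jobs_per_day:,}")
print(f"storage/day:   {tb_per_day:.3f} TB")
print(f"storage/year:  {tb_per_year:.2f} TB")
print(f"storage/5 yrs: {pb_five_years:.2f} PB")
print(f"with {HEADROOM}x headroom: {JOBS_PER_SECOND * HEADROOM:,} jobs/s, "
      f"{pb_five_years * HEADROOM:.2f} PB over 5 years")
```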




API design


Simple tasks API:

POST /v1/tasks/simple/

{
  "command": "python x.py --p1",
  "s3_script_path": {"x.py": "www.s3.com/bucket-name/user_name/x.py"},
  "max_retries": 5,
  "schedule-time": "YYYY-MM-DD HH:MM TimeZone",
  "schedule": "ASAP",
  "priority": 20,
  "priority-level": "global",
  "idempotency_key": "sfafa324324"
}

Returns: 202 Accepted. The request is processed asynchronously.

Either schedule-time or schedule is given (schedule: "ASAP" runs the task as soon as possible).

priority-level can be local or global.

400 Bad Request if command or s3_script_path is missing, or if neither schedule-time nor schedule is given.

If priority is not given, it defaults to 1, and priority-level defaults to local.

401 Unauthorized if the request is not authenticated.

403 Forbidden if the user is not authorized.

500 Internal Server Error on server failures.




Recurring tasks API:

POST /v1/tasks/recurring/

Request body:

{
  "command": "python x.py --p1",
  "s3_script_path": {"x.py": "www.s3.com/bucket-name/user_name/x.py"},
  "max_retries": 5,
  "schedule-time": "YYYY-MM-DD HH:MM TimeZone",
  "interval": "YYYY-MM-DD HH:MM",
  "schedule": "ASAP",
  "priority": 20,
  "priority-level": "global",
  "idempotency_key": "sfafa324324"
}

Returns: 202 Accepted

{
  "taskid": "23421"
}

The request is processed asynchronously.

Either schedule-time or schedule is given.

priority-level can be local or global.

400 Bad Request if command or s3_script_path is missing, or if neither schedule-time nor schedule is given.

If priority is not given, it defaults to 1, and priority-level defaults to local.

interval is added to the base schedule time on each run, and the next run is scheduled after that much time.

401 Unauthorized if the request is not authenticated.

403 Forbidden if the user is not authorized.

500 Internal Server Error on server failures.
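One way to compute the next run of a recurring task from the interval rule above, as a sketch that assumes interval is parsed into a fixed-length duration (a Python timedelta) rather than calendar units such as months. Computing from the base schedule time, instead of adding interval to the last actual run, keeps runs from drifting when one starts late. The function name next_run_after is hypothetical.

```python
from datetime import datetime, timedelta, timezone

def next_run_after(base: datetime, interval: timedelta, now: datetime) -> datetime:
    """Return the earliest base + k * interval (k >= 0) that is strictly later than `now`."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    if now < base:
        return base                                  # first run has not happened yet
    elapsed_intervals = (now - base) // interval     # whole intervals since the base time
    return base + (elapsed_intervals + 1) * interval

base = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
now = datetime(2024, 1, 1, 2, 30, tzinfo=timezone.utc)
print(next_run_after(base, timedelta(hours=1), now))   # 2024-01-01 03:00:00+00:00
```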



DAG-based jobs:

Create a new DAG:

POST /v1/tasks/dag/

{
  "max_retries": 5,
  "priority": 20,
  "priority-level": "global",
  "idempotency_key": "sfafa324324"
}

Returns: 200 OK

priority-level can be local or global.

max_retries defaults to 1.

400 Bad Request if idempotency_key is missing or the request body is malformed.

If priority is not given, it defaults to 1, and priority-level defaults to local.

401 Unauthorized if the request is not authenticated.

403 Forbidden if the user is not authorized.

500 Internal Server Error on server failures.



Add a job to an existing DAG:

POST /v1/tasks/dag/

{
  "command": "python x.py --p1",
  "s3_script_path": {"x.py": "www.s3.com/bucket-name/user_name/x.py"},
  "max_retries": 5,
  "priority": 20,
  "priority-level": "global",
  "dag_id": "123",
  "idempotency_key": "sfafa324324"
}

Returns: 200 OK

priority-level can be local or global.

max_retries defaults to 1.

400 Bad Request if command, s3_script_path, or dag_id is missing.

If priority is not given, it defaults to 1, and priority-level defaults to local.

401 Unauthorized if the request is not authenticated.

403 Forbidden if the user is not authorized.

500 Internal Server Error on server failures.



Start a DAG:

POST /v1/tasks/dag/

{
  "command": "start",
  "dag_id": "1234",
  "max_retries": 5,
  "priority": 20,
  "priority-level": "global",
  "idempotency_key": "sfafa324324"
}

Returns: 202 Accepted

max_retries defaults to 1.

400 Bad Request if command or dag_id is missing.

If priority is not given, it defaults to 1, and priority-level defaults to local.

401 Unauthorized if the request is not authenticated.

403 Forbidden if the user is not authorized.

500 Internal Server Error on server failures.


Add a dependency between two jobs in a DAG (the dependent job runs only after the dependee job completes):

POST /v1/tasks/dag/dependency/

{
  "dependent_job_id": "132",
  "dependee_job_id": "123",
  "dag_id": "1345325",
  "idempotency_key": "sfafa324324"
}

Returns: 200 OK, with the DAG id:

{
  "id": "1345325"
}

400 Bad Request if dependent_job_id, dependee_job_id, or dag_id is missing, or if the new dependency would make the graph cyclic.

401 Unauthorized if the request is not authenticated.

403 Forbidden if the user is not authorized.

500 Internal Server Error on server failures.
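The 400 for cyclic graphs can be enforced when an edge is added: tentatively add the new dependency and run a topological sort (Kahn's algorithm) over the DAG's jobs. If the sort cannot emit every job, the graph has a cycle and the request is rejected; otherwise the result is a valid execution order. This is a sketch over in-memory lists that leaves out loading rows from dag_job_tasks and dag_job_dependency; topological_order and can_add_dependency are hypothetical helper names. Edges are (dependent_job_id, dependee_job_id) pairs, matching the request body.

```python
from collections import defaultdict, deque
from typing import Dict, Iterable, List, Tuple

def topological_order(jobs: Iterable[str], edges: Iterable[Tuple[str, str]]) -> List[str]:
    """Return an execution order for a DAG, or raise ValueError on a cycle or unknown job.

    Each edge is (dependent_job_id, dependee_job_id): the dependent runs after the dependee.
    """
    indegree: Dict[str, int] = {job: 0 for job in jobs}
    dependents = defaultdict(list)              # dependee -> jobs waiting on it
    for dependent, dependee in edges:
        if dependent not in indegree or dependee not in indegree:
            raise ValueError(f"unknown job in edge ({dependent}, {dependee})")
        dependents[dependee].append(dependent)
        indegree[dependent] += 1

    ready = deque(job for job, deg in indegree.items() if deg == 0)
    order: List[str] = []
    while ready:
        job = ready.popleft()
        order.append(job)
        for child in dependents[job]:
            indegree[child] -= 1
            if indegree[child] == 0:            # all of child's dependees are done
                ready.append(child)

    if len(order) != len(indegree):
        raise ValueError("dependency cycle detected")   # the API maps this to 400
    return order

def can_add_dependency(jobs, edges, new_edge) -> bool:
    """True if adding new_edge keeps the graph acyclic (and refers to known jobs)."""
    try:
        topological_order(jobs, list(edges) + [new_edge])
        return True
    except ValueError:
        return False

jobs = ["y", "x", "z"]
edges = [("x", "y")]                            # x depends on y
print(topological_order(jobs, edges))           # ['y', 'z', 'x']
```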


Get task status:

GET /v1/tasks/{id}

Returns: 200 OK

{
  "status": "running",
  "retries": 2
}

status is one of scheduled, running, completed, failed, or retried.

retries is the number of retries already attempted.
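The status values above imply a small state machine. The sketch below shows one possible set of legal transitions; the transitions themselves are an assumption, not something the API states. Here retried means a failed attempt is waiting to run again, failed means the retries are exhausted, and running → scheduled covers a worker that went down, which per requirement 8 is not a job failure. Rejecting any other transition guards against stale or out-of-order status updates. ALLOWED_TRANSITIONS and check_transition are hypothetical names.

```python
# Hypothetical transition table for the job statuses exposed by GET /v1/tasks/{id}.
ALLOWED_TRANSITIONS = {
    "scheduled": {"running"},
    "running": {"completed", "retried", "failed", "scheduled"},  # scheduled = worker lost
    "retried": {"running"},
    "completed": set(),      # terminal
    "failed": set(),         # terminal
}

def check_transition(current: str, new: str) -> None:
    """Raise ValueError if moving from `current` to `new` is not allowed."""
    if new not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal status transition {current!r} -> {new!r}")

def is_terminal(status: str) -> bool:
    """A status with no outgoing transitions is final."""
    return not ALLOWED_TRANSITIONS[status]
```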


Paginated API for listing jobs:

GET /v1/tasks/?page_num=2&pagesize=10

Returns: 200 OK

[
  {"job_id": "234", "status": "running", "retries": 2},
  {"job_id": "2324", "status": "completed", "retries": 2},
  ...
]
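A sketch of how page_num and pagesize map to an offset, assuming page_num is 1-based. In MongoDB this would correspond to a cursor with skip((page_num - 1) * pagesize) and limit(pagesize) over a stable sort order (for example by creation time); the helper below does the same over an in-memory list. The paginate name and the has_more flag are assumptions beyond the response shape above. For very deep pages skip gets expensive, and paging by the last seen key is the usual alternative.

```python
from typing import Any, Dict, List

def paginate(jobs: List[Dict[str, Any]], page_num: int, pagesize: int) -> Dict[str, Any]:
    """Return one page of `jobs` (already sorted), using 1-based page numbers."""
    if page_num < 1 or pagesize < 1:
        raise ValueError("page_num and pagesize must be >= 1")   # the API would answer 400
    start = (page_num - 1) * pagesize
    return {
        "page_num": page_num,
        "pagesize": pagesize,
        "jobs": jobs[start:start + pagesize],
        "has_more": start + pagesize < len(jobs),
    }

all_jobs = [{"job_id": str(i), "status": "completed", "retries": 0} for i in range(25)]
resp = paginate(all_jobs, page_num=2, pagesize=10)
print([j["job_id"] for j in resp["jobs"]], resp["has_more"])
```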



All the APIs above carry a JWT in their headers for user authentication and authorization.

idempotency_key is mandatory for all the POST APIs above; a request without it is rejected with 400 Bad Request.

We can also limit the number of requests each user can make per minute (rate limiting at the API gateway).
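The per-minute limit can be enforced with a token bucket per user: each request spends one token, and tokens refill continuously at the configured rate up to a fixed capacity, which also permits short bursts. This is a single-process sketch; in the real system the bucket state would have to live in a shared store (for example Redis) so that every gateway instance sees the same counts. The class name and parameters are hypothetical, and requests that find the bucket empty would typically be answered with 429 Too Many Requests.

```python
import time
from typing import Callable

class TokenBucket:
    """Per-user token bucket: holds up to `capacity` tokens, refilled at `rate_per_minute`."""

    def __init__(self, capacity: int, rate_per_minute: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.capacity = capacity
        self.refill_per_second = rate_per_minute / 60.0
        self.tokens = float(capacity)          # start full
        self.clock = clock
        self.last_refill = clock()

    def allow(self) -> bool:
        """Spend one token if available; False means the request should be rejected."""
        now = self.clock()
        elapsed = now - self.last_refill
        self.last_refill = now
        # Refill for the time that has passed, never beyond capacity.
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_second)
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# Usage with a fake clock: a burst of 3, refilled at 60 requests/minute (1 per second).
fake_now = [0.0]
bucket = TokenBucket(capacity=3, rate_per_minute=60, clock=lambda: fake_now[0])
burst = [bucket.allow() for _ in range(4)]    # [True, True, True, False]
fake_now[0] = 1.0                             # one second later: one token refilled
after_refill = bucket.allow()                 # True
```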














Database design

We will have the following tables:


  1. simple_jobs: one-off jobs.
  2. recurring_jobs: recurring jobs, including their interval.
  3. dag_jobs: the main record created for each DAG.
  4. dag_job_tasks: the tasks inside a DAG, with the same shape as simple_jobs.
  5. dag_job_dependency: the edges of each DAG.
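A sketch of what these tables could look like as MongoDB documents, written as Python dataclasses. Field names mirror the API bodies (with priority-level written as priority_level), user_id is the shard key from the high-level design, and status and retries back the status API. The defaults for priority and priority_level come from the API section; using max_retries = 1 for every job type is an assumption carried over from the DAG APIs. All class names are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Optional

@dataclass
class SimpleJob:                                 # simple_jobs (dag_job_tasks reuses this shape)
    job_id: str
    user_id: str                                 # shard key: data is partitioned by user id
    command: str
    s3_script_path: Dict[str, str]               # script file name -> S3 link
    idempotency_key: str
    max_retries: int = 1                         # assumed default
    priority: int = 1                            # 0..100, defaults to 1
    priority_level: str = "local"                # "local" or "global"
    schedule_time: Optional[datetime] = None     # None means "ASAP"
    status: str = "scheduled"
    retries: int = 0                             # retries already attempted
    dag_id: Optional[str] = None                 # set only for rows in dag_job_tasks

@dataclass
class RecurringJob(SimpleJob):                   # recurring_jobs
    interval: Optional[timedelta] = None         # required for recurring jobs; checked by the API

@dataclass
class DagJob:                                    # dag_jobs
    dag_id: str
    user_id: str
    idempotency_key: str
    max_retries: int = 1
    priority: int = 1
    priority_level: str = "local"

@dataclass
class DagDependency:                             # dag_job_dependency: one edge per row
    dag_id: str
    dependent_job_id: str                        # runs after...
    dependee_job_id: str                         # ...this job completes
```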




High-level design


We don't need table joins, but we do need a database that handles small, frequent writes for job state updates, point reads of single entries, and listing of jobs. Our access pattern is also mostly key-value.


Based on these requirements, I will use MongoDB, sharded by user id. Each shard has a primary (master) that replicates asynchronously to 2 read replicas, i.e. a master-slave architecture. This makes the database fault tolerant.


Writes will be atomic: both the DB entry created for a POST request and every job state change are written in transactions.
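For status changes, a conditional update is one way to keep concurrent writers safe: set the new status only if the job is still in the expected state, so two workers cannot both claim the same job. In MongoDB a single-document update is atomic, so this could map to update_one with a filter on both _id and the expected status, checking modified_count on the result; multi-document transactions would then only be needed for writes that span several documents. The sketch below models this with an in-memory store whose lock stands in for MongoDB's per-document atomicity; JobStore and compare_and_set_status are hypothetical names.

```python
import threading
from typing import Dict

class JobStore:
    """In-memory stand-in for the jobs collection."""

    def __init__(self) -> None:
        self._docs: Dict[str, Dict[str, object]] = {}
        self._lock = threading.Lock()          # models atomicity of a single-document update

    def insert(self, job_id: str, status: str = "scheduled") -> None:
        with self._lock:
            self._docs[job_id] = {"status": status, "retries": 0}

    def compare_and_set_status(self, job_id: str, expected: str, new: str) -> bool:
        """Set status to `new` only if it is currently `expected`; report whether it applied."""
        with self._lock:
            doc = self._docs.get(job_id)
            if doc is None or doc["status"] != expected:
                return False
            doc["status"] = new
            return True

    def status(self, job_id: str) -> str:
        with self._lock:
            return str(self._docs[job_id]["status"])

# Eight workers race to claim the same job; exactly one wins.
store = JobStore()
store.insert("j1")
results = []
workers = [threading.Thread(
    target=lambda: results.append(store.compare_and_set_status("j1", "scheduled", "running")))
    for _ in range(8)]
for w in workers:
    w.start()
for w in workers:
    w.join()
```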



Apache Airflow will manage our DAGs; we simply invoke its API with the appropriate request.



Our API servers serve the API requests and maintain a heap-based priority queue for priority-based tasks that have no schedule time. The heap holds only the job id and priority, so an in-memory implementation takes little space. It is better, though, to implement it as a Redis priority queue, so that the data lives in one centralized queue shared by all API servers.
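The heap can be sketched with Python's heapq. heapq is a min-heap, so the priority is negated, and an increasing arrival counter breaks ties, which gives the first-come-first-serve order required for equal priorities. This is a single-process sketch with a hypothetical class name. A Redis version could keep the same ordering in a sorted set by packing priority and arrival order into the score and popping with ZPOPMIN, and local versus global priority could be modeled as one queue per user versus one shared queue; both are assumptions about how the design would be realized.

```python
import heapq
import itertools
from typing import List, Tuple

class JobPriorityQueue:
    """Highest priority first; equal priorities are served first come, first served."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []
        self._arrival = itertools.count()       # monotonically increasing tie-breaker

    def push(self, job_id: str, priority: int = 1) -> None:
        if not 0 <= priority <= 100:
            raise ValueError("priority must be in [0, 100]")
        # Negate priority so the largest value pops first from the min-heap;
        # the arrival number orders jobs that share a priority.
        heapq.heappush(self._heap, (-priority, next(self._arrival), job_id))

    def pop(self) -> str:
        _, _, job_id = heapq.heappop(self._heap)
        return job_id

    def __len__(self) -> int:
        return len(self._heap)

q = JobPriorityQueue()
for job_id, priority in [("a", 20), ("b", 50), ("c", 20), ("d", 50)]:
    q.push(job_id, priority)
print([q.pop() for _ in range(len(q))])      # ['b', 'd', 'a', 'c']
```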


We can cache job lists in the API layer itself.



Kafka will carry the jobs that are due to run. Its consumers (the workers) pick up a task, complete it, update its status in the DB, and then acknowledge the message to Kafka.

A single Kafka topic with multiple partitions is enough. We don't need a partition key: when messages have no key, the producer spreads them across partitions (round robin in older clients; newer clients use a sticky partitioner that fills a batch for one partition before moving to the next).
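The consume, update status, then acknowledge order gives at-least-once delivery: if a worker dies before acknowledging, Kafka redelivers the message, so the same job can arrive twice. Skipping jobs that the database already marks as completed is what makes that redelivery harmless. The sketch below models one partition as a Python list and the offset commit as a value returned once per batch; process_batch and the db dict are hypothetical stand-ins for a real Kafka consumer and MongoDB.

```python
from typing import Callable, Dict, List

def process_batch(partition: List[str], committed: int, db: Dict[str, str],
                  run_job: Callable[[str], None]) -> int:
    """Run every job from offset `committed` to the end of `partition`.

    Returns the offset to commit. It is returned only after every job in the batch has
    its status persisted, so a crash mid-batch leaves the old offset committed and the
    whole batch is redelivered.
    """
    for offset in range(committed, len(partition)):
        job_id = partition[offset]
        if db.get(job_id) == "completed":
            continue                  # redelivered after a crash: already done, skip it
        db[job_id] = "running"
        run_job(job_id)               # raises if the worker dies mid-job
        db[job_id] = "completed"
    return len(partition)

partition = ["j1", "j2"]
db: Dict[str, str] = {}
committed = 0

def crash_on_j2(job_id: str) -> None:
    if job_id == "j2":
        raise RuntimeError("worker died")

try:
    committed = process_batch(partition, committed, db, crash_on_j2)
except RuntimeError:
    pass                              # the offset was never committed

ran: List[str] = []
committed = process_batch(partition, committed, db, ran.append)
print(ran, committed)                 # ['j2'] 2
```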


When a new POST request arrives, the API server first writes it to the DB, then writes to Kafka, Apache Airflow, or Redis, depending on the type of the task.


















Request flows


  1. Every POST request for a job first reaches our API gateway, which authenticates and authorizes it, then checks the rate limiter (a token bucket with per-minute tokens) to see whether the user still has tokens left, and finally load-balances the request round robin across the API servers.
  2. When a POST request for a job arrives, the API server checks whether the database already has an entry with the same idempotency key. If not, it makes the entry and continues with the flow; otherwise it blocks the duplicate request.
  3. A simple job request is pushed into the Redis priority queue with its priority (1 if none is given). We pop the highest-priority job from Redis and insert it into Kafka; a worker picks it up, executes the task, updates the status in the database, and then acknowledges the message. If the same job is ever picked up again, the worker first checks the database and starts execution only if the job is not already completed.
  4. For a DAG-based POST request, the API server first makes the DB entry, then forwards the request to Apache Airflow with the appropriate config, and Airflow handles the DAG.
  5. The idempotency key is generated on the client side. It is random, but it stays the same when the same request is retried.
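A sketch of the idempotency check. In MongoDB it could be backed by a unique index, e.g. create_index([("user_id", 1), ("idempotency_key", 1)], unique=True), so a duplicate insert fails with pymongo's DuplicateKeyError even when two copies of a request hit different API servers. Scoping keys per user is an assumption, and IdempotencyRegistry is a hypothetical in-memory stand-in that is not safe across processes. It returns the existing job id for a duplicate, which the API could turn into either a rejection (as described above) or a replay of the original response.

```python
import uuid
from typing import Dict, Tuple

class IdempotencyRegistry:
    """Maps (user_id, idempotency_key) to the job created for it."""

    def __init__(self) -> None:
        self._jobs: Dict[Tuple[str, str], str] = {}

    def submit(self, user_id: str, idempotency_key: str) -> Tuple[str, bool]:
        """Return (job_id, created); created is False for a duplicate request."""
        key = (user_id, idempotency_key)
        if key in self._jobs:
            return self._jobs[key], False      # duplicate: do not create a second job
        job_id = str(uuid.uuid4())
        self._jobs[key] = job_id
        return job_id, True

# Client side: one random key per logical request, reused on every retry of that request.
key = str(uuid.uuid4())
registry = IdempotencyRegistry()
first_id, created = registry.submit("user-1", key)
retry_id, created_again = registry.submit("user-1", key)   # e.g. client timed out and resent
```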




Detailed component design


  1. If a worker goes down, the job is not counted as failed and the retry count is not incremented; because the worker never acknowledged the Kafka message, the job is delivered again to another worker. If the job itself fails on a worker, the worker increments the retry count and tries again, up to max_retries.
  2. Read requests go to the read replicas and writes go to the shard's primary, which keeps the database fault tolerant. If the primary fails, a read replica is promoted to primary and a new read replica is spun up.
  3. If Airflow workers go down, Airflow's orchestrator restarts them, so we don't have to manage that explicitly.
  4. Kafka partitions are replicated as well, with 2 follower replicas per partition.
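Requirement 8 means the worker bookkeeping must tell a job failure apart from a lost worker. A sketch, assuming a job that exits with an error counts as job_failed, while a worker that disappears counts as worker_lost (detected, for example, when Kafka's consumer group rebalances after the worker's session times out and the unacknowledged message goes to another worker). It also assumes max_retries counts retries after the first attempt. JobState and record_outcome are hypothetical names.

```python
from dataclasses import dataclass

@dataclass
class JobState:
    max_retries: int = 1
    retries: int = 0                  # retries already attempted (reported by the status API)
    status: str = "running"

def record_outcome(job: JobState, outcome: str) -> JobState:
    """Update a job after an attempt. outcome is "success", "job_failed", or "worker_lost"."""
    if outcome == "success":
        job.status = "completed"
    elif outcome == "worker_lost":
        job.status = "scheduled"      # requeue; a lost worker is not a job failure
    elif outcome == "job_failed":
        if job.retries < job.max_retries:
            job.retries += 1          # only genuine failures consume a retry
            job.status = "retried"
        else:
            job.status = "failed"     # retries exhausted
    else:
        raise ValueError(f"unknown outcome {outcome!r}")
    return job
```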





Trade offs/Tech choices


  1. When a request involves multiple dependent tasks, the API server hands them to Airflow, which manages their scheduling and execution for us, so we don't build our own DAG executor.




Failure scenarios/bottlenecks







Future improvements


  1. If we see abnormal load on the system, we can add RabbitMQ in front of Kafka to increase fan-out, and add more workers.
  2. We can add Prometheus to monitor the system and Grafana dashboards for visualization.
  3. We can add the ELK stack (Elasticsearch, Logstash, Kibana) for logging and search.
  4. We can add Jaeger for tracing request latency.