My Solution for Design a Top-K Request Analysis System

by nectar4678

System requirements

Functional:

  • Track Top 'K' Requests: The system should identify and return the top 'K' most frequent web requests within configurable time intervals (e.g., 1 minute, 10 minutes, 1 hour).
  • Configurable K and Time Intervals: Allow dynamic adjustment of both the 'K' value (e.g., up to 100) and the time interval for which the requests are analyzed.
  • Real-Time Querying: Support near real-time querying of the top 'K' requests during the specified time intervals.
  • Scalable Data Ingestion: The system must handle large-scale web traffic, specifically 10 million requests per minute (equivalent to 1.2 GB of data per minute).
  • Support Historical Analysis: Provide the ability to query data for past time intervals (e.g., last 1 minute, last 10 minutes, last hour).
  • High Availability: Ensure the system remains operational even under heavy load or minor hardware failures.
  • API for Querying: Provide REST APIs that enable clients to query for top 'K' requests for a specific time interval.
  • Efficient Data Aggregation: Efficiently aggregate requests in near real-time to avoid bottlenecks in processing.
  • Flexible Metadata: Support storage and analysis of request metadata (e.g., URL, timestamp, user agent) for deeper insights.



Non-Functional:

  • Scalability: The system must scale horizontally to handle increased traffic, ensuring smooth operation even if traffic volume spikes.
  • Low Latency: Queries to retrieve the top 'K' requests should return within a few seconds.
  • Data Retention: Data for each time interval should be retained for a configurable period, potentially up to several days or weeks, depending on storage capacity.
  • Fault Tolerance: The system should have redundancy built-in to ensure that data is not lost in the event of a node failure, with automatic failover mechanisms.
  • Data Integrity: Ensure that no data is lost during ingestion or processing, even under heavy load.
  • High Throughput: The system should process 10 million requests per minute, storing and indexing them efficiently for retrieval.
  • Cost Efficiency: Minimize infrastructure costs by choosing appropriate technologies and data storage solutions that balance performance with cost.




Capacity estimation

Assumptions

  • Traffic Volume: The system will receive 10 million requests per minute.
  • Request Size: Each request carries around 120 bytes of data, including metadata such as the URL, timestamp, and other fields.
  • Retention Period: We assume the system retains data for 24 hours to allow for historical analysis, but this could be configurable.
  • Throughput Goal: The system needs to handle 1.2 GB of incoming data every minute (10 million requests x 120 bytes per request).
  • Scalability: Horizontal scaling (adding more nodes) is expected to handle increasing traffic, and each node will have its own storage and processing power.

Storage Estimation

For a single minute of data:

  • Data per minute: 10 million requests * 120 bytes = 1.2 GB of data per minute.
  • Data per hour: 1.2 GB * 60 minutes = 72 GB of data per hour.
  • Data per day (24 hours): 72 GB * 24 hours = 1.728 TB per day.

To retain 24 hours of data for analysis, we need roughly 1.728 TB of storage.

If we extend this to 7 days of data retention, the required storage will be:

  • 7-day retention: 1.728 TB/day * 7 = 12.096 TB.
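
A quick back-of-the-envelope check of these numbers (a minimal sketch in Python; the 120-byte average record size is the assumption stated above):

REQUESTS_PER_MINUTE = 10_000_000
BYTES_PER_REQUEST = 120  # assumed average size of one request record

gb_per_minute = REQUESTS_PER_MINUTE * BYTES_PER_REQUEST / 1e9
gb_per_hour = gb_per_minute * 60
tb_per_day = gb_per_hour * 24 / 1e3
tb_per_week = tb_per_day * 7

print(f"per minute: {gb_per_minute:.1f} GB")   # 1.2 GB
print(f"per hour:   {gb_per_hour:.1f} GB")     # 72.0 GB
print(f"per day:    {tb_per_day:.3f} TB")      # 1.728 TB
print(f"7-day:      {tb_per_week:.3f} TB")     # 12.096 TB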






API design

Get Top 'K' Requests API

Purpose: Retrieve the top 'K' most frequent requests in a specified time interval.
Endpoint: /api/top-requests
Method: GET
Parameters:

  • k (required, integer): The number of top requests to retrieve (maximum value = 100).
  • interval (required, string): The time interval for which to get the top 'K' requests. Supported values: 1min, 10min, 1hour.
  • timestamp (optional, ISO 8601 format): The specific timestamp to query (if not provided, the current time will be used).

Response:

{
  "interval": "1min",
  "timestamp": "2023-10-10T12:00:00Z",
  "k": 10,
  "top_requests": [
    { "url": "/home", "count": 150000 },
    { "url": "/api/login", "count": 120000 },
    { "url": "/products", "count": 110000 },
    { "url": "/cart", "count": 90000 },
    { "url": "/checkout", "count": 80000 },
    { "url": "/search", "count": 70000 },
    { "url": "/api/signup", "count": 60000 },
    { "url": "/products/123", "count": 50000 },
    { "url": "/category/electronics", "count": 40000 },
    { "url": "/api/logout", "count": 30000 }
  ]
}
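
A minimal client-side sketch of calling this endpoint, assuming the Python requests library and a hypothetical base URL:

import requests

BASE_URL = "https://analytics.example.com"  # hypothetical host for illustration

# Fetch the top 10 URLs for the 1-minute window at a specific timestamp.
resp = requests.get(
    f"{BASE_URL}/api/top-requests",
    params={"k": 10, "interval": "1min", "timestamp": "2023-10-10T12:00:00Z"},
    timeout=5,
)
resp.raise_for_status()
for entry in resp.json()["top_requests"]:
    print(entry["url"], entry["count"])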


Set K and Time Interval Configuration API

Purpose: Adjust the default 'K' value and time intervals globally or for specific users/clients.
Endpoint: /api/configure
Method: POST
Request Body:

  • k (optional, integer): Set a global maximum value for 'K' (e.g., 100).
  • intervals (optional, list of strings): List of valid intervals that clients can query (e.g., ["1min", "10min", "1hour"]).

Response:

{
  "status": "success",
  "message": "Configuration updated",
  "k": 100,
  "intervals": ["1min", "10min", "1hour"]
}
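
A corresponding sketch for updating the configuration (same assumptions as above):

import requests

BASE_URL = "https://analytics.example.com"  # hypothetical host for illustration

# Set the global maximum K and the set of queryable intervals.
resp = requests.post(
    f"{BASE_URL}/api/configure",
    json={"k": 100, "intervals": ["1min", "10min", "1hour"]},
    timeout=5,
)
resp.raise_for_status()
print(resp.json()["message"])  # "Configuration updated"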


Get Historical Data API

Purpose: Retrieve top 'K' requests for a past time window. This can be useful for audits or longer-term analysis.
Endpoint: /api/historical-data
Method: GET
Parameters:

  • start_time (required, ISO 8601 format): The start time for the time window.
  • end_time (required, ISO 8601 format): The end time for the time window.
  • k (required, integer): The number of top requests to retrieve.
  • interval (optional, string): The time interval to query, default is 1hour.

Response:

{
  "interval": "1hour",
  "start_time": "2023-10-10T10:00:00Z",
  "end_time": "2023-10-10T11:00:00Z",
  "k": 5,
  "top_requests": [
    { "url": "/home", "count": 500000 },
    { "url": "/products", "count": 480000 },
    { "url": "/api/login", "count": 470000 },
    { "url": "/cart", "count": 460000 },
    { "url": "/checkout", "count": 450000 }
  ]
}
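
And a sketch for the historical query (same assumptions as the earlier examples):

import requests

BASE_URL = "https://analytics.example.com"  # hypothetical host for illustration

# Retrieve the top 5 URLs for a past one-hour window.
resp = requests.get(
    f"{BASE_URL}/api/historical-data",
    params={
        "start_time": "2023-10-10T10:00:00Z",
        "end_time": "2023-10-10T11:00:00Z",
        "k": 5,
        "interval": "1hour",
    },
    timeout=10,
)
resp.raise_for_status()
for entry in resp.json()["top_requests"]:
    print(entry["url"], entry["count"])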



Database design

Key Considerations:

  1. High-Volume Ingestion: The system must handle 10 million requests per minute. This will require partitioning of data and a distributed, horizontally scalable database solution.
  2. Efficient Aggregation: The system needs to calculate the top 'K' requests for each time window, which requires fast aggregation of data based on request counts.
  3. Time-Based Querying: Queries will be based on time intervals, which makes time-based indexing important for performance.


Request Data Table (Primary Storage for Raw Request Data)

Purpose: Stores all incoming web requests along with metadata such as URL, timestamp, and other details.


Storage Considerations:

  • Partitioning by timestamp can ensure scalability over time, with each partition containing requests for a specific time window (e.g., by hour or by minute).
  • This table is periodically aggregated into the Top-K table and purged once the configured retention period expires.
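
The exact schema depends on the chosen database, but a minimal sketch of one row in this table might look like the following (field names are illustrative, not prescriptive):

from dataclasses import dataclass
from datetime import datetime

@dataclass
class RawRequest:
    """One row of Raw Request Storage; field names are illustrative."""
    request_id: str      # unique identifier for the request
    url: str             # requested path, e.g. "/home"
    timestamp: datetime  # request time; also drives time-based partitioning
    user_agent: str      # client metadata
    client_ip: str       # additional metadata useful for later filtering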


Top-K Aggregation Table (For Real-Time Aggregation)

Purpose: Stores pre-aggregated counts of requests for different URLs over defined time windows. This allows for fast querying when retrieving the top 'K' requests.


Storage Considerations:

  • Partitioning by interval_start ensures that each time window can be queried independently.
  • Data in this table can be updated incrementally as new requests are aggregated in real-time.
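
A similar illustrative sketch of an aggregation row, keyed by time window and URL:

from dataclasses import dataclass
from datetime import datetime

@dataclass
class TopKAggregate:
    """Pre-aggregated count for one URL in one time window (illustrative)."""
    interval_start: datetime  # start of the window; used as the partition key
    interval_length: str      # e.g. "1min", "10min", "1hour"
    url: str                  # the aggregated request path
    count: int                # incremented as new requests are processed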


Configuration Table (For Storing System Configurations)

Purpose: Stores global system configurations such as the default 'K' value and available time intervals.


Storage Considerations:

  • This table is updated infrequently and requires minimal storage and access capacity.



High-level design

Key Components

  1. Load Balancer: Distributes incoming web traffic evenly across multiple ingestion nodes to prevent overloading any single node.
  2. Ingestion Layer: Collects and processes raw requests in real-time. Each ingestion node receives a portion of the traffic, aggregates requests locally, and forwards data to the processing layer.
  3. Stream Processing Layer: Responsible for real-time aggregation of request counts. This layer continuously processes incoming requests, updates the top 'K' rankings, and stores the results in the database.
  4. Database Layer: Consists of two parts:
    • Raw Request Storage: Stores all incoming requests for a configurable retention period (e.g., 24 hours).
    • Top-K Request Storage: Stores aggregated top 'K' requests for different time intervals, allowing for fast querying.
  5. API Layer: Provides an interface for users to query the top 'K' requests, configure system settings, and retrieve historical data.
  6. Cache Layer (Optional): A caching layer (e.g., Redis) can be used to store frequently accessed query results and improve performance for high-demand queries.



Scalability

  1. Horizontal Scaling: Ingestion Nodes and Stream Processing Nodes can be scaled horizontally to handle higher traffic volumes. Each new node takes a share of the load, ensuring that performance remains stable even as traffic grows.
  2. Partitioning by Time: Both raw request data and top-K results are partitioned by time, ensuring that each time window can be queried or processed independently.
  3. Distributed Stream Processing: Using a distributed stream processing framework allows the system to scale as more data is ingested, enabling real-time aggregation without performance degradation.





Request flows

Request Flow 1: Ingesting a Web Request

  1. Client Web Request: A user makes a web request (e.g., accessing /home or /api/login) from their browser or mobile app.
  2. Load Balancer: The request hits the Load Balancer, which distributes it to one of the Ingestion Nodes. The Load Balancer ensures even traffic distribution.
  3. Ingestion Node: The selected Ingestion Node processes the incoming request, recording details such as the URL, timestamp, and metadata (e.g., user agent).
  4. Stream Processing Layer: The Ingestion Node forwards the request data to the Stream Processing Layer, where the request counts are aggregated in real-time. Each incoming request increments the count for its corresponding URL.
  5. Aggregation & Storage:
    • Raw Request Storage: The raw request is stored for future analysis or auditing purposes.
    • Top-K Aggregation Storage: The real-time aggregated counts of requests are stored, partitioned by time intervals (e.g., per minute, per hour).



Request Flow 2: Querying Top 'K' Requests

  1. Client Query: A client application (e.g., a monitoring dashboard) sends a query to the API Gateway to retrieve the top 'K' requests for a specific time interval.
  2. API Gateway: The API Gateway receives the query and forwards the request to the relevant storage or cache layer.
  3. Cache Check: If the result for the queried time interval and 'K' value is cached (e.g., in Redis), the cached result is returned. Otherwise:
  4. Top-K Aggregation Storage: The system queries the Top-K Aggregation Storage to retrieve the pre-aggregated top 'K' URLs for the specified time window.
  5. Response to Client: The API Gateway returns the top 'K' URLs and their respective request counts to the client.









Detailed component design

Ingesting a Request:

  1. The user sends a request (e.g., accessing /home or /products) that is received by the Load Balancer.
  2. The Load Balancer distributes this request to an Ingestion Node. This distribution ensures no single node is overwhelmed by traffic.
  3. The Ingestion Node processes the request and forwards it to the Stream Processing Layer, which aggregates the request counts.
  4. The Stream Processing Layer updates the count for the requested URL in Top-K Aggregation Storage and stores the raw request data in Raw Request Storage.
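
A minimal in-memory sketch of the counting and top-K extraction performed in step 4 (a production system would run this inside a distributed stream processor such as Flink; the 1-minute tumbling window is an assumption):

import heapq
from collections import Counter, defaultdict
from datetime import datetime, timezone

WINDOW_SECONDS = 60  # assumed 1-minute tumbling windows

# window start (epoch seconds) -> Counter mapping URL -> request count
window_counts: dict[int, Counter] = defaultdict(Counter)

def record_request(url: str, timestamp: datetime) -> None:
    """Increment the count for a URL within its tumbling window."""
    window_start = int(timestamp.timestamp()) // WINDOW_SECONDS * WINDOW_SECONDS
    window_counts[window_start][url] += 1

def top_k(window_start: int, k: int) -> list[tuple[str, int]]:
    """Return the k most frequent URLs for a given window."""
    counts = window_counts.get(window_start, Counter())
    return heapq.nlargest(k, counts.items(), key=lambda item: item[1])

# Example: count a few requests and read back the top 2 for the current window.
now = datetime.now(timezone.utc)
for url in ["/home", "/home", "/api/login", "/products", "/home"]:
    record_request(url, now)
current_window = int(now.timestamp()) // WINDOW_SECONDS * WINDOW_SECONDS
print(top_k(current_window, 2))  # e.g. [('/home', 3), ('/api/login', 1)]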

Querying Top-K Requests:

  1. When a client queries the top 'K' requests for a specific time interval, the API Gateway checks the Cache Layer to see if the result is already available.
  2. If the result is cached, it is returned immediately, improving performance.
  3. If the result is not cached, the API queries the Top-K Aggregation Storage for the requested time window. This database contains pre-aggregated counts, making it efficient to retrieve results.
  4. The API Gateway returns the results to the client.
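
A minimal sketch of this cache-aside lookup; the cache and storage clients here are hypothetical placeholders (the cache calls mirror Redis GET/SET semantics):

import json

CACHE_TTL_SECONDS = 30  # assumed; a short TTL keeps near-real-time results fresh

def get_top_k(interval: str, timestamp: str, k: int, cache, storage):
    """Cache-aside lookup: return a cached result if present, else hit storage."""
    cache_key = f"topk:{interval}:{timestamp}:{k}"

    cached = cache.get(cache_key)  # e.g. a Redis GET
    if cached is not None:
        return json.loads(cached)

    # Fall back to the pre-aggregated Top-K table for the requested window.
    result = storage.query_top_k(interval=interval, timestamp=timestamp, k=k)

    cache.set(cache_key, json.dumps(result), ex=CACHE_TTL_SECONDS)  # e.g. Redis SET with expiry
    return result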





Trade offs/Tech choices

Data Partitioning: Time-Based Partitioning

  • Technology Choice: Partitioning data by time for both raw request storage and aggregated top 'K' results.
  • Trade-offs:
    • Increased complexity in managing partitions: Time-based partitions must be managed carefully so that old data is archived or purged correctly and partition sizes stay balanced; queries spanning many partitions also need tuning to remain performant.
    • Cold starts on older data: If a query requests older data that hasn’t been cached, retrieving data from partitions can introduce higher latencies.


Data Retention and Cost Efficiency

  • Technology Choice: Raw request data is stored for a configurable retention period, with a Time-to-Live (TTL) mechanism for automatic data deletion.
  • Trade-offs:
    • Loss of detailed request data: After the retention period expires, detailed request data will be lost. However, this is mitigated by the fact that the aggregated top 'K' results will remain for long-term analysis.
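
If the chosen database does not support TTL natively, retention can be enforced with a simple scheduled job along these lines (the storage interface and the retention value are assumptions):

from datetime import datetime, timedelta, timezone

RETENTION = timedelta(hours=24)  # assumed configurable retention period

def purge_expired_partitions(storage) -> None:
    """Drop raw-request partitions older than the retention window.

    `storage` is a hypothetical interface exposing list_partitions() and
    drop_partition(); a database with native TTL support makes this job unnecessary.
    """
    cutoff = datetime.now(timezone.utc) - RETENTION
    for partition in storage.list_partitions():
        if partition.end_time < cutoff:
            storage.drop_partition(partition.name)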





Failure scenarios/bottlenecks

  • By deploying multiple ingestion nodes behind a load balancer, traffic can be automatically redirected to healthy nodes in case of a failure. This reduces the likelihood of data loss.
  • Stream processing frameworks like Apache Flink support periodic checkpointing. In the event of a failure, the system can restart from the last checkpoint, minimizing data loss.
  • Partition the top 'K' storage by time intervals (e.g., minute, hour). This spreads the data across multiple shards, improving query and write performance.
  • Distribute incoming requests across nodes by hashing the URL. This keeps all counts for a given URL on the same processing node (simplifying aggregation) while spreading different URLs evenly across nodes, reducing the chance of any single node becoming a hot spot.
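
A minimal sketch of routing by URL hash (the partition count is an assumption; in practice this is usually delegated to the message bus partitioner or a consistent-hashing ring):

import hashlib

NUM_PARTITIONS = 8  # assumed number of processing partitions/nodes

def partition_for(url: str) -> int:
    """Map a URL to a partition so all counts for that URL land on one node."""
    digest = hashlib.md5(url.encode("utf-8")).hexdigest()
    return int(digest, 16) % NUM_PARTITIONS

print(partition_for("/home"))       # the same URL always maps to the same partition
print(partition_for("/api/login"))  # different URLs spread across partitions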





Future improvements

  • The current system allows querying the top 'K' requests by time interval, but it does not provide advanced filtering or detailed analytics (e.g., filtering by user agent, geolocation, or request metadata). A natural extension is to introduce more flexible query capabilities that let clients filter top 'K' requests by attributes such as IP address, user agent, geolocation, or specific user groups.
  • Implement a multi-region deployment strategy to handle global traffic efficiently. This would involve deploying ingestion nodes, stream processing, and storage in multiple regions, closer to the users.