My Solution for Design a Wide Column Database

by nectar4678

System requirements

Functional:

Data Storage and Retrieval:

  • The system must be able to store and retrieve large-scale structured data indexed by a row key, where each row can have an unbounded number of columns.
  • Support for variable schema data within the same table.

Scalability:

  • The system must handle millions of rows initially and scale up to billions of rows.
  • It should be possible to add more nodes to the system to handle increased load without downtime.

High Availability:

  • Ensure data availability even in the case of node failures.
  • The system should support replication of data across multiple nodes.

Efficient Writes and Reads:

  • Optimize for high throughput and low-latency write operations.
  • Ensure efficient read operations, supporting both single-row lookups and range scans.

Consistency:

  • Provide tunable consistency levels, allowing applications to choose between strong consistency and eventual consistency based on their requirements.


Non-Functional:

Performance:

  • The system must be optimized for fast write operations.
  • Support read operations with low latency.

Reliability:

  • Ensure data durability even in the event of hardware failures.
  • Implement mechanisms for fault tolerance and recovery.

Maintainability:

  • The system should be easy to maintain and operate.
  • Provide tools for monitoring, managing, and tuning the database.

Security:

  • Implement access control mechanisms to secure data.
  • Support encryption for data at rest and in transit.

Cost Efficiency:

  • Minimize operational costs by utilizing commodity hardware.
  • Optimize storage and compute resource utilization.


Capacity estimation


Assumptions

Initial Data Volume:

  • The system will start with storing data for 10 million rows.
  • Each row will have an average of 50 columns.
  • The average size of each column value is 1 KB.

Growth Rate:

  • The data volume is expected to grow by 10x every year.

Read/Write Operations:

  • The system will handle 100,000 read operations per second.
  • The system will handle 50,000 write operations per second.

Replication Factor:

  • The data will be replicated across 3 nodes for high availability.


Calculations

Storage Requirements:

  • Initial storage per row: 50 columns * 1 KB = 50 KB.
  • Total initial storage: 10 million rows * 50 KB = 500 GB.
  • With replication (factor of 3): 500 GB * 3 = 1.5 TB.

Annual Growth:

  • Year 1: 10 million rows.
  • Year 2: 100 million rows.
  • Year 3: 1 billion rows.
  • Storage after 3 years with replication: 1 billion rows * 50 KB * 3 = 150 TB.

Throughput Requirements:

  • Read operations per second: 100,000.
  • Write operations per second: 50,000.
  • Total operations per second: 150,000.

Network Bandwidth:

  • Assuming each read and write operation involves transferring 1 KB of data:
  • Network bandwidth for reads: 100,000 ops/sec * 1 KB = 100 MB/sec.
  • Network bandwidth for writes: 50,000 ops/sec * 1 KB = 50 MB/sec.
  • Total network bandwidth: 150 MB/sec.
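
As a sanity check, here is a short Python sketch that reproduces the arithmetic above (decimal units and the assumed 1 KB payload per operation):

# Back-of-the-envelope capacity check using the assumptions above.
ROWS = 10_000_000               # initial row count
COLS_PER_ROW = 50               # average columns per row
COL_SIZE_KB = 1                 # average column value size (KB)
REPLICATION_FACTOR = 3
GROWTH_PER_YEAR = 10            # 10x yearly growth

row_size_kb = COLS_PER_ROW * COL_SIZE_KB                                  # 50 KB per row
initial_storage_gb = ROWS * row_size_kb / 1_000_000                      # 500 GB raw
replicated_storage_tb = initial_storage_gb * REPLICATION_FACTOR / 1_000  # 1.5 TB with RF=3

year3_rows = ROWS * GROWTH_PER_YEAR ** 2                                  # 1 billion rows
year3_storage_tb = year3_rows * row_size_kb * REPLICATION_FACTOR / 1_000_000_000  # 150 TB

reads_per_sec, writes_per_sec = 100_000, 50_000
read_bw_mb_per_sec = reads_per_sec * COL_SIZE_KB / 1_000     # 100 MB/sec at 1 KB per read
write_bw_mb_per_sec = writes_per_sec * COL_SIZE_KB / 1_000   # 50 MB/sec at 1 KB per write

print(initial_storage_gb, replicated_storage_tb, year3_storage_tb,
      read_bw_mb_per_sec + write_bw_mb_per_sec)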


Scalability Considerations

Horizontal Scalability:

  • Add more nodes to handle increased load.
  • Use consistent hashing to distribute data evenly across nodes.

Vertical Scalability:

  • Optimize hardware for increased CPU, memory, and storage capacities.

Partitioning and Sharding:

  • Use partition keys to distribute data across nodes.
  • Implement automatic sharding to handle growing data volumes.


Node Configuration

Initial Cluster Size:

  • Start with a cluster of 5 nodes.
  • Each node should have at least 2 TB of storage, 32 GB of RAM, and 8 CPU cores.

Future Scaling:

  • Plan for adding nodes in multiples of 5 as data volume grows.
  • Regularly monitor and adjust the replication factor to ensure data availability and durability.


API design


Put Data

def put(key: str, columns: dict) -> bool:
    """
    Stores a row in the database.

    Parameters:
    - key (str): The unique identifier for the row.
    - columns (dict): A dictionary where the key is the column name (str) and the value is the column value (str).

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Get Data

def get(key: str) -> dict:
    """
    Retrieves a row from the database.

    Parameters:
    - key (str): The unique identifier for the row.

    Returns:
    - dict: A dictionary where the key is the column name (str) and the value is the column value (str). Returns an empty dictionary if the key does not exist.
    """


Delete Data

def delete(key: str) -> bool:
    """
    Deletes a row from the database.

    Parameters:
    - key (str): The unique identifier for the row to be deleted.

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Update Data

def update(key: str, columns: dict) -> bool:
    """
    Updates a row in the database.

    Parameters:
    - key (str): The unique identifier for the row.
    - columns (dict): A dictionary where the key is the column name (str) and the value is the column value (str). Only specified columns will be updated.

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Scan Data

def scan(start_key: str, end_key: str) -> list:
    """
    Scans and retrieves rows within a specified range.

    Parameters:
    - start_key (str): The starting key for the scan.
    - end_key (str): The ending key for the scan.

    Returns:
    - list: A list of dictionaries, where each dictionary represents a row with column names as keys and column values as values.
    """


Batch write data

def batch_write(batch: list) -> bool:
    """
    Performs a batch write operation.

    Parameters:
    - batch (list): A list of dictionaries, where each dictionary represents a row to be written. Each dictionary should have a 'key' entry and a 'columns' entry.

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Batch read Data

def batch_read(keys: list) -> dict:
    """
    Performs a batch read operation.

    Parameters:
    - keys (list): A list of unique identifiers (str) for the rows to be read.

    Returns:
    - dict: A dictionary where the key is the row key (str) and the value is a dictionary of columns (dict) for that row.
    """


Get Status

def get_status() -> dict:
    """
    Retrieves the status of the database.

    Returns:
    - dict: A dictionary containing the status information of the database, such as number of nodes, storage used, and replication status.
    """


Database design Framework

Let's focus on how we create and manage databases within the wide column store: the logical structure (keyspaces, tables, and column families) and the operations on each, rather than the physical storage design itself.


Key Concepts

Keyspace Management

  • Defines the logical grouping of tables with replication settings.
  • Operations to create, update, and delete keyspaces.

Table Management

  • Defines the schema of tables within a keyspace.
  • Operations to create, update, and delete tables.

Column Family

  • Each table can be considered a column family containing rows and columns.
  • Columns are added dynamically, supporting a flexible schema.


Keyspace Operations

Create Keyspace

def create_keyspace(name: str, replication: dict) -> bool:
    """
    Creates a new keyspace.

    Parameters:
    - name (str): The name of the keyspace.
    - replication (dict): A dictionary specifying the replication strategy (e.g., {'class': 'SimpleStrategy', 'replication_factor': 3}).

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Update Keyspace

def update_keyspace(name: str, replication: dict) -> bool:
    """
    Updates an existing keyspace.

    Parameters:
    - name (str): The name of the keyspace.
    - replication (dict): A dictionary specifying the new replication strategy.

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Delete Keyspace

def delete_keyspace(name: str) -> bool:
    """
    Deletes an existing keyspace.

    Parameters:
    - name (str): The name of the keyspace to delete.

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Table Operations

Create Table

def create_table(keyspace: str, table_name: str, schema: dict) -> bool:
    """
    Creates a new table within a keyspace.

    Parameters:
    - keyspace (str): The name of the keyspace.
    - table_name (str): The name of the table.
    - schema (dict): A dictionary specifying the table schema (e.g., {'row_key': 'text', 'column_name': 'text', 'column_value': 'text'}).

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Update Table

def update_table(keyspace: str, table_name: str, schema: dict) -> bool:
    """
    Updates the schema of an existing table.

    Parameters:
    - keyspace (str): The name of the keyspace.
    - table_name (str): The name of the table.
    - schema (dict): A dictionary specifying the new table schema.

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Delete Table

def delete_table(keyspace: str, table_name: str) -> bool:
    """
    Deletes an existing table.

    Parameters:
    - keyspace (str): The name of the keyspace.
    - table_name (str): The name of the table to delete.

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Column Family Operations

Add Column

def add_column(keyspace: str, table_name: str, row_key: str, column_name: str, column_value: str) -> bool:
    """
    Adds a column to a row in the table.

    Parameters:
    - keyspace (str): The name of the keyspace.
    - table_name (str): The name of the table.
    - row_key (str): The key of the row.
    - column_name (str): The name of the column.
    - column_value (str): The value of the column.

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Update Column

def update_column(keyspace: str, table_name: str, row_key: str, column_name: str, column_value: str) -> bool:
    """
    Updates a column in a row in the table.

    Parameters:
    - keyspace (str): The name of the keyspace.
    - table_name (str): The name of the table.
    - row_key (str): The key of the row.
    - column_name (str): The name of the column.
    - column_value (str): The new value of the column.

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


Delete Column

def delete_column(keyspace: str, table_name: str, row_key: str, column_name: str) -> bool:
    """
    Deletes a column from a row in the table.

    Parameters:
    - keyspace (str): The name of the keyspace.
    - table_name (str): The name of the table.
    - row_key (str): The key of the row.
    - column_name (str): The name of the column to delete.

    Returns:
    - bool: True if the operation is successful, False otherwise.
    """


High-level design

Primary Key

The primary key in our Database uniquely identifies each row in a table and consists of two parts:

Primary Key = Partition Key + Clustering Key

Partition Key: This part of the primary key determines which node stores the data. For example, in a table with the primary key (city_id, employee_id), city_id acts as the partition key. This means all rows with the same city_id will be stored on the same node.


Clustering Key: This part of the primary key defines the order of the data within the node. In the same example, employee_id serves as the clustering key. This ensures that within each node, the rows are stored in a sorted order based on the employee_id column.


Clustering Key

Clustering keys determine the storage order of data within a node. A table can have multiple clustering keys, and all columns listed after the partition key are referred to as clustering columns. These clustering columns dictate the sequence in which the data is organized on a node.
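
To make the distinction concrete, here is a toy in-memory sketch using the (city_id, employee_id) example from above; the row values are purely illustrative:

# Illustrative only: how the (city_id, employee_id) example maps onto storage.
# city_id is the partition key; employee_id is the clustering key.
rows = [
    {"city_id": "NYC", "employee_id": 42, "name": "Alice"},
    {"city_id": "NYC", "employee_id": 7,  "name": "Bob"},
    {"city_id": "SFO", "employee_id": 13, "name": "Carol"},
]

# All rows sharing a partition key land on the same node...
partitions = {}
for row in rows:
    partitions.setdefault(row["city_id"], []).append(row)

# ...and within that node they are kept sorted by the clustering key.
for partition in partitions.values():
    partition.sort(key=lambda r: r["employee_id"])

# partitions["NYC"] -> Bob (7) before Alice (42); partitions["SFO"] -> Carol (13)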


Data partitioning

We will use consistent hashing for data partitioning.


Partition Manager

The partitioner is the component that determines how data is distributed across the Consistent Hash ring. When our DB inserts data into the cluster, the partitioner first applies a hashing algorithm to the partition key. The result of this hashing algorithm dictates the range within which the data falls, and consequently, which node will store the data.
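
A minimal sketch of such a partitioner, assuming MD5 tokens and virtual nodes (both are illustrative choices rather than requirements of the design):

import hashlib
from bisect import bisect_right

class Partitioner:
    """Toy consistent-hash ring: hash the partition key, walk clockwise to the owning node."""

    def __init__(self, nodes, vnodes_per_node=8):
        entries = []
        for node in nodes:
            for v in range(vnodes_per_node):          # virtual nodes smooth the distribution
                entries.append((self._hash(f"{node}:{v}"), node))
        entries.sort()
        self.tokens = [token for token, _ in entries]
        self.owners = [owner for _, owner in entries]

    @staticmethod
    def _hash(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def node_for(self, partition_key: str) -> str:
        token = self._hash(partition_key)
        idx = bisect_right(self.tokens, token) % len(self.tokens)   # wrap around the ring
        return self.owners[idx]

ring = Partitioner(["node-1", "node-2", "node-3", "node-4", "node-5"])
print(ring.node_for("city:NYC"))   # the node whose range covers this key's hash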


Coordinator Node

A client can connect to any node in the cluster to start a read or write query. This node is referred to as the coordinator node. The coordinator determines which nodes are responsible for the data being written or read and then forwards the queries to those nodes.



Detailed component design


Replication

Each node in our DB acts as a replica for a specific range of data. Our DB stores multiple copies of the data and distributes them across various replicas. This ensures that if one node goes down, other replicas can handle queries for that data range. The replication process depends on two main factors:

  • Replication factor: The replication factor specifies how many nodes will hold a copy of the same data. For example, if a cluster has a replication factor of 3, each row will be stored on three different nodes. Each keyspace in our DB can have its own replication factor.
  • Replication strategy: The node responsible for the range that includes the hash of the partition key holds the first replica. All additional replicas are placed on the subsequent nodes, moving clockwise around the ring.


Simple replication strategy

This strategy is used only for a single-data-center cluster. Under this strategy, our DB places the first replica on the node determined by the partitioner and the subsequent replicas on the next nodes in a clockwise manner.
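
A sketch of this placement rule, assuming ring_nodes lists the nodes in token (ring) order:

def simple_strategy_replicas(ring_nodes: list, primary_index: int, replication_factor: int) -> list:
    """
    SimpleStrategy sketch: the first replica lives on the node that owns the key's
    token range; the remaining replicas go to the next distinct nodes, clockwise.
    """
    replicas = []
    i = primary_index
    while len(replicas) < replication_factor and len(replicas) < len(set(ring_nodes)):
        node = ring_nodes[i % len(ring_nodes)]
        if node not in replicas:          # skip owners already chosen (e.g. with virtual nodes)
            replicas.append(node)
        i += 1
    return replicas

# With 5 nodes and a replication factor of 3, a key owned by node-2
# is also replicated on node-3 and node-4.
print(simple_strategy_replicas(["node-1", "node-2", "node-3", "node-4", "node-5"], 1, 3))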


Network topology strategy

This strategy is used for clusters that span multiple data centers. Under this strategy, we can specify a different replication factor for each data center, controlling how many replicas are placed in each one. Additional replicas are always placed on the next nodes in a clockwise manner.


Consistency Levels

The consistency level is the minimum number of replica nodes that must acknowledge a read or write operation before it is considered successful. Different consistency levels can be specified for read and write operations, and consistency is tunable: the level can be raised or lowered per request.


There is always a tradeoff between consistency and performance. A higher consistency level requires more nodes to respond to a read or write query, providing greater assurance that the values present on each replica are the same.


Write consistency levels

For write operations, the consistency level specifies how many replica nodes must acknowledge the write before it is reported as successful to the client. Here are some of the consistency levels we can offer:


  • One, Two, or Three: The data must be written to at least the specified number of replica nodes for the write to be considered successful.
  • Quorum: The data must be written to at least a majority (quorum) of replica nodes. Quorum is calculated as floor(RF/2) + 1, where RF is the replication factor; for instance, in a cluster with a replication factor of five, a write is considered successful once three nodes acknowledge it (see the sketch after this list).
  • All: The data must be written to all replica nodes. This level offers the highest consistency but the lowest availability, as writes will fail if any replica is down.
  • Local_Quorum: The data must be written to a quorum of nodes within the same datacenter as the coordinator. It does not require responses from other datacenters.
  • Each_Quorum: The data must be written to a quorum of nodes in each datacenter.
  • Any: The data must be written to at least one node. In cases where all replica nodes for the given partition key are down, the write can still succeed through a hinted handoff. This level provides the lowest latency and highest availability but the lowest consistency.
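
A tiny illustration of the quorum formula referenced above:

import math

def quorum(replication_factor: int) -> int:
    """Minimum number of replicas that must acknowledge a QUORUM read or write."""
    return math.floor(replication_factor / 2) + 1

for rf in (3, 5, 6):
    print(rf, quorum(rf))   # RF=3 -> 2, RF=5 -> 3, RF=6 -> 4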


Read consistency levels

The consistency level for read queries specifies how many replica nodes must respond to a read request before returning the data. For example, for a read request with a consistency level of quorum and replication factor of three, the coordinator waits for successful replies from at least two nodes.


Snitch: The snitch is the component that determines the proximity of nodes within the ring and identifies which nodes are faster. Nodes use this information to route read and write requests efficiently. We will discuss this in more detail later.


The coordinator sends the full read request to the fastest replica. For example, with a quorum of two, the coordinator sends the full request to the fastest node and requests a digest of the data from the second-fastest node. The digest is a checksum of the data, used to save network bandwidth.


If the digest does not match, it indicates that some replicas do not have the latest version of the data. In this situation, the coordinator reads the data from all replicas to determine the latest version. The coordinator then returns the most recent data to the client and initiates a read repair request. The read repair operation updates the nodes with the older version of the data to the latest version.
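
A compressed sketch of this digest-and-repair flow, with replica responses modeled as hypothetical in-memory (row, write_timestamp) pairs ordered fastest node first:

import hashlib

def digest(row: dict) -> str:
    """Checksum of a row; replicas can return this instead of the full data to save bandwidth."""
    return hashlib.md5(repr(sorted(row.items())).encode()).hexdigest()

def quorum_read(replicas: list, consistency: int = 2) -> dict:
    """Digest-read sketch over stand-in replica responses."""
    full_row, _ = replicas[0]                                   # full read from the fastest node
    digests_match = all(digest(row) == digest(full_row)
                        for row, _ in replicas[1:consistency])  # digest-only reads from the rest
    if digests_match:
        return full_row
    # Mismatch: read all replicas, return the newest version to the client, and
    # (in a real system) issue a read repair that writes it back to the stale replicas.
    newest_row, _ = max(replicas, key=lambda pair: pair[1])
    return newest_row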


Node failure detection

Accurately detecting failures is a challenging task because it is difficult to determine with absolute certainty whether a system is truly down or simply responding slowly due to heavy load, network congestion, or other factors. Mechanisms like heartbeating provide a binary output indicating whether the system is alive or not, without any intermediate states. Heartbeating relies on a fixed timeout, and if no heartbeat is received from a server within this period, the system assumes that the server has crashed. The value of the timeout is crucial. If the timeout is set too short, the system can quickly detect failures but may generate many false positives due to slow machines or network issues. Conversely, if the timeout is set too long, false positives are reduced, but the system becomes less efficient at detecting failures promptly.
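
A minimal sketch of such a fixed-timeout heartbeat detector; the timeout_seconds value is an assumed knob that trades detection speed against false positives:

import time

class HeartbeatDetector:
    """Naive detector: a node is suspected dead after `timeout_seconds` of silence."""

    def __init__(self, timeout_seconds: float = 10.0):
        self.timeout = timeout_seconds
        self.last_seen = {}                    # node -> timestamp of last heartbeat

    def record_heartbeat(self, node: str) -> None:
        self.last_seen[node] = time.monotonic()

    def is_alive(self, node: str) -> bool:
        last = self.last_seen.get(node)
        return last is not None and (time.monotonic() - last) < self.timeout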


Gossip Protocol

The gossip protocol enables each node to maintain state information about other nodes in the cluster. Nodes exchange this state information with each other to remain synchronized. This peer-to-peer communication mechanism involves nodes periodically sharing their state information and that of other known nodes. Every second, each node initiates a gossip round, exchanging state information with one to three random nodes. This ensures that all nodes in the cluster quickly become aware of the current state of every other node. Each gossip message includes a version number, allowing older information to be updated with the most recent state for each node during the exchange.
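
A toy model of one gossip round, assuming cluster state is held in plain dictionaries and that state versions are monotonically increasing integers:

import random

def gossip_round(cluster_state: dict, node: str, fanout: int = 3) -> None:
    """
    One gossip round for `node`: exchange state with up to `fanout` random peers,
    keeping the highest version seen for every known node on both sides.
    `cluster_state` maps node -> {known_node: (version, status)}.
    """
    peers = [n for n in cluster_state if n != node]
    my_view = cluster_state[node]
    for peer in random.sample(peers, k=min(fanout, len(peers))):
        peer_view = cluster_state[peer]
        merged = list(my_view.items()) + list(peer_view.items())
        for known, (version, status) in merged:
            for view in (my_view, peer_view):          # the newer version wins in both views
                if known not in view or view[known][0] < version:
                    view[known] = (version, status)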


Maintaining Commit logs

When a node receives a write request, it first writes the data to a commit log, which is a write-ahead log stored on disk. This commit log serves as a crash-recovery mechanism that ensures the database's durability. A write operation is not considered successful on the node until it is recorded in the commit log. This guarantees that even if the data does not reach the in-memory store (the MemTable, discussed later), it can still be recovered. In the event of a node shutdown or unexpected crash, the commit log ensures data is not lost, as the log can be replayed when the node restarts.
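
A sketch of this write path, with the commit log modeled as a JSON-lines file and the MemTable as a plain dictionary (file format and class name are illustrative):

import json
import os

class StorageNode:
    """Write-path sketch: append to an on-disk commit log first, then apply to the MemTable."""

    def __init__(self, commit_log_path: str):
        self.commit_log_path = commit_log_path
        self.commit_log = open(commit_log_path, "a", encoding="utf-8")
        self.memtable = {}                        # in-memory map: row_key -> {column: value}

    def write(self, key: str, columns: dict) -> bool:
        record = json.dumps({"key": key, "columns": columns})
        self.commit_log.write(record + "\n")      # the write-ahead entry...
        self.commit_log.flush()
        os.fsync(self.commit_log.fileno())        # ...is durable on disk before we acknowledge
        self.memtable.setdefault(key, {}).update(columns)
        return True

    def replay(self) -> None:
        """Crash recovery: rebuild the MemTable by replaying the commit log."""
        with open(self.commit_log_path, encoding="utf-8") as log:
            for line in log:
                record = json.loads(line)
                self.memtable.setdefault(record["key"], {}).update(record["columns"])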


Sequential Write

Sequential writes are the primary reason that writes perform so efficiently. No reads or seeks are necessary for writing a value because all writes are ‘append’ operations. If values were naively inserted where they ultimately belonged, writing clients would incur the cost of seeks upfront.


Tombstones 

An interesting case occurs when we delete data while a replica node is down or unreachable; that node could miss the delete operation. When it comes back online and a repair occurs, it could "resurrect" the previously deleted data by re-sharing it with the other nodes. To prevent deleted data from being reintroduced, we use a concept called a tombstone, similar to a "soft delete" in the relational database world. When data is deleted, it is not removed immediately; instead, the delete is treated as an update operation that places a tombstone marker, with a set expiry time, on the value.


Each tombstone has an expiry time associated with it, representing the amount of time that nodes will wait before removing the data permanently. 
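
A sketch of tombstone handling; the grace period below is an assumed example value, and the MemTable is again modeled as a plain dictionary:

import time

TOMBSTONE_GRACE_SECONDS = 864_000   # assumed example: wait ~10 days before purging

def place_tombstone(memtable: dict, row_key: str, column_name: str) -> None:
    """A delete is written as an update that places a tombstone marker on the value."""
    memtable.setdefault(row_key, {})[column_name] = {
        "tombstone": True,
        "deleted_at": time.time(),
    }

def purge_expired_tombstones(memtable: dict, now: float) -> None:
    """Compaction-time cleanup: remove a tombstone only after its grace period has passed."""
    for row in memtable.values():
        for column, value in list(row.items()):
            if isinstance(value, dict) and value.get("tombstone") \
                    and now - value["deleted_at"] > TOMBSTONE_GRACE_SECONDS:
                del row[column]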



Failure scenarios/bottlenecks

  1. As a tombstone itself is a record, it takes storage space. Hence, it should be kept in mind that upon deletion, the application will end up increasing the data size instead of shrinking it. Furthermore, if there are a lot of tombstones, the available storage for the application could be substantially reduced.
  2. When a table accumulates many tombstones, read queries on that table could become slow and can cause serious performance problems like timeouts. This is because we have to read much more data until the actual compaction happens and removes the tombstones. 
  3. While the commit log ensures durability, replaying the commit log during recovery can be time-consuming and impact system availability temporarily.
  4. Regular repair operations to maintain data consistency across nodes can be resource-intensive and impact the overall performance of the system.
  5. Data replication across multiple nodes for high availability and durability can consume significant network bandwidth, potentially leading to network congestion and increased latencies for read/write operations.
  6. The constant state information exchange between nodes using the gossip protocol can add to network overhead, particularly in large clusters.
  7. While sequential writes improve performance, the system’s reliance on disk speed can become a bottleneck if disk I/O performance is not sufficient. High write throughput can saturate disk bandwidth, leading to increased latency.