My Solution for Design a Database Batch Auditing Service with Score: 9/10
by john_chen
System requirements
The designed system enables users to define SQL, Hive, or Trino queries for periodic batch audits of their database tables. The system's capabilities and requirements include CRUD operations for audit jobs, specified intervals for audits, designated owners, and precise conditions based on SQL query results.
Functional requirements include:
- A failed audit triggers an alert.
- Users can view logs and the status of ongoing or past jobs.
- Each job must complete within 6 hours, and each query within 15 minutes to prevent system delays.
Non-functional requirements cover:
- Scalability, with less than 10,000 jobs anticipated.
- Limited availability, as this is an internal tool without critical dependencies.
- Security, ensuring that only job owners can modify their configurations.
- Accuracy, ensuring that each audit's results reflect its configuration precisely.
Capacity estimation
- Anticipated number of jobs: Less than 10,000
- Let's assume each job runs on average once per hour:
  - Runs per job per day: 1 run/hour * 24 hours = 24 runs/day
- Resource allocation per job execution:
  - CPU: 0.5 CPU cores
  - Memory: 512 MB RAM
  - Storage: 1 GB of storage space per execution
- Assuming that average storage requirement:
  - Storage per job per day: 1 GB * 24 runs = 24 GB/day
  - Estimated storage per job per month: 24 GB/day * 30 days = 720 GB/month
A rough worst-case extrapolation across the whole fleet is sketched below.
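As a quick back-of-the-envelope check, the snippet below extrapolates these per-job figures to the stated worst case of 10,000 jobs all running hourly; it assumes no retention or compaction policy, so the real footprint would likely be smaller.

```python
# Worst-case extrapolation of the per-job estimates above; assumes all ~10,000 jobs
# run hourly and every execution consumes its full storage and CPU budget.
jobs = 10_000
runs_per_job_per_day = 24
storage_per_run_gb = 1.0
cpu_per_run_cores = 0.5

daily_storage_tb = jobs * runs_per_job_per_day * storage_per_run_gb / 1_000
peak_cpu_cores = jobs * cpu_per_run_cores  # only if every job fired at the same instant

print(f"~{daily_storage_tb:.0f} TB/day of audit storage, ~{peak_cpu_cores:.0f} cores at worst-case peak")
# -> roughly 240 TB/day and 5,000 cores in the absolute worst case.
```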
API design
- Endpoint for Creating Auditing Jobs (an example request is sketched after this list):
  - Endpoint: `POST /api/auditing-jobs`
  - Description: Create a new auditing job with the specified parameters.
  - Request Payload: JSON object containing job details.
  - Response: Confirmation message with the job ID.
- Endpoint for Reading Auditing Jobs:
  - Endpoint: `GET /api/auditing-jobs/{jobId}`
  - Description: Retrieve details of a specific auditing job by job ID.
  - Request Parameter: `jobId` to identify the auditing job.
  - Response: JSON object containing job details.
- Endpoint for Updating Auditing Jobs:
  - Endpoint: `PUT /api/auditing-jobs/{jobId}`
  - Description: Update the parameters or status of an existing auditing job.
  - Request Parameter: `jobId` to identify the auditing job.
  - Request Payload: JSON object containing the updated job details.
  - Response: Confirmation message on job update.
- Endpoint for Deleting Auditing Jobs:
  - Endpoint: `DELETE /api/auditing-jobs/{jobId}`
  - Description: Delete a specific auditing job by job ID.
  - Request Parameter: `jobId` to identify the auditing job.
  - Response: Confirmation message on job deletion.
- Endpoint for Checking Job Status:
  - Endpoint: `GET /api/auditing-jobs/{jobId}/status`
  - Description: Check the current status of a specific auditing job.
  - Request Parameter: `jobId` to identify the auditing job.
  - Response: Job status (e.g., pending, completed, failed).
- Endpoint for Retrieving Historical Data:
  - Endpoint: `GET /api/auditing-jobs/history`
  - Description: Retrieve historical data of auditing jobs processed.
  - Response: List of auditing jobs with details such as status, completion time, etc.
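As an illustration of the create endpoint, here is a minimal client sketch using `requests`; the host, payload field names, and response shape are assumptions for illustration, not a confirmed schema.

```python
import requests

# Hypothetical payload; the actual field names are assumptions for illustration.
payload = {
    "name": "transactions_freshness",
    "owner": "john_chen",
    "cron": "0 * * * *",              # run hourly
    "dialect": "mysql",               # one of the supported query dialects
    "query": "SELECT COUNT(*) AS cnt FROM Transactions "
             "WHERE Date(timestamp) >= Curdate() - INTERVAL 1 DAY",
    "condition": "result[0][0] > 0",  # audit passes when this evaluates to True
}

resp = requests.post("https://audit.example.com/api/auditing-jobs", json=payload, timeout=10)
resp.raise_for_status()
job_id = resp.json()["jobId"]         # assumed response shape: {"jobId": "...", "message": "..."}
print("Created auditing job", job_id)
```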
Let's visualize these API endpoints and their interactions in a sequence diagram.
This sequence diagram illustrates how a client interacts with the Batch Auditing Service through the API endpoints for creating, reading, updating, and deleting auditing jobs, checking their statuses, and accessing historical data.
Database design
I opt for using a SQL database to store the metadata used by the auditing and alerting services.
- The `JOB_CONFIG` table holds configurations for audit jobs, linked to the `OWNER`, `ALERT`, and `SCRIPT` tables as needed.
- The `OWNER` table stores information about job owners to establish owner-job relationships.
- The `ALERT` table records alerts triggered by job failures, associated with the respective `JOB_CONFIG` entry.
- The `SCRIPT` table stores the generated validation scripts for each job, in connection with the `JOB_CONFIG` entry.
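Below is a minimal sketch of what these tables might look like in MySQL, created through the same `mysql.connector` client used elsewhere in this design; the column names and types are assumptions, not a finalized schema.

```python
import mysql.connector

# Assumed schema for the audit metadata store; column names and types are illustrative.
DDL = [
    """CREATE TABLE IF NOT EXISTS owner (
           id BIGINT AUTO_INCREMENT PRIMARY KEY,
           name VARCHAR(255) NOT NULL,
           email VARCHAR(255) NOT NULL
       )""",
    """CREATE TABLE IF NOT EXISTS job_config (
           id BIGINT AUTO_INCREMENT PRIMARY KEY,
           owner_id BIGINT NOT NULL,
           name VARCHAR(255) NOT NULL,
           cron_expression VARCHAR(64) NOT NULL,
           dialect VARCHAR(32) NOT NULL,          -- e.g., mysql, hive, trino, spark
           query_text TEXT NOT NULL,
           audit_condition TEXT NOT NULL,         -- conditional statement on the query result
           FOREIGN KEY (owner_id) REFERENCES owner(id)
       )""",
    """CREATE TABLE IF NOT EXISTS script (
           id BIGINT AUTO_INCREMENT PRIMARY KEY,
           job_config_id BIGINT NOT NULL,
           body TEXT NOT NULL,                    -- generated validation.py contents
           FOREIGN KEY (job_config_id) REFERENCES job_config(id)
       )""",
    """CREATE TABLE IF NOT EXISTS alert (
           id BIGINT AUTO_INCREMENT PRIMARY KEY,
           job_config_id BIGINT NOT NULL,
           triggered_at DATETIME NOT NULL,
           message TEXT,
           FOREIGN KEY (job_config_id) REFERENCES job_config(id)
       )""",
]

cnx = mysql.connector.connect(user='admin', password='password', host='127.0.0.1', database='audit_metadata')
cursor = cnx.cursor()
for stmt in DDL:
    cursor.execute(stmt)
cnx.commit()
cursor.close()
cnx.close()
```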
High-level design
The high-level diagram depicts the preliminary architectural framework for a service designed to enable users to perform periodic validation checks on their data tables. This architecture assumes the use of a batch ETL service similar to Airflow, which is responsible for storing Python scripts for batch jobs, executing these jobs at their scheduled intervals, maintaining the status and historical data of these jobs, and determining the truth value of audit conditions. Users interact with this system through a user interface (UI) that communicates with a backend to manage these processes:
- Users interact with the batch ETL service through the UI to manage (create, read, update, delete) auditing jobs and to check their statuses and historical data.
- Alerting is handled separately from the batch ETL service. The batch ETL service does not expose API endpoints to initiate alerts or view alert histories directly; instead, users access this information through the alerting service, facilitated by the UI and backend.
Request flows
Process Flow for Creating a Batch Auditing Job:
- When a user requests to create a batch auditing job, the backend service generates the `validation.py` script by inserting user-defined values into a predefined template. This lightweight script is held in memory on each backend service host. (A sketch of the template-filling step follows this list.)
- The backend then forwards this script to the batch ETL service, which sets up the audit job and securely stores the script. Upon successful creation, the batch ETL service sends an HTTP 200 status code back to the backend as confirmation.
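A minimal sketch of that template-filling step, assuming a `validation.py.template` file with Python `string.Template` placeholders such as `$QUERIES`, `$CONDITION`, and `$SCHEDULE` (the placeholder names are illustrative, not a confirmed format):

```python
from string import Template

# Assumed placeholder names ($QUERIES, $CONDITION, $SCHEDULE); the real template may differ.
with open("validation.py.template") as f:
    template = Template(f.read())

rendered_script = template.substitute(
    QUERIES=repr([
        "SELECT COUNT(*) AS cnt FROM Transactions "
        "WHERE Date(timestamp) >= Curdate() - INTERVAL 1 DAY"
    ]),
    CONDITION="result[0][0] > 0",
    SCHEDULE="@hourly",
)

# The rendered validation.py is held in memory and then forwarded to the batch ETL service.
print(rendered_script[:200])
```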
Service Overview:
The batch auditing framework essentially acts as a front for the shared Batch ETL Service. It configures jobs based on parameters like ownership, scheduling (cron expressions), and specific database queries in supported dialects such as Hive, Trino, Spark, and SQL. The system maintains a primary SQL database that holds configurations for these audit jobs in a `job_config` table, with an associated `owner` table linking jobs to their respective owners.
Because validation queries may be written in various SQL-like dialects, the batch ETL service interfaces with multiple shared database systems. In the event of job failures or audit discrepancies, the service communicates with a centralized alerting system to notify the relevant parties. Security is enhanced through integration with an OpenID Connect service, ensuring authenticated interactions across the system.
Running and Monitoring Audit Jobs:
Audit jobs operate on a predefined schedule and consist of two main actions:
- Execution of the database query.
- Evaluation based on the query's results.
The creation of these jobs leverages a Python script within an Airflow service, facilitated by a script template that adapts to specific job requirements such as interval, query specifics, and conditional logic.
Detailed component design
Despite our best efforts to prevent invalid data from being stored, it is inevitable that some errors will occur. Auditing acts as an additional layer of validation checks.
A typical application for batch auditing is to verify large files, often larger than 1 GB, which originate outside our organization and over whose creation we have no control. Having a single host process and validate each row of such large files can be inefficient. Loading the data into a MySQL database with the `LOAD DATA` command, which is more efficient than row-by-row `INSERT` statements, and then auditing it with `SELECT` statements provides a faster and more manageable solution. If the data is stored on a distributed file system like HDFS, distributed engines such as Hive or Spark can be employed for fast parallel processing.
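A minimal sketch of that bulk-load-then-audit path, assuming a local CSV file, a MySQL server with `local_infile` enabled, and illustrative file and table names:

```python
import mysql.connector

# Assumes the server and client both allow LOCAL INFILE; table and file names are illustrative.
cnx = mysql.connector.connect(
    user='admin', password='password', host='127.0.0.1',
    database='transactions', allow_local_infile=True,
)
cursor = cnx.cursor()

# Bulk-load the external file; far faster than issuing one INSERT per row.
cursor.execute("""
    LOAD DATA LOCAL INFILE '/data/incoming/transactions_2023_01_01.csv'
    INTO TABLE Transactions
    FIELDS TERMINATED BY ',' ENCLOSED BY '"'
    LINES TERMINATED BY '\\n'
    IGNORE 1 LINES
""")
cnx.commit()

# Follow-up audits are then plain SELECT statements against the loaded table.
cursor.execute("SELECT COUNT(*) FROM Transactions WHERE Date(timestamp) >= Curdate() - INTERVAL 1 DAY")
print(cursor.fetchone()[0])

cursor.close()
cnx.close()
```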
Even when audits reveal invalid data, there may be a decision to retain this "dirty" data rather than having no data at all. Batch auditing is particularly adept at identifying specific issues such as duplicate entries or missing data. Some validation processes, like anomaly detection algorithms, rely on previously ingested data to identify inconsistencies in newly ingested data.
Defining a Validation with a Conditional Statement on a SQL Query’s Result
A table consists of rows and columns; any specific location within it may be referred to as a cell, element, datapoint, or value, and these terms are used interchangeably here.
Let's explore how manually defined validations are implemented using comparison operators on SQL query results. The outcome of an SQL query is treated as a two-dimensional array, termed "result," upon which we can apply conditional statements. We review examples of daily validations, analyzing only the data from the previous day. These validations, demonstrated through SQL queries, highlight the application of conditional statements on query results.
For instance, validations can be defined for:
- Individual datapoints of a column: A simple validation might be ensuring that the "latest timestamp is less than 24 hours old".
  SELECT COUNT(*) AS cnt FROM Transactions WHERE Date(timestamp) >= Curdate() - INTERVAL 1 DAY
  Possible true conditional statements are `result[0][0] > 0` and `result['cnt'][0] > 0`.
- Multiple datapoints of a column: If a user should not make more than five purchases per day, a validation can alert if this limit is exceeded.
  SELECT user_id, COUNT(*) AS cnt FROM Transactions WHERE Date(timestamp) = Curdate() - INTERVAL 1 DAY GROUP BY user_id HAVING cnt > 5
  The conditional statement would be `result.length == 0`; any returned row is a user who exceeded the limit.
- Multiple columns in a single row: Validating that sales using a specific coupon code do not exceed 100 per day.
  SELECT COUNT(*) AS cnt FROM Transactions WHERE Date(timestamp) = Curdate() - INTERVAL 1 DAY AND coupon_code = @coupon_code
  The conditional statement would be `result[0][0] <= 100`.
- Multiple tables: For verifying that all sales in North America have appropriate country codes.
  SELECT * FROM sales_na S JOIN country_codes C ON S.country_code = C.id WHERE C.region != 'NA';
  The conditional statement would be `result.length == 0`.
- Conditional statements on multiple queries: Raising an alert if the number of sales changes significantly from one week to the next.
  SELECT COUNT(*) FROM sales WHERE Date(timestamp) = Curdate();
  SELECT COUNT(*) FROM sales WHERE Date(timestamp) = Curdate() - INTERVAL 7 DAY;
  Here `result` holds one result set per query, so the conditional statement could be `Math.abs(result[0][0][0] - result[1][0][0]) / result[0][0][0] < 0.1` (a Python sketch of this check follows the list).
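A minimal Python sketch of the last, multi-query validation, mirroring the `result[query][row][column]` indexing used above (connection details are placeholders):

```python
import mysql.connector

QUERIES = [
    "SELECT COUNT(*) FROM sales WHERE Date(timestamp) = Curdate()",
    "SELECT COUNT(*) FROM sales WHERE Date(timestamp) = Curdate() - INTERVAL 7 DAY",
]

# Run each query and collect its full result set, so result[i] is the i-th query's rows.
result = []
cnx = mysql.connector.connect(user='admin', password='password', host='127.0.0.1', database='transactions')
cursor = cnx.cursor()
for q in QUERIES:
    cursor.execute(q)
    result.append(cursor.fetchall())
cursor.close()
cnx.close()

today, last_week = result[0][0][0], result[1][0][0]
# The audit passes when the week-over-week change is under 10%; guard against division by zero.
passed = today > 0 and abs(today - last_week) / today < 0.1
print(passed)
```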
These examples demonstrate the flexibility and necessity of incorporating manually defined validations in maintaining data integrity. These principles can be adapted to other query languages like HiveQL, Trino, or Spark, and even general-purpose programming languages for broader application in database management systems.
Simple SQL Batch Auditing Service
Let us delve into developing a straightforward script for auditing a SQL table and subsequently outline the steps to create a batch auditing job using this script.
An Audit Script
The fundamental structure of a batch auditing job involves the following steps:
- Execute a database query.
- Capture the query result into a variable.
- Evaluate the variable against predefined conditions to determine compliance.
Here is a Python script example that performs a MySQL query to verify if the most recent timestamp in our transactions table is less than 24 hours old, and then outputs the result:
```python
import mysql.connector

# Establish database connection
cnx = mysql.connector.connect(user='admin', password='password', host='127.0.0.1', database='transactions')
cursor = cnx.cursor()

# Define and execute the query
query = """
SELECT COUNT(*) AS cnt
FROM Transactions
WHERE Date(timestamp) >= Curdate() - INTERVAL 1 DAY
"""
cursor.execute(query)
results = cursor.fetchall()

# Close connections
cursor.close()
cnx.close()

# Check the condition and print the result
print(results[0][0] > 0)  # A simple condition to check the count
```
In scenarios requiring multiple database queries, the script can be expanded to handle various queries and compare their results, enhancing the robustness of the audit process.
An Audit Service
To scale this script into a full-fledged batch auditing service, we can generalize it to accept user-specific configurations such as the SQL database, queries, and evaluation conditions. The approach involves creating a Python template file, named `validation.py.template`, into which user-defined parameters are dynamically inserted before the resulting script is executed within a batch processing framework like Airflow or a simple cron job.
Here’s a streamlined version of how such a service might be structured using Airflow:
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
import mysql.connector
import os
import pdpyras

# Define the validation function
def validation(ti, **kwargs):
    # Establish connection and execute queries
    results = []
    queries = [{'query': "..."}]  # Example placeholder for queries
    for query in queries:
        cnx = mysql.connector.connect(user='admin', password='password', host='127.0.0.1', database='transactions')
        cursor = cnx.cursor()
        cursor.execute(query['query'])
        results.append(cursor.fetchall())
        cursor.close()
        cnx.close()
    # Push results to Airflow XCom for cross-task communication
    ti.xcom_push(key='validation_result', value=results)

# Define the alerting function
def alert(ti, **kwargs):
    # Retrieve results from the previous task
    results = ti.xcom_pull(task_ids='run_validation', key='validation_result')
    # Example condition: the first query's count must be positive; in the real service
    # this check comes from the job's configured conditional statement.
    passed = bool(results) and results[0][0][0] > 0
    if not passed:
        # Trigger an alert because the audit failed
        routing_key = os.environ['PD_API_KEY']
        session = pdpyras.EventsAPISession(routing_key)
        session.trigger("Validation failed", "audit")

# Set up the Airflow DAG
dag = DAG(
    'sql_audit_service',
    default_args={
        'depends_on_past': False,
        'email': ['[email protected]'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='A DAG for SQL batch auditing',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False
)

t1 = PythonOperator(
    task_id='run_validation',
    python_callable=validation,
    dag=dag
)

t2 = PythonOperator(
    task_id='trigger_alert',
    python_callable=alert,
    dag=dag
)

t1 >> t2  # Define task dependency
```
Addressing Scalability and Alert Management:
The primary challenges in scaling involve the batch ETL and alerting services. Discussions on scalability focus on optimizing these components to handle an increasing load without performance degradation.
Audits are logged and results stored in SQL, with monitoring to track job performance and trigger alerts on failures. Decision-making around alert management considers whether the batch ETL service or the backend should handle alert initiation, with a leaning towards centralizing this responsibility within the backend to avoid mismanagement and reduce maintenance complexity.
Trade offs/Tech choices
Limit Query Execution Time
To curtail the cost associated with long-running queries, we set execution time limits: 10 minutes for query creation or editing and 15 minutes for operational execution. This pushes users to design efficient queries and helps instill discipline in managing query performance. For better user interaction, we offer an asynchronous validation process in which the system executes the query and notifies the user within 10 minutes whether the query configuration was accepted or rejected. This method might, however, make users hesitant to adjust their queries, potentially overlooking necessary optimizations.
To enforce these limits at run time, we can use the following mechanism: should a query exceed 15 minutes, it is automatically terminated, the job is suspended until revalidated, and a high-priority alert is sent to the owner. Queries exceeding 10 minutes prompt a warning to the owner about potential future overruns.
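For MySQL specifically, one way to enforce the per-query limit is the session-level `max_execution_time` setting (available for SELECT statements since MySQL 5.7.8); this is only a sketch, and other dialects such as Hive or Trino would need their own mechanisms:

```python
import mysql.connector
from mysql.connector import errors

QUERY_TIMEOUT_MS = 15 * 60 * 1000  # 15-minute operational limit from the requirements

cnx = mysql.connector.connect(user='admin', password='password', host='127.0.0.1', database='transactions')
cursor = cnx.cursor()

# max_execution_time caps SELECT statements for this session (MySQL 5.7.8+).
cursor.execute(f"SET SESSION max_execution_time = {QUERY_TIMEOUT_MS}")

try:
    cursor.execute("SELECT COUNT(*) AS cnt FROM Transactions "
                   "WHERE Date(timestamp) >= Curdate() - INTERVAL 1 DAY")
    print(cursor.fetchall())
except errors.DatabaseError as exc:
    # MySQL interrupts the statement when the limit is exceeded; at this point the job
    # would be suspended and a high-priority alert sent to the owner.
    print("Query exceeded the 15-minute limit:", exc)
finally:
    cursor.close()
    cnx.close()
```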
Utilization of Database Schema Metadata
Our service leverages schema metadata to auto-generate job configurations, suggesting partition columns for WHERE filters and allowing users to focus queries on the latest partitions. If a partition passes an audit, further audits are not scheduled unless manually requested, accommodating scenarios where audits may need to be repeated due to overlooked issues. Additionally, we propose the implementation of a metadata platform to record and alert on table-specific incidents, enhancing overall data integrity monitoring.
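A sketch of how partition information could be looked up from MySQL's `information_schema` to pre-fill the suggested WHERE filter; the schema and table names are placeholders, and Hive or Trino tables would need an equivalent lookup (e.g., `SHOW PARTITIONS`):

```python
import mysql.connector

def suggest_partition_columns(schema: str, table: str) -> list[str]:
    """Return the distinct partitioning expressions recorded for the table, if any."""
    cnx = mysql.connector.connect(user='admin', password='password', host='127.0.0.1', database='information_schema')
    cursor = cnx.cursor()
    cursor.execute(
        """
        SELECT DISTINCT PARTITION_EXPRESSION
        FROM information_schema.PARTITIONS
        WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s AND PARTITION_EXPRESSION IS NOT NULL
        """,
        (schema, table),
    )
    expressions = [row[0] for row in cursor.fetchall()]
    cursor.close()
    cnx.close()
    return expressions

# The UI could then suggest a filter such as "WHERE <partition expression> = <latest partition>".
print(suggest_partition_columns('transactions', 'Transactions'))
```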
Failure scenarios/bottlenecks
Pre-Submission Query Checks
Our system provides immediate feedback on query configurations as they are being written, helping prevent the submission of inefficient or problematic queries. This includes prohibiting full table scans, mandating filters on partition keys, and limiting the number of partition key values within a query. Expensive JOIN operations are also restricted. Once a query is defined, the system displays the query execution plan, allowing users to fine-tune their queries, supported by resources on query optimization specific to our database languages.
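A sketch of one such pre-submission check against MySQL, using `EXPLAIN` to reject queries whose plan contains a full table scan; the specific rule is illustrative, and other dialects would need their own plan inspection:

```python
import mysql.connector

def has_full_table_scan(query: str) -> bool:
    """Return True if MySQL's EXPLAIN output shows a full table scan (access type 'ALL')."""
    cnx = mysql.connector.connect(user='admin', password='password', host='127.0.0.1', database='transactions')
    cursor = cnx.cursor(dictionary=True)
    cursor.execute("EXPLAIN " + query)
    plan_rows = cursor.fetchall()
    cursor.close()
    cnx.close()
    # The 'type' column is 'ALL' when a table is read without using any index.
    return any(row.get('type') == 'ALL' for row in plan_rows)

query = ("SELECT COUNT(*) AS cnt FROM Transactions "
         "WHERE Date(timestamp) >= Curdate() - INTERVAL 1 DAY")
if has_full_table_scan(query):
    print("Rejected: the query would perform a full table scan; add a filter on a partition or indexed column.")
else:
    print("Accepted: showing the execution plan to the user for further tuning.")
```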
Auditing Data Pipelines
Our system manages data pipelines and their interdependencies. If an audit job on a specific table fails, it not only disables that job but also any dependent downstream tasks, preventing the propagation of erroneous data. This integrated approach enhances the robustness of data-driven operations across the organization.
Managing Simultaneous Queries
We set limits on the number of simultaneous queries the batch ETL service can execute to prevent system overloads. The backend monitors the scheduled queries and alerts developers if the system approaches its capacity. Monitoring also extends to the wait times for query execution, with alerts issued if these exceed set thresholds, helping manage system load effectively.
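One way to cap simultaneous audit queries is a shared Airflow pool that all validation tasks draw slots from; this sketch extends the DAG above, and the pool name and size are assumptions (the pool itself would be created beforehand via the Airflow CLI or UI):

```python
from airflow.operators.python import PythonOperator

# The pool (e.g., "audit_queries" with 50 slots) is created once, for example:
#   airflow pools set audit_queries 50 "Concurrent batch audit queries"
# Tasks that request the pool wait for a free slot, so at most 50 validations run at once.
run_validation = PythonOperator(
    task_id='run_validation',
    python_callable=validation,   # the validation function from the DAG sketch above
    pool='audit_queries',
    dag=dag,
)
```

Queue wait time per pool can then be monitored, with alerts when waits exceed the thresholds described above.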
Future improvements
To further refine our auditing capabilities, we can integrate features that allow dynamic adjustments to audits based on specific triggers, such as changes in data volume or detected anomalies. This proactive approach ensures our auditing processes remain adaptive and aligned with operational needs.
By implementing these strategies, we aim to maintain a high-performance, cost-efficient service that supports the dynamic needs of our users and organizational requirements.