Data Quality Test Suite
Data pipelines silently produce wrong numbers more often than they fail loudly. This template generates a comprehensive test suite that catches:
- Schema drift (a column type changed)
- Freshness issues (data is stale)
- Volume anomalies (record count dropped 50%)
- Quality violations (nulls, duplicates, invalid values)
- Business rule violations (orders without customers, sums that don't reconcile)
Pipelines without these tests will eventually ship bad data to consumers. The question is just when.
When to use
- Before deploying any new pipeline to production
- When inheriting pipelines without tests (yes, fix this immediately)
- After data quality incidents (formalize what you should have caught)
- For regulated data where SLAs require quality SLOs
Choice of tooling
Three viable approaches; pick one and stick with it:
1. Great Expectations — most comprehensive, Python-based, lots of built-in expectations, integrates with ADF/Databricks/Fabric. Best for: complex projects with many sources.
2. dbt tests — if using dbt for transformations, tests are co-located with models. SQL-native. Best for: dbt-centric stacks (less common in pure ADF/Fabric).
3. Native PySpark assertions + custom framework — lowest dependency, full control, but requires building. Best for: teams already deep in PySpark.
For an Azure / ADF / Fabric / PySpark stack, we recommend either Great Expectations or PySpark-native assertions in a shared library. This template covers both.
Prompt
You are a senior data quality engineer generating a comprehensive test
suite for a data pipeline. Generate tests across all relevant categories.
## Input
**Target table:** {{target_table}}
**Business context:** {{business_context}}
**Schema / sample:** {{schema_or_sample}}
**Known constraints:** {{known_constraints}}
## Output
Generate tests across the 8 test categories in sections 1-8, then cover integration,
quarantine, reporting, and SLOs (sections 9-12). Show both Great Expectations and
PySpark-native syntax for portability.
### 1. Schema validation
Verify the table has expected columns with expected types. Catches schema
drift from upstream sources.
#### Great Expectations:
```python
from great_expectations.expectations import (
    ExpectTableColumnsToMatchOrderedList,
    ExpectColumnValuesToBeOfType,
)

expectations = [
    ExpectTableColumnsToMatchOrderedList(
        column_list=[
            "customer_id", "customer_name", "email",
            "created_at", "_silver_load_timestamp", "_source_system"
        ]
    ),
    ExpectColumnValuesToBeOfType(column="customer_id", type_="StringType"),
    ExpectColumnValuesToBeOfType(column="customer_name", type_="StringType"),
    ExpectColumnValuesToBeOfType(column="email", type_="StringType"),
    ExpectColumnValuesToBeOfType(column="created_at", type_="TimestampType"),
]
```
#### PySpark-native:
```python
import logging

logger = logging.getLogger(__name__)


class SchemaValidationError(Exception):
    """Raised when required columns are missing or have the wrong type."""


def validate_schema(df, expected_schema):
    """Compare actual schema to expected; raise on drift."""
    actual_fields = {f.name: f.dataType.typeName() for f in df.schema.fields}
    expected_fields = {f.name: f.dataType.typeName() for f in expected_schema.fields}
    missing = set(expected_fields) - set(actual_fields)
    extra = set(actual_fields) - set(expected_fields)
    type_mismatches = {
        f: (expected_fields[f], actual_fields[f])
        # Dicts do not support `&`; intersect their key views instead
        for f in expected_fields.keys() & actual_fields.keys()
        if expected_fields[f] != actual_fields[f]
    }
    if missing or type_mismatches:
        raise SchemaValidationError(
            f"Missing columns: {missing}, "
            f"Type mismatches: {type_mismatches}, "
            f"Unexpected new columns: {extra}"
        )
    if extra:
        logger.warning(f"New columns appeared (allowed but flagged): {extra}")
```
### 2. Freshness checks
Verify data is recent enough. Catches stale pipelines.
```python
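from datetime import datetime, timedelta, timezone

# Naive UTC "now"; assumes load timestamps are read back on the driver as naive UTC
now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

# Great Expectations
ExpectColumnMaxToBeBetween(
    column="_silver_load_timestamp",
    min_value=now_utc - timedelta(hours=24),
    max_value=now_utc + timedelta(minutes=5),
)

# PySpark-native
max_load_time = df.agg(F.max("_silver_load_timestamp")).collect()[0][0]
if max_load_time is None:
    raise FreshnessError("Table is empty; no load timestamp to check")
hours_old = (now_utc - max_load_time).total_seconds() / 3600
if hours_old > 24:
    raise FreshnessError(f"Data is {hours_old:.1f} hours old; threshold 24h")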
```
For incremental tables, also check the data window:
```python
from datetime import datetime, timedelta, timezone

# Most recent business date in the data should be today (or yesterday for next-day arrival)
max_business_date = df.agg(F.max("order_date")).collect()[0][0]
expected_date = (datetime.now(timezone.utc) - timedelta(days=1)).date()
# An empty table returns None, which also counts as stale
if max_business_date is None or max_business_date < expected_date:
    raise FreshnessError(
        f"Latest order_date is {max_business_date}; expected at least {expected_date}"
    )
```
### 3. Volume / row count anomaly detection
Compare today's volume to historical baseline. Catches partial loads or
runaway loads.
```python
def check_row_count_anomaly(df, table_name, threshold_pct=50, spike_threshold_pct=100):
    """Compare current row count to the 7-day baseline.

    Raises if the count drops by more than threshold_pct or rises by more
    than spike_threshold_pct (defaults: below 0.5x or above 2x baseline).
    """
    current_count = df.count()
    # Get average count over last 7 days
    # NOTE: table_name is interpolated into SQL; pass only trusted, internal names
    baseline_query = f"""
        SELECT AVG(row_count) AS avg_count
        FROM meta.pipeline_run_log
        WHERE notebook = '{table_name}'
          AND status = 'success'
          AND end_time > current_timestamp() - INTERVAL 7 DAYS
    """
    baseline = spark.sql(baseline_query).collect()[0][0] or 0
    if baseline == 0:
        logger.info("No baseline yet; skipping volume check")
        return
    change_pct = (current_count - baseline) / baseline * 100
    if change_pct < -threshold_pct:
        # Major drop; likely partial load
        raise VolumeAnomalyError(
            f"Row count dropped {-change_pct:.1f}% from baseline "
            f"({baseline:.0f} → {current_count})"
        )
    if change_pct > spike_threshold_pct:
        # Major spike; likely duplicate or wrong filter
        raise VolumeAnomalyError(
            f"Row count spiked {change_pct:.1f}% from baseline "
            f"({baseline:.0f} → {current_count})"
        )
```
### 4. Null and missingness checks
For columns that should never be null:
```python
# Great Expectations
ExpectColumnValuesToNotBeNull(column="customer_id"),
ExpectColumnValuesToNotBeNull(column="created_at"),

# PySpark-native
not_null_columns = ["customer_id", "created_at"]
for col_name in not_null_columns:
    null_count = df.filter(F.col(col_name).isNull()).count()
    if null_count > 0:
        raise NullViolationError(
            f"Column {col_name} has {null_count} NULL values; expected 0"
        )
```
For columns where some nulls are expected, check the NULL rate:
```python
# Tolerable null rate check (e.g., email is sometimes missing; acceptable up to 10%)
total = df.count()
null_count = df.filter(F.col("email").isNull()).count()
null_pct = (null_count / total * 100) if total > 0 else 0
if null_pct > 10:
    raise NullRateViolation(
        f"Column email is {null_pct:.1f}% NULL; threshold 10%"
    )
```
### 5. Uniqueness / duplicate checks
For columns that should be unique (PKs, business keys):
```python
# Great Expectations
ExpectColumnValuesToBeUnique(column="customer_id"),
ExpectCompoundColumnsToBeUnique(column_list=["customer_id", "valid_from"]),

# PySpark-native
def check_unique(df, columns):
    duplicates = (df.groupBy(columns).count()
        .filter(F.col("count") > 1)
        .count())
    if duplicates > 0:
        raise UniquenessViolation(
            f"Found {duplicates} duplicate values in {columns}"
        )

check_unique(df, ["customer_id"])
check_unique(df, ["customer_id", "valid_from"])  # composite key
```
### 6. Range and value checks
For columns with known valid ranges or enum sets:
```python
# Numeric range
ExpectColumnValuesToBeBetween(column="order_amount", min_value=0, max_value=1_000_000)
ExpectColumnValuesToBeBetween(column="customer_age", min_value=18, max_value=120, mostly=0.99)

# Enum set
ExpectColumnValuesToBeInSet(
    column="order_status",
    value_set=["pending", "approved", "shipped", "delivered", "cancelled"]
)

# Pattern matching (regex)
ExpectColumnValuesToMatchRegex(
    column="email",
    regex=r"^[^@\s]+@[^@\s]+\.[^@\s]+$",
    mostly=0.95  # tolerate 5% bad emails
)

# PySpark-native
# NULLs are not flagged by these filters (comparisons with NULL evaluate to NULL);
# cover them with the null checks in section 4
invalid_status = df.filter(
    ~F.col("order_status").isin(["pending", "approved", "shipped", "delivered", "cancelled"])
).count()
if invalid_status > 0:
    raise ValueViolation(f"Found {invalid_status} rows with invalid order_status")

# Range check
out_of_range = df.filter(
    (F.col("order_amount") < 0) | (F.col("order_amount") > 1_000_000)
).count()
if out_of_range > 0:
    raise RangeViolation(f"Found {out_of_range} rows with order_amount out of range")
```
### 7. Referential integrity
For Silver/Gold tables that join to dimensions:
```python
# Every order must reference an existing customer
def check_referential_integrity(fact_df, dim_df, fact_key, dim_key):
    orphans = (fact_df
        .join(dim_df, fact_df[fact_key] == dim_df[dim_key], "left_anti")
        .count()
    )
    if orphans > 0:
        raise ReferentialIntegrityViolation(
            f"Found {orphans} rows in fact with no matching dim "
            f"({fact_key} → {dim_key})"
        )

check_referential_integrity(orders_df, customers_df, "customer_id", "customer_id")
```
For Type 2 SCD tables, validate that exactly one record is current per
business key:
```python
# Each customer should have exactly one current record (is_current = true)
# Aggregate before filtering so customers with zero current records are caught too
multiple_current = (df
    .groupBy("customer_id")
    .agg(F.sum(F.col("is_current").cast("int")).alias("current_count"))
    .filter(F.coalesce(F.col("current_count"), F.lit(0)) != 1)
    .count()
)
if multiple_current > 0:
    raise ScdViolation(f"Found {multiple_current} customers with !=1 current records")
```
### 8. Business invariants
The most important — and most pipeline-specific — checks. Examples:
```python
# Sum of order line items = order total
def check_order_line_total_invariant(orders_df, lines_df):
    aggregated_lines = (lines_df
        .groupBy("order_id")
        .agg(F.sum("line_amount").alias("calculated_total"))
    )
    invariant_check = (orders_df
        # Left join so orders with no line items are checked too (their sum counts as 0)
        .join(aggregated_lines, "order_id", "left")
        .withColumn("calculated_total", F.coalesce(F.col("calculated_total"), F.lit(0)))
        .filter(F.abs(F.col("order_total") - F.col("calculated_total")) > 0.01)  # tolerance for rounding
    )
    violations = invariant_check.count()
    if violations > 0:
        raise BusinessInvariantViolation(
            f"Found {violations} orders where order_total != sum(line_amount)"
        )

# Reconciliation between layers
def reconcile_silver_to_gold():
    silver_count = spark.sql("SELECT COUNT(*) FROM silver.customer WHERE is_current = true").collect()[0][0]
    gold_count = spark.sql("SELECT COUNT(*) FROM gold.dim_customer").collect()[0][0]
    if silver_count != gold_count:
        raise ReconciliationError(
            f"Silver active count ({silver_count}) != Gold dim count ({gold_count})"
        )

# Sum reconciliation across systems (after migration)
def reconcile_total_revenue(period):
    legacy_total = read_legacy_revenue(period)
    new_total = read_new_revenue(period)
    diff = abs(legacy_total - new_total)
    if legacy_total > 0:
        diff_pct = diff / legacy_total * 100
    else:
        # No legacy revenue: any non-zero new total is a full mismatch
        diff_pct = 0 if diff == 0 else float("inf")
    if diff_pct > 0.01:  # tolerance: 0.01%, i.e. $1 per $10K
        raise ReconciliationError(
            f"Revenue reconciliation off by {diff} ({diff_pct:.4f}%) for {period}"
        )
```
### 9. Test runner integration
Tests must run as part of the pipeline, not just on demand:
#### Inline in PySpark notebook
```python
def run_data_quality_checks(df, layer, table):
    """Called at the end of every notebook before exiting."""
    results = []  # (check_name, status, error_message or None)
    try:
        try:
            validate_schema(df, expected_schemas[table])
            results.append(("schema", "passed", None))
        except Exception as e:
            results.append(("schema", "failed", str(e)))
            raise  # fail the notebook
        try:
            check_unique(df, primary_keys[table])
            results.append(("uniqueness", "passed", None))
        except Exception as e:
            results.append(("uniqueness", "failed", str(e)))
            raise
        # ... more checks
    finally:
        # Log even when a check raised, so failures reach the DQ log
        log_dq_results(results, layer, table)
    return results
```
#### As a separate notebook
Wrap all DQ checks in a dedicated `NB_DQ_<table>.py` notebook that ADF calls after the transformation notebook succeeds. This lets DQ fail independently of the transformation.
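A minimal sketch of what the body of such a dedicated notebook might look like, assuming each check is a zero-argument callable that raises on violation. The names `run_all_checks` and `DataQualityFailure` are hypothetical and not part of any library. Unlike a fail-fast runner, it runs every check before failing, so a single run reports all violations.

```python
from typing import Callable, Dict, List, Tuple

CheckResult = Tuple[str, str, str]  # (check name, "passed" | "failed", detail)


class DataQualityFailure(Exception):
    """Hypothetical error raised after all checks ran and at least one failed."""

    def __init__(self, results: List[CheckResult]):
        self.results = results  # keep the full results for logging by the caller
        failed = [name for name, status, _ in results if status == "failed"]
        super().__init__(f"Failed checks: {failed}")


def run_all_checks(checks: Dict[str, Callable[[], None]]) -> List[CheckResult]:
    """Run every check, collect the outcomes, then raise if any check failed."""
    results: List[CheckResult] = []
    for name, check in checks.items():
        try:
            check()
            results.append((name, "passed", ""))
        except Exception as exc:  # a check signals a violation by raising
            results.append((name, "failed", str(exc)))
    if any(status == "failed" for _, status, _ in results):
        raise DataQualityFailure(results)
    return results
```

In a notebook, the dictionary would typically bind the table's DataFrame into each check, for example with `lambda: check_unique(df, ["customer_id"])`.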
### 10. Quarantine pattern (for bad data)
Sometimes you don't want to fail the pipeline; you want to isolate bad rows:
```python
def split_clean_and_bad(df, validation_logic):
    """Split DataFrame into clean and bad based on validation."""
    # Treat a NULL result (e.g. NULL order_amount) as a failure; otherwise
    # those rows match neither filter and are silently dropped
    is_valid = F.coalesce(validation_logic, F.lit(False))
    clean_df = df.filter(is_valid)
    bad_df = df.filter(~is_valid)
    return clean_df, bad_df

# Validation: order_amount must be > 0
clean_orders, bad_orders = split_clean_and_bad(
    orders_df,
    F.col("order_amount") > 0
)
bad_orders = bad_orders.cache()  # reused for the count and the write
bad_count = bad_orders.count()   # count once instead of three times

# Write clean orders to Silver
clean_orders.write.mode("append").saveAsTable("silver.orders")

# Write bad orders to quarantine for inspection
(bad_orders
    .withColumn("_quarantine_reason", F.lit("order_amount <= 0 or NULL"))
    .withColumn("_quarantined_at", F.current_timestamp())
    .write.mode("append").saveAsTable("quarantine.orders"))

if bad_count > 0:
    logger.warning(f"Quarantined {bad_count} orders with invalid amount")
    # Alert but don't fail the pipeline
    send_alert(f"Orders quarantined: {bad_count}")
```
Pipeline continues; quarantined data gets reviewed separately.
### 11. Data quality reporting
Build a dashboard showing:
- DQ checks pass/fail rate (last 7/30 days)
- Trending DQ violations
- Quarantined row counts over time
- Mean time to detect / resolve DQ issues
This dashboard tells you whether DQ is improving or degrading.
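The pass/fail rate in the first bullet can be derived from a log of check results. A minimal sketch, assuming each logged result carries a run date and a status; the `DqResult` record and the `pass_rate` helper are illustrative names, not an existing API.

```python
from dataclasses import dataclass
from datetime import date, timedelta
from typing import Iterable, Optional


@dataclass(frozen=True)
class DqResult:
    """One logged check outcome (hypothetical shape of a DQ log row)."""
    run_date: date
    table: str
    check: str
    status: str  # "passed" or "failed"


def pass_rate(results: Iterable[DqResult], as_of: date, window_days: int) -> Optional[float]:
    """Percentage of checks that passed in the window_days ending on as_of (inclusive)."""
    start = as_of - timedelta(days=window_days - 1)
    in_window = [r for r in results if start <= r.run_date <= as_of]
    if not in_window:
        return None  # no runs in the window; report "no data" rather than 0% or 100%
    passed = sum(1 for r in in_window if r.status == "passed")
    return passed / len(in_window) * 100
```

Calling it with `window_days=7` and `window_days=30` gives the two figures for the dashboard; comparing them shows whether quality is improving or degrading.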
### 12. SLO definition
For critical tables, define SLOs:
```yaml
table: silver.customer
slos:
  freshness:
    target: <= 6 hours from source change
    measurement: max(_silver_load_timestamp) vs source CDC timestamp
  completeness:
    target: 99.9% non-null on customer_id
    measurement: 1 - count(NULL customer_id) / count(*)
  accuracy:
    target: 100% reconciliation with source row count weekly
    measurement: comparison query against source
```
When SLOs are violated, that's a P1/P2 ticket; fix it.
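To make SLOs like these checkable on every run, the measured values can be compared against the targets in code. A sketch under stated assumptions: the function name, parameter names, and default targets are illustrative and mirror the freshness and completeness entries above; the freshness lag is assumed to have been measured already.

```python
from datetime import timedelta
from typing import List


def evaluate_slos(
    freshness_lag: timedelta,
    null_key_count: int,
    row_count: int,
    max_lag: timedelta = timedelta(hours=6),
    min_non_null_pct: float = 99.9,
) -> List[str]:
    """Return human-readable SLO breaches; an empty list means all targets are met."""
    breaches = []
    if freshness_lag > max_lag:
        breaches.append(f"freshness: lag {freshness_lag} exceeds {max_lag}")
    # An empty table cannot meet the completeness target, so it counts as 0%
    non_null_pct = (row_count - null_key_count) / row_count * 100 if row_count else 0.0
    if non_null_pct < min_non_null_pct:
        breaches.append(
            f"completeness: {non_null_pct:.2f}% non-null, target {min_non_null_pct}%"
        )
    return breaches
```

Each returned breach could then be turned into a ticket or alert at the severity the team has agreed on.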
## Output structure
For each pipeline / table:
1. **`tests/dq/<table>_expectations.py`** — DQ checks for that table
2. **`tests/dq/run_dq.py`** — runner integration
3. **`docs/dq/<table>_slos.yaml`** — defined SLOs
4. **Sample alert / notification configuration**
## Style
- Specific tests for the actual table provided
- Cover all 8 categories explicitly
- Both GE and PySpark-native shown for portability
- Honest about which checks should fail-the-pipeline vs warn-and-quarantine
- Specific tolerance levels (1% null rate, 0.01 reconciliation tolerance)
Tips
- Run DQ tests in CI/CD too. Not just at runtime — when someone changes a transformation, run sample data through the tests.
- Start strict; loosen if needed. Tolerating 5% null is a deliberate choice; defaulting to it is sloppy.
- Quarantine, don't fail, for non-critical issues. A pipeline failing because 1 row out of 10M has a bad email is over-strict.
- Reconcile after every migration. Pre vs post migration counts and sums tell you if the pipeline preserved data.
- Trend the DQ metrics. Are violations increasing? That's a signal something's drifting upstream.
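The first tip can be sketched as a pair of pytest-style tests that run a small sample through a transformation and then through a DQ check. Everything here is hypothetical: `SAMPLE_ORDERS`, `normalize_status`, and `invalid_status_count` are stand-ins, and plain dicts replace a DataFrame so the sketch runs without Spark.

```python
from typing import Dict, List

Row = Dict[str, object]

# Hypothetical sample fixture; in a real repo this would be a small file under tests/
SAMPLE_ORDERS: List[Row] = [
    {"order_id": "o1", "order_amount": 120.0, "order_status": "shipped"},
    {"order_id": "o2", "order_amount": 35.5, "order_status": "PENDING "},
]

VALID_STATUSES = {"pending", "approved", "shipped", "delivered", "cancelled"}


def normalize_status(rows: List[Row]) -> List[Row]:
    """Stand-in transformation under test: trim and lowercase order_status."""
    return [{**r, "order_status": str(r["order_status"]).strip().lower()} for r in rows]


def invalid_status_count(rows: List[Row]) -> int:
    """DQ check: number of rows whose order_status is outside the allowed set."""
    return sum(1 for r in rows if r["order_status"] not in VALID_STATUSES)


def test_transformed_sample_passes_dq():
    # The transformation output must satisfy the DQ check
    assert invalid_status_count(normalize_status(SAMPLE_ORDERS)) == 0


def test_check_detects_known_bad_row():
    # Feeding known-bad input guards against a check that always passes
    assert invalid_status_count(SAMPLE_ORDERS) == 1
```

The second test matters as much as the first: a check that never fires on bad input gives false confidence.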
Common mistakes to avoid
- No DQ tests at all (most common)
- DQ tests that just check counts (misses field-level corruption)
- Tests that always pass (broken tests give false confidence)
- Checks without thresholds (binary pass/fail when 1% violation should warn, 10% should fail)
- No alerting on failures (silent failure)
- DQ run on its own schedule, decoupled from the pipeline run (drifts out of sync)
- No quarantine pattern (everything either fails the pipeline or gets ignored)
- No SLOs (no shared understanding of "good")
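The point about thresholds can be made concrete with a tiered classification instead of a binary pass/fail. A minimal sketch, using the 1% warn and 10% fail levels mentioned above as defaults; `classify_violation_rate` is a hypothetical helper name.

```python
def classify_violation_rate(
    violations: int,
    total: int,
    warn_pct: float = 1.0,
    fail_pct: float = 10.0,
) -> str:
    """Map a violation rate to 'pass', 'warn', or 'fail'."""
    if total <= 0:
        return "fail"  # an empty input cannot be validated; treat it as a failure
    rate_pct = violations / total * 100
    if rate_pct >= fail_pct:
        return "fail"
    if rate_pct >= warn_pct:
        return "warn"
    return "pass"
```

A runner could then alert and quarantine on `"warn"` and stop the pipeline only on `"fail"`.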