Data Quality Test Suite

Generate comprehensive data quality tests for ETL pipelines: schema validation, freshness checks, null/duplicate/range checks, and business invariants.

Data pipelines silently produce wrong numbers more often than they fail loudly. This template generates a comprehensive test suite that catches:

  • Schema drift (a column type changed)
  • Freshness issues (data is stale)
  • Volume anomalies (record count dropped 50%)
  • Quality violations (nulls, duplicates, invalid values)
  • Business rule violations (orders without customers, sums that don't reconcile)

Pipelines without these tests will eventually ship bad data to consumers. The question is just when.

When to use

  • Before deploying any new pipeline to production
  • When inheriting pipelines without tests (yes, fix this immediately)
  • After data quality incidents (formalize what you should have caught)
  • For regulated data where SLAs require quality SLOs

Choice of tooling

Three viable approaches; pick one and stick with it:

1. Great Expectations — most comprehensive, Python-based, lots of built-in expectations, integrates with ADF/Databricks/Fabric. Best for: complex projects with many sources.

2. dbt tests — if using dbt for transformations, tests are co-located with models. SQL-native. Best for: dbt-centric stacks (less common in pure ADF/Fabric).

3. Native PySpark assertions + custom framework — lowest dependency, full control, but requires building. Best for: teams already deep in PySpark.

For an Azure / ADF / Fabric / PySpark stack, we recommend either Great Expectations or PySpark-native assertions in a shared library. This template covers both.

Prompt

You are a senior data quality engineer generating a comprehensive test
suite for a data pipeline. Generate tests across all relevant categories.

## Input

**Target table:** {{target_table}}
**Business context:** {{business_context}}
**Schema / sample:** {{schema_or_sample}}
**Known constraints:** {{known_constraints}}

## Output

Generate tests across the 8 test categories in sections 1-8, then cover runner
integration, quarantine, reporting, and SLOs (sections 9-12). Show both Great
Expectations and PySpark-native syntax for portability.

### 1. Schema validation

Verify the table has expected columns with expected types. Catches schema
drift from upstream sources.

#### Great Expectations:
```python
from great_expectations.expectations import (
    ExpectTableColumnsToMatchOrderedList,
    ExpectColumnValuesToBeOfType,
)

expectations = [
    ExpectTableColumnsToMatchOrderedList(
        column_list=[
            "customer_id", "customer_name", "email",
            "created_at", "_silver_load_timestamp", "_source_system"
        ]
    ),
    ExpectColumnValuesToBeOfType(column="customer_id", type_="StringType"),
    ExpectColumnValuesToBeOfType(column="customer_name", type_="StringType"),
    ExpectColumnValuesToBeOfType(column="email", type_="StringType"),
    ExpectColumnValuesToBeOfType(column="created_at", type_="TimestampType"),
]
```

#### PySpark-native:
```python
def validate_schema(df, expected_schema):
    """Compare actual schema to expected; raise on drift."""
    actual_fields = {f.name: f.dataType.typeName() for f in df.schema.fields}
    expected_fields = {f.name: f.dataType.typeName() for f in expected_schema.fields}
    
    missing = set(expected_fields) - set(actual_fields)
    extra = set(actual_fields) - set(expected_fields)
    type_mismatches = {
        f: (expected_fields[f], actual_fields[f])
        for f in expected_fields.keys() & actual_fields.keys()
        if expected_fields[f] != actual_fields[f]
    }
    
    if missing or type_mismatches:
        raise SchemaValidationError(
            f"Missing columns: {missing}, "
            f"Type mismatches: {type_mismatches}, "
            f"Unexpected new columns: {extra}"
        )
    
    if extra:
        logger.warning(f"New columns appeared (allowed but flagged): {extra}")
```
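
The snippets in this template raise exception types such as `SchemaValidationError` and call `logger` without defining either. A minimal sketch of a shared module follows. The base class `DataQualityError` and the logger name `dq` are assumptions for illustration; the subclass names are the ones used in the snippets of this template.

```python
import logging

# Hypothetical logger name; use whatever your notebooks already configure.
logger = logging.getLogger("dq")


class DataQualityError(Exception):
    """Assumed base class so a runner can catch every DQ failure in one place."""


# One subclass per check category, named as in the snippets of this template.
class SchemaValidationError(DataQualityError): pass
class FreshnessError(DataQualityError): pass
class VolumeAnomalyError(DataQualityError): pass
class NullViolationError(DataQualityError): pass
class NullRateViolation(DataQualityError): pass
class UniquenessViolation(DataQualityError): pass
class ValueViolation(DataQualityError): pass
class RangeViolation(DataQualityError): pass
class ReferentialIntegrityViolation(DataQualityError): pass
class ScdViolation(DataQualityError): pass
class BusinessInvariantViolation(DataQualityError): pass
class ReconciliationError(DataQualityError): pass
```

With a common base class, a runner can use `except DataQualityError` to tell a failed check apart from an unrelated bug such as a `KeyError` in the check code itself.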

### 2. Freshness checks

Verify data is recent enough. Catches stale pipelines.

```python
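from datetime import datetime, timedelta

from great_expectations.expectations import ExpectColumnMaxToBeBetween
from pyspark.sql import functions as F

# Great Expectations
ExpectColumnMaxToBeBetween(
    column="_silver_load_timestamp",
    min_value=datetime.utcnow() - timedelta(hours=24),
    max_value=datetime.utcnow() + timedelta(minutes=5),
)

# PySpark-native
max_load_time = df.agg(F.max("_silver_load_timestamp")).collect()[0][0]
if max_load_time is None:
    # Empty table: F.max returns NULL, so there is no timestamp to compare
    raise FreshnessError("No rows found; cannot determine freshness")
hours_old = (datetime.utcnow() - max_load_time).total_seconds() / 3600
if hours_old > 24:
    raise FreshnessError(f"Data is {hours_old:.1f} hours old; threshold 24h")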
```

For incremental tables, also check the data window:
```python
# Most recent business date in the data should be today (or yesterday for next-day arrival)
max_business_date = df.agg(F.max("order_date")).collect()[0][0]
expected_date = (datetime.utcnow() - timedelta(days=1)).date()
if max_business_date < expected_date:
    raise FreshnessError(
        f"Latest order_date is {max_business_date}; expected at least {expected_date}"
    )
```
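
The freshness arithmetic above mixes `datetime.utcnow()` with whatever timestamp Spark returns, which only works if both are naive UTC values. A hedged sketch of the same age calculation with timezone-aware datetimes follows. The helper names `hours_since` and `is_fresh` are hypothetical, and treating naive timestamps as UTC is an assumption that holds only if your session timezone is UTC.

```python
from datetime import datetime, timezone


def hours_since(load_time, now=None):
    """Return the age of load_time in hours.

    Assumption: a naive load_time is already in UTC.
    """
    now = now or datetime.now(timezone.utc)
    if load_time.tzinfo is None:
        load_time = load_time.replace(tzinfo=timezone.utc)
    return (now - load_time).total_seconds() / 3600


def is_fresh(load_time, max_age_hours=24, now=None):
    """True when load_time is no older than max_age_hours."""
    return hours_since(load_time, now) <= max_age_hours
```

Passing `now` explicitly keeps the function deterministic, so the threshold logic can be unit tested without a cluster or a live clock.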

### 3. Volume / row count anomaly detection

Compare today's volume to historical baseline. Catches partial loads or
runaway loads.

```python
def check_row_count_anomaly(df, table_name, threshold_pct=50):
    """Compare current row count to recent baseline."""
    current_count = df.count()
    
    # Get average count over last 7 days
    baseline_query = f"""
        SELECT AVG(row_count) AS avg_count
        FROM meta.pipeline_run_log
        WHERE notebook = '{table_name}'
          AND status = 'success'
          AND end_time > current_timestamp() - INTERVAL 7 DAYS
    """
    baseline = spark.sql(baseline_query).collect()[0][0] or 0
    
    if baseline == 0:
        logger.info("No baseline yet; skipping volume check")
        return
    
    deviation_pct = abs(current_count - baseline) / baseline * 100
    
    if deviation_pct > threshold_pct:
        if current_count < baseline:
            # Drop beyond the threshold; likely partial load
            raise VolumeAnomalyError(
                f"Row count dropped {deviation_pct:.1f}% from baseline "
                f"({baseline:.0f} → {current_count})"
            )
        elif current_count > baseline * 2:
            # Major spike; likely duplicate or wrong filter
            raise VolumeAnomalyError(
                f"Row count spiked {deviation_pct:.1f}% from baseline "
                f"({baseline:.0f} → {current_count})"
            )
        else:
            # Growth above the threshold but under 2x: flag it, don't fail
            logger.warning(
                f"Row count up {deviation_pct:.1f}% from baseline "
                f"({baseline:.0f} → {current_count})"
            )
```
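
The decision logic in the volume check can be separated from the Spark I/O so it can be tested with plain numbers. The sketch below is one way to do that. The function name `classify_volume` and the parameters `drop_ratio` and `spike_ratio` are hypothetical; the defaults mirror the 50% drop and 2x spike limits used above.

```python
def classify_volume(current_count, baseline, drop_ratio=0.5, spike_ratio=2.0):
    """Classify a row count against its baseline.

    Returns "no_baseline", "drop", "spike", or "ok".
    """
    if baseline <= 0:
        # No history yet, so there is nothing to compare against
        return "no_baseline"
    ratio = current_count / baseline
    if ratio < drop_ratio:
        return "drop"   # likely partial load
    if ratio > spike_ratio:
        return "spike"  # likely duplicates or a wrong filter
    return "ok"
```

The caller would compute `current_count` and `baseline` with Spark as shown above, then raise `VolumeAnomalyError` when the result is `"drop"` or `"spike"`.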

### 4. Null and missingness checks

For columns that should never be null:

```python
# Great Expectations
ExpectColumnValuesToNotBeNull(column="customer_id"),
ExpectColumnValuesToNotBeNull(column="created_at"),

# PySpark-native
not_null_columns = ["customer_id", "created_at"]
for col_name in not_null_columns:
    null_count = df.filter(F.col(col_name).isNull()).count()
    if null_count > 0:
        raise NullViolationError(
            f"Column {col_name} has {null_count} NULL values; expected 0"
        )
```

For columns where some nulls are expected, check the NULL rate:
```python
# Tolerable null rate check (e.g., email is sometimes missing — acceptable up to 10%)
total = df.count()
null_count = df.filter(F.col("email").isNull()).count()
null_pct = (null_count / total * 100) if total > 0 else 0
if null_pct > 10:
    raise NullRateViolation(
        f"Column email is {null_pct:.1f}% NULL; threshold 10%"
    )
```

### 5. Uniqueness / duplicate checks

For columns that should be unique (PKs, business keys):

```python
# Great Expectations
ExpectColumnValuesToBeUnique(column="customer_id"),
ExpectCompoundColumnsToBeUnique(column_list=["customer_id", "valid_from"]),

# PySpark-native
def check_unique(df, columns):
    duplicates = (df.groupBy(columns).count()
                    .filter(F.col("count") > 1)
                    .count())
    if duplicates > 0:
        raise UniquenessViolation(
            f"Found {duplicates} duplicate values in {columns}"
        )

check_unique(df, ["customer_id"])
check_unique(df, ["customer_id", "valid_from"])  # composite key
```

### 6. Range and value checks

For columns with known valid ranges or enum sets:

```python
# Numeric range
ExpectColumnValuesToBeBetween(column="order_amount", min_value=0, max_value=1_000_000)
ExpectColumnValuesToBeBetween(column="customer_age", min_value=18, max_value=120, mostly=0.99)

# Enum set
ExpectColumnValuesToBeInSet(
    column="order_status",
    value_set=["pending", "approved", "shipped", "delivered", "cancelled"]
)

# Pattern matching (regex)
ExpectColumnValuesToMatchRegex(
    column="email",
    regex=r"^[^@\s]+@[^@\s]+\.[^@\s]+$",
    mostly=0.95  # tolerate 5% bad emails
)

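# PySpark-native
# Note: rows with NULL order_status are not counted by this filter (a NULL
# comparison evaluates to NULL); pair it with a not-null check if NULL is invalid.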
invalid_status = df.filter(
    ~F.col("order_status").isin(["pending", "approved", "shipped", "delivered", "cancelled"])
).count()
if invalid_status > 0:
    raise ValueViolation(f"Found {invalid_status} rows with invalid order_status")

# Range check
out_of_range = df.filter(
    (F.col("order_amount") < 0) | (F.col("order_amount") > 1_000_000)
).count()
if out_of_range > 0:
    raise RangeViolation(f"Found {out_of_range} rows with order_amount out of range")
```

### 7. Referential integrity

For Silver/Gold tables that join to dimensions:

```python
# Every order must reference an existing customer
def check_referential_integrity(fact_df, dim_df, fact_key, dim_key):
    orphans = (fact_df
        .join(dim_df, fact_df[fact_key] == dim_df[dim_key], "left_anti")
        .count()
    )
    if orphans > 0:
        raise ReferentialIntegrityViolation(
            f"Found {orphans} rows in fact with no matching dim "
            f"({fact_key} → {dim_key})"
        )

check_referential_integrity(orders_df, customers_df, "customer_id", "customer_id")
```

For Type 2 SCD tables, validate that exactly one record is current per
business key:
```python
# Each customer should have exactly one current record (is_current = true).
# Aggregate over all rows so customers with zero current records are caught too.
bad_current = (df
    .groupBy("customer_id")
    .agg(F.sum(F.col("is_current").cast("int")).alias("current_count"))
    .filter(F.coalesce(F.col("current_count"), F.lit(0)) != 1)
    .count()
)
if bad_current > 0:
    raise ScdViolation(f"Found {bad_current} customers with !=1 current records")
```

### 8. Business invariants

The most important — and most pipeline-specific — checks. Examples:

```python
# Sum of order line items = order total
def check_order_line_total_invariant(orders_df, lines_df):
    aggregated_lines = (lines_df
        .groupBy("order_id")
        .agg(F.sum("line_amount").alias("calculated_total"))
    )
    
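    # Left join so orders with no line items are checked too (missing sum counts as 0)
    invariant_check = (orders_df
        .join(aggregated_lines, "order_id", "left")
        .filter(
            F.abs(F.col("order_total") - F.coalesce(F.col("calculated_total"), F.lit(0))) > 0.01
        )  # tolerance for rounding
    )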
    
    violations = invariant_check.count()
    if violations > 0:
        raise BusinessInvariantViolation(
            f"Found {violations} orders where order_total != sum(line_amount)"
        )

# Reconciliation between layers
def reconcile_silver_to_gold():
    silver_count = spark.sql("SELECT COUNT(*) FROM silver.customer WHERE is_current = true").collect()[0][0]
    gold_count = spark.sql("SELECT COUNT(*) FROM gold.dim_customer").collect()[0][0]
    
    if silver_count != gold_count:
        raise ReconciliationError(
            f"Silver active count ({silver_count}) != Gold dim count ({gold_count})"
        )

# Sum reconciliation across systems (after migration)
def reconcile_total_revenue(period):
    legacy_total = read_legacy_revenue(period)
    new_total = read_new_revenue(period)
    
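    diff = abs(legacy_total - new_total)
    if legacy_total > 0:
        diff_pct = diff / legacy_total * 100
    else:
        # No legacy revenue: any new revenue is a full mismatch, not a pass
        diff_pct = 0 if diff == 0 else 100
    
    if diff_pct > 0.01:  # tolerance: 0.01%, i.e. $1 per $10K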
        raise ReconciliationError(
            f"Revenue reconciliation off by {diff} ({diff_pct:.4f}%) for {period}"
        )
```
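
To make the reconciliation tolerance concrete: a threshold of 0.01 percent on a legacy total of $10,000 allows a difference of $1.00. The sketch below works that arithmetic with `Decimal` to avoid float rounding on currency. The function names are hypothetical, and returning `None` for an undefined percentage is a design assumption, not part of the template.

```python
from decimal import Decimal


def reconciliation_diff_pct(legacy_total, new_total):
    """Percentage difference relative to the legacy total.

    Returns None when the legacy total is zero but the new total is not,
    because the percentage is undefined in that case.
    """
    legacy = Decimal(str(legacy_total))
    new = Decimal(str(new_total))
    if legacy == 0:
        return Decimal(0) if new == 0 else None
    return abs(legacy - new) / abs(legacy) * 100


def within_tolerance(legacy_total, new_total, tolerance_pct="0.01"):
    """True when the totals agree within tolerance_pct percent."""
    diff_pct = reconciliation_diff_pct(legacy_total, new_total)
    return diff_pct is not None and diff_pct <= Decimal(str(tolerance_pct))
```

For example, `within_tolerance("10000.00", "10001.00")` passes because the difference is exactly 0.01 percent, while a difference of $1.01 on the same total does not.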

### 9. Test runner integration

Tests must run as part of the pipeline, not just on demand:

#### Inline in PySpark notebook
```python
def run_data_quality_checks(df, layer, table):
    """Called at the end of every notebook before exiting."""
    results = []
    
    try:
        try:
            validate_schema(df, expected_schemas[table])
            results.append(("schema", "passed"))
        except Exception as e:
            results.append(("schema", "failed", str(e)))
            raise  # fail the notebook
        
        try:
            check_unique(df, primary_keys[table])
            results.append(("uniqueness", "passed"))
        except Exception as e:
            results.append(("uniqueness", "failed", str(e)))
            raise
        
        # ... more checks
    finally:
        # Log on every exit path; without this, a raised check skips logging
        log_dq_results(results, layer, table)
    
    return results
```
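
The inline runner stops at the first failing check, so later checks never run. An alternative, sketched below under stated assumptions, runs every check, records each outcome, and leaves the decision to fail to the caller. The names `run_checks` and `failed` are hypothetical, and each check is assumed to be a zero-argument callable that raises on violation.

```python
def run_checks(checks):
    """Run every (name, callable) pair and collect (name, status, detail)."""
    results = []
    for name, check in checks:
        try:
            check()
            results.append((name, "passed", ""))
        except Exception as exc:  # record the failure and keep going
            results.append((name, "failed", str(exc)))
    return results


def failed(results):
    """Return only the failed entries."""
    return [r for r in results if r[1] == "failed"]


# Illustration with stand-in checks instead of real DataFrame checks
def always_ok():
    return None


def always_bad():
    raise ValueError("duplicate keys found")


demo_results = run_checks([("schema", always_ok), ("uniqueness", always_bad)])
```

In a notebook, each entry would wrap a real check, for example `("schema", lambda: validate_schema(df, expected_schemas[table]))`. After logging the results, the notebook raises if `failed(results)` is non-empty, so one run reports every violation instead of only the first.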

#### As a separate notebook
Wrap all DQ checks in a dedicated `NB_DQ_<table>.py` notebook that ADF calls after the transformation notebook succeeds. This lets DQ fail independently of the transform.

### 10. Quarantine pattern (for bad data)

Sometimes you don't want to fail the pipeline; you want to isolate bad rows:

```python
def split_clean_and_bad(df, validation_logic):
    """Split DataFrame into clean and bad based on validation."""
    # A NULL result (e.g. NULL order_amount) counts as bad; otherwise the row
    # would match neither filter and silently disappear.
    is_valid = F.coalesce(validation_logic, F.lit(False))
    clean_df = df.filter(is_valid)
    bad_df = df.filter(~is_valid)
    return clean_df, bad_df

# Validation: order_amount must be > 0
clean_orders, bad_orders = split_clean_and_bad(
    orders_df,
    F.col("order_amount") > 0
)

# Write clean orders to Silver
clean_orders.write.mode("append").saveAsTable("silver.orders")

# Write bad orders to quarantine for inspection
(bad_orders
    .withColumn("_quarantine_reason", F.lit("order_amount <= 0 or NULL"))
    .withColumn("_quarantined_at", F.current_timestamp())
    .write.mode("append").saveAsTable("quarantine.orders"))

# Count once; each count() call triggers a separate Spark job
bad_count = bad_orders.count()
if bad_count > 0:
    logger.warning(f"Quarantined {bad_count} orders with invalid amount")
    # Alert but don't fail the pipeline
    send_alert(f"Orders quarantined: {bad_count}")
```

Pipeline continues; quarantined data gets reviewed separately.

### 11. Data quality reporting

Build a dashboard showing:
- DQ checks pass/fail rate (last 7/30 days)
- Trending DQ violations
- Quarantined row counts over time
- Mean time to detect / resolve DQ issues

This dashboard tells you whether DQ is improving or degrading.
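
The pass/fail rate metric can be computed from logged run results. The sketch below assumes each logged run is a `(finished_at, status)` pair with status `"passed"` or `"failed"`. That record shape and the function name `pass_rate` are assumptions, since the template does not define what `log_dq_results` stores.

```python
from datetime import datetime, timedelta


def pass_rate(runs, now, window_days):
    """Fraction of runs that passed within the last window_days.

    Returns None when no runs fall inside the window.
    """
    cutoff = now - timedelta(days=window_days)
    recent = [status for finished_at, status in runs if finished_at > cutoff]
    if not recent:
        return None
    return sum(1 for status in recent if status == "passed") / len(recent)


# Illustrative records only, not real results
sample_runs = [
    (datetime(2024, 1, 30), "passed"),
    (datetime(2024, 1, 29), "failed"),
    (datetime(2024, 1, 10), "passed"),
    (datetime(2023, 12, 1), "failed"),
]
rate_7d = pass_rate(sample_runs, datetime(2024, 1, 31), 7)
rate_30d = pass_rate(sample_runs, datetime(2024, 1, 31), 30)
```

Comparing the 7-day rate with the 30-day rate gives a rough trend: a 7-day rate well below the 30-day rate suggests quality is degrading.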

### 12. SLO definition

For critical tables, define SLOs:

```yaml
table: silver.customer
slos:
  freshness:
    target: <= 6 hours from source change
    measurement: max(_silver_load_timestamp) vs source CDC timestamp
  completeness:
    target: 99.9% non-null on customer_id
    measurement: 1 - count(NULL customer_id) / count(*)
  accuracy:
    target: 100% reconciliation with source row count weekly
    measurement: comparison query against source
```

When SLOs are violated, that's a P1/P2 ticket; fix it.

## Output structure

For each pipeline / table:

1. **`tests/dq/<table>_expectations.py`** — DQ checks for that table
2. **`tests/dq/run_dq.py`** — runner integration
3. **`docs/dq/<table>_slos.yaml`** — defined SLOs
4. **Sample alert / notification configuration**

## Style

- Specific tests for the actual table provided
- Cover all 8 categories explicitly
- Both GE and PySpark-native shown for portability
- Honest about which checks should fail-the-pipeline vs warn-and-quarantine
- Specific tolerance levels (1% null rate, 0.01 reconciliation tolerance)

Tips

  • Run DQ tests in CI/CD too. Not just at runtime — when someone changes a transformation, run sample data through the tests.
  • Start strict; loosen if needed. Tolerating 5% null is a deliberate choice; defaulting to it is sloppy.
  • Quarantine, don't fail, for non-critical issues. A pipeline failing because 1 row out of 10M has a bad email is over-strict.
  • Reconcile after every migration. Pre vs post migration counts and sums tell you if the pipeline preserved data.
  • Trend the DQ metrics. Are violations increasing? That's a signal something's drifting upstream.

Common mistakes to avoid

  • No DQ tests at all (most common)
  • DQ tests that just check counts (misses field-level corruption)
  • Tests that always pass (broken tests give false confidence)
  • Checks without thresholds (binary pass/fail when 1% violation should warn, 10% should fail)
  • No alerting on failures (silent failure)
  • DQ run on its own schedule, outside the pipeline's orchestration (drifts out of sync)
  • No quarantine pattern (everything either fails the pipeline or gets ignored)
  • No SLOs (no shared understanding of "good")
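
The "checks without thresholds" mistake above can be avoided with a tiered severity function. This is a minimal sketch. The name `severity` and the default tiers (warn at 1%, fail at 10%) are assumptions taken from the example in that bullet, not recommended values.

```python
def severity(violation_rate, warn_at=0.01, fail_at=0.10):
    """Map a violation rate (0.0 to 1.0) to "pass", "warn", or "fail"."""
    if not 0 <= warn_at <= fail_at:
        raise ValueError("expected 0 <= warn_at <= fail_at")
    if violation_rate >= fail_at:
        return "fail"
    if violation_rate >= warn_at:
        return "warn"
    return "pass"
```

A null-rate check could then alert on `"warn"`, quarantine the affected rows, and raise only on `"fail"`.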
