Playbook

Data Validation Skill

Claude Code skill that compares old and new system outputs for parity — running validation queries on both DBs and reporting drift.

Data Validation Skill

Auto-triggers during data migration work to verify the new system has the same data as legacy. Runs counts, sums, distributions, and spot-checks across both databases, then surfaces drift in a way that's actionable.

When it triggers

  • "Verify data migration"
  • "Are the counts matching between old and new?"
  • "Validate the customers table migrated correctly"
  • "Check for drift in the orders table"
  • "Compare aggregate values between source and target DB"

Why a skill

Data validation is high-stakes and error-prone. Counting rows is easy; catching the case where 50 rows have wrong values is hard. This skill encodes the patterns that catch real bugs during migrations: aggregate sums, distribution comparisons, foreign key integrity, sample row diffs, time-windowed reconciliation.

Installation

  1. Copy this skill folder to ~/.claude/skills/data-validation/
  2. Restart Claude Code
  3. Try: "Validate that customers migrated correctly from SQL Server to Postgres"

SKILL.md content

---
name: data-validation
description: |
  Use this skill when the user wants to verify data parity between two
  databases during a migration. Triggers on: "validate migration",
  "check data parity", "compare old and new", "are counts matching",
  "verify backfill", "check for drift", "reconcile databases".

  Generates: validation queries, comparison scripts, drift reports.

  Do NOT use for: greenfield data validation (use schema constraints),
  backup verification (separate concern), or testing new code (use
  test-generator skill).
---

# Data Validation

You generate validation queries and comparison logic to verify data parity
between two databases during migration. You catch the bugs that simple
COUNT(*) won't catch.

## Validation hierarchy

Run validations in this order. Earlier checks are cheaper and rule out
broad issues; later checks catch subtle bugs.

### Level 1: Row counts

The dumbest, fastest check. If counts don't match, stop everything else.

```sql
-- Source (SQL Server)
SELECT COUNT(*) FROM customers;

-- Target (Postgres)
SELECT COUNT(*) FROM customers;

If counts differ:

  • Investigate WHERE the missing rows are (run with date ranges)
  • Check the migration log for skipped rows
  • Don't proceed to other checks until counts match

Level 2: Aggregate sums and stats

Catches data corruption that doesn't show up in counts:

-- Source
SELECT
  COUNT(*) AS total_count,
  SUM(amount) AS total_amount,
  AVG(amount) AS avg_amount,
  MIN(created_at) AS earliest,
  MAX(created_at) AS latest,
  COUNT(DISTINCT customer_id) AS unique_customers
FROM orders;

-- Target
SELECT
  COUNT(*) AS total_count,
  SUM(amount) AS total_amount,
  AVG(amount) AS avg_amount,
  MIN(created_at) AS earliest,
  MAX(created_at) AS latest,
  COUNT(DISTINCT customer_id) AS unique_customers
FROM orders;

Catches:

  • Currency precision drift (decimal → numeric translation issues)
  • Date range loss (rows from 1900 silently dropped)
  • Customer ID translation issues (UUID generation went wrong)

Level 3: Distribution comparisons

Group by enums / status fields and compare:

-- Source
SELECT status, COUNT(*) FROM orders GROUP BY status;

-- Target
SELECT status, COUNT(*) FROM orders GROUP BY status;

Catches:

  • Enum mapping bugs (legacy 'X' meant 'Refunded' but target maps it to 'Cancelled')
  • Status migration errors

Level 4: Foreign key integrity

-- Orphaned orders (customer_id pointing to non-existent customer)
SELECT COUNT(*)
FROM orders o
LEFT JOIN customers c ON o.customer_id = c.id
WHERE c.id IS NULL;

Run on both. Should be 0 in target. If non-zero in source, document (those rows were already orphaned in legacy — that's the existing data quality issue, not a migration bug).

Level 5: Spot checks (sample rows)

Take N random rows from source, find the matching rows in target, compare every field:

-- Source: 50 random orders
SELECT TOP 50 * FROM orders ORDER BY NEWID();

Then for each, query target by primary key and diff field-by-field.

This catches:

  • Field-level translation bugs (one column wasn't migrated; default value used)
  • Encoding issues (Unicode characters mangled)
  • Whitespace differences (trailing spaces stripped)

Level 6: Time-windowed reconciliation

For ongoing dual-write or CDC, check that recent changes are flowing:

-- Source: orders created in last hour
SELECT id, created_at FROM orders
WHERE created_at >= DATEADD(hour, -1, GETUTCDATE())
ORDER BY created_at;

-- Target: same query
SELECT id, created_at FROM orders
WHERE created_at >= NOW() - INTERVAL '1 hour'
ORDER BY created_at;

The two result sets should match. If target is missing recent rows, the sync pipeline is lagging or broken.

Level 7: Business invariants

Domain-specific checks. Examples:

  • Sum of order line items = order total
  • Sum of all account balances = sum of all transactions
  • Active customer count + inactive count = total count
  • Sum of inventory movements = current stock

These catch business logic errors during migration that pure data checks miss.

Process when activated

Step 1: Understand the migration

Ask:

  1. Source connection: how to access source DB
  2. Target connection: how to access target DB
  3. Tables in scope: which tables to validate (often a subset)
  4. Phase: pre-cutover (initial backfill done, planning to verify) or post-cutover (already migrated, verifying ongoing parity)
  5. Tolerance: zero tolerance, or some drift acceptable (e.g., timestamps on metadata fields are fine to differ)

Step 2: Generate validation script

Generate a script that runs all 7 levels of validation. Output the queries in both source and target SQL dialects.

For each table, the script should:

  1. Run Level 1 (counts) — fail loudly if mismatch
  2. Run Level 2 (aggregates) — report differences
  3. Run Level 3 (distributions) — report group-by differences
  4. Run Level 4 (FK integrity) — report orphans
  5. Run Level 5 (spot checks) — sample 50 rows, diff field-by-field
  6. Run Level 6 (recent windows) — if dual-write phase
  7. Run Level 7 (business invariants) — domain-specific

The script:

  • Connects to both DBs
  • Runs queries in parallel where possible
  • Outputs a structured report (Markdown or JSON)
  • Marks each check as PASS / FAIL / WARN with details

Step 3: Generate the comparison script

For SQL Server → Postgres comparisons, the script needs to handle dialect differences. Generate the queries paired:

# scripts/validation/compare.py
import pyodbc          # for SQL Server
import psycopg2        # for Postgres
import sys
from decimal import Decimal

def validate_table_counts(source_conn, target_conn, table_name):
    src_cur = source_conn.cursor()
    tgt_cur = target_conn.cursor()

    src_cur.execute(f"SELECT COUNT(*) FROM {table_name}")
    src_count = src_cur.fetchone()[0]

    tgt_cur.execute(f"SELECT COUNT(*) FROM {table_name}")
    tgt_count = tgt_cur.fetchone()[0]

    return {
        'check': 'row_count',
        'table': table_name,
        'source': src_count,
        'target': tgt_count,
        'status': 'PASS' if src_count == tgt_count else 'FAIL',
        'delta': tgt_count - src_count
    }

# ... similar for aggregates, distributions, etc.

def main():
    source = pyodbc.connect(SOURCE_CONN_STRING)
    target = psycopg2.connect(TARGET_CONN_STRING)

    results = []
    for table in TABLES_TO_VALIDATE:
        results.append(validate_table_counts(source, target, table))
        results.append(validate_aggregates(source, target, table))
        # ... etc

    report = generate_markdown_report(results)
    print(report)

    # Exit non-zero if any FAIL
    if any(r['status'] == 'FAIL' for r in results):
        sys.exit(1)

Step 4: Generate sample-row diff logic

For Level 5 spot checks, generate code that:

  • Takes 50 random IDs from source
  • Queries the same 50 from target
  • Diffs every field
  • Reports which fields differ and how
def spot_check_table(source_conn, target_conn, table_name, pk_column='id', sample_size=50):
    src_cur = source_conn.cursor()
    src_cur.execute(f"SELECT TOP {sample_size} * FROM {table_name} ORDER BY NEWID()")
    src_rows = src_cur.fetchall()
    src_columns = [desc[0] for desc in src_cur.description]

    differences = []
    for src_row in src_rows:
        pk_value = src_row[src_columns.index(pk_column)]
        tgt_cur = target_conn.cursor()
        tgt_cur.execute(f"SELECT * FROM {table_name} WHERE {pk_column} = %s", (pk_value,))
        tgt_row = tgt_cur.fetchone()

        if tgt_row is None:
            differences.append({'pk': pk_value, 'issue': 'Missing in target'})
            continue

        for col_idx, col_name in enumerate(src_columns):
            src_val = src_row[col_idx]
            tgt_val = tgt_row[col_idx]
            if not values_equal(src_val, tgt_val, col_name):
                differences.append({
                    'pk': pk_value,
                    'column': col_name,
                    'source': src_val,
                    'target': tgt_val
                })

    return differences

def values_equal(src, tgt, col_name):
    """Compare values with tolerance for known acceptable differences."""
    # Whitespace tolerance for legacy-stored text
    if isinstance(src, str) and isinstance(tgt, str):
        return src.strip() == tgt.strip()
    # Decimal precision tolerance
    if isinstance(src, Decimal) and isinstance(tgt, Decimal):
        return abs(src - tgt) < Decimal('0.001')
    # Skip migration-added timestamp fields
    if col_name in {'migrated_at', 'last_synced'}:
        return True
    return src == tgt

Step 5: Generate the drift report

For ongoing migrations (dual-write, CDC), generate a recurring drift report that:

  • Runs every hour / shift / day
  • Reports new drift since last run
  • Alerts if drift exceeds threshold
def drift_report():
    return {
        'timestamp': datetime.now(),
        'tables': {
            'customers': {
                'count_drift': 0,
                'sum_drift': Decimal('0.00'),
                'last_synced': '2024-01-15T12:00:00Z',
                'lag_seconds': 30
            },
            'orders': {
                'count_drift': 5,  # FAIL
                'sum_drift': Decimal('-127.50'),
                'last_synced': '2024-01-15T11:55:00Z',
                'lag_seconds': 330
            }
        }
    }

Email or Slack the report on schedule. Alert thresholds:

  • Count drift > 10 rows
  • Sum drift > $1.00
  • Sync lag > 5 minutes

Step 6: Output the actual diff

When validation fails, the user wants to see the actual difference:

## Validation report — 2024-01-15 14:00 UTC

### customers
- ✅ Row count: 1,234,567 (match)
- ✅ Sum check: passed
- ✅ Distribution: matches across 5 status values
- ✅ FK integrity: 0 orphans
- ✅ Spot check: 50/50 rows match
- **Status: PASS**

### orders
- ❌ Row count: source=8,901,234, target=8,901,229, delta=-5
- ❌ Sum check: source=$45,678,123.50, target=$45,678,000.00, delta=-$123.50
- ⚠️ Distribution: matches except 'cancelled' (src=12,345, tgt=12,340)
- ✅ FK integrity: 0 orphans
- ❌ Spot check: 47/50 rows match. 3 differences:
  - id=42: column `discount` source=$10.00, target=$0.00
  - id=87: column `notes` source='Test order', target=NULL
  - id=156: column `status` source='cancelled', target='void'
- **Status: FAIL — 5 rows missing, $123.50 missing, 3 spot-check failures**

### Recommended actions
1. Investigate the 5 missing orders (likely the same orders that account for the $123.50)
2. Check status mapping: 'void' vs 'cancelled' — are these supposed to be the same enum value?
3. Re-run migration for affected rows OR backfill manually

Step 7: Don't let "small" drift slide

If drift is small (e.g., 5 rows out of 9 million):

  • It's still worth investigating
  • 5 missing rows could be 5 customers' orders that vanished
  • Don't accept "we'll fix it later" — those drift items become permanent

Push back on:

  • "It's only 0.0001% drift, ship it"
  • "We'll deal with the missing rows in a one-off script"
  • "The sums are close enough"

Ask: which 5 rows? Are they meaningful? Why are they missing?

Anti-patterns to avoid

  • Validating only counts (misses field-level corruption)
  • Validating only aggregates (misses distribution shifts)
  • Sample size too small (50 rows out of 10M might miss systematic bugs)
  • Random sampling without seed (can't reproduce findings)
  • Treating warnings as passes
  • Letting drift accumulate (small drift today = big problem tomorrow)
  • Validating on dev data (validate on production-like volumes)

Output format

Default structure:

## Validation results

[Per-table report with PASS/FAIL/WARN]

## Summary
- Tables checked: N
- Passed: N
- Failed: N
- Warnings: N

## Action items
[Specific, actionable next steps]

## Scripts generated
[Paths to validation scripts]

Make findings copy-pasteable for tickets.


## Pairing with other skills

- **Data Migration Plan** template includes a section on validation; this skill generates the actual validation code
- **Migration Planner Skill** orchestrates this skill during the validation phase
- **Postgres MCP** + **Filesystem MCP** make this skill more useful — direct DB access for queries

## Tips

- Run validation continuously during dual-write phases. Don't wait until cutover to discover drift.
- Save baseline validation reports — comparing today's drift to last week's tells you if the system is degrading.
- For very large tables, sampling at the right scale matters. 50 rows out of 10M might miss systemic bugs; sample smarter (stratified by status, by date range).
- Pair with feature flags: validate per-tenant after each tenant migration.

## Limitations

- Cannot validate behavior, only data state. (Behavior parity tests handle behavior.)
- Cross-engine comparisons (SQL Server vs Postgres) require careful handling of type differences (decimal precision, datetime tz).
- For very large tables, full validation is expensive — generate sampled validation scripts for these.
- Cannot fix the drift, only report it. Remediation is a separate concern.

Related assets

Command Palette

Search for a command to run...