Slowly Changing Dimensions (SCD) in PySpark
Dimensional modeling fundamentals for a modern data platform. Most data platforms have dimensions such as customers, products, employees, and accounts, and these change over time. How you handle that change determines whether historical reports still make sense.
This template generates SCD implementations in PySpark using Delta Lake MERGE for the four most common types.
When to use
- Building Gold/Silver dimension tables
- Migrating from traditional ETL where SCD was implicit
- Adding history tracking to a dimension that didn't have it before
- Standardizing SCD patterns across an organization
SCD types overview
The classic Kimball SCD types, with modern interpretations:
Type 1: Overwrite (no history)
The new value replaces the old. No history kept.
Use when: History doesn't matter (typo corrections, normalization changes). Example: Customer's email address updated; we don't care about the old one.
Type 2: Add new row (full history)
A new row is added; old row is marked as no longer current.
Use when: Historical accuracy matters for reports (segment changes, address moves). Example: Customer's segment changes from "Bronze" to "Gold"; reports of past quarters should still show the old segment.
Type 3: Add new column (limited history)
A new column tracks "previous value." Only one history step.
Use when: You need to know "previous" but not full history. Example: Product had a previous category before re-categorization.
Type 6 (also called "1+2+3"): Hybrid
Combines Type 1 (current value column), Type 2 (history rows), Type 3 (some historical columns).
Use when: You want both "what's the current value" (fast) AND "what was the value at time T" (accurate). Most flexible but most complex.
When in doubt
Default to Type 2. It's what most analytics need. Add Type 6 if you also need fast "current value" lookups.
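To make the difference concrete, here is a minimal Spark-free sketch of how the same segment change lands under Type 1 and Type 2. The helper names `apply_type1` and `apply_type2` are hypothetical and exist only for this illustration; the rows are plain dictionaries standing in for dimension records.

```python
from datetime import datetime


def apply_type1(rows, key, new_values):
    """Type 1: overwrite attributes in place; the old value is gone."""
    return [{**r, **new_values} if r["customer_id"] == key else r for r in rows]


def apply_type2(rows, key, new_values, changed_at):
    """Type 2: expire the current row and append a new current row."""
    out = []
    for r in rows:
        if r["customer_id"] == key and r["is_current"]:
            # Close the old version at the change timestamp
            out.append({**r, "valid_to": changed_at, "is_current": False})
            # Open the new version starting at the same timestamp
            out.append({**r, **new_values, "valid_from": changed_at,
                        "valid_to": None, "is_current": True})
        else:
            out.append(r)
    return out


dim = [{"customer_id": "C001", "segment": "Bronze",
        "valid_from": datetime(2024, 1, 1), "valid_to": None, "is_current": True}]
changed_at = datetime(2024, 6, 1)

type1_rows = apply_type1(dim, "C001", {"segment": "Gold"})
type2_rows = apply_type2(dim, "C001", {"segment": "Gold"}, changed_at)

print([r["segment"] for r in type1_rows])
print([(r["segment"], r["is_current"]) for r in type2_rows])
```

Under Type 1 the table still has one row and "Bronze" is no longer recoverable; under Type 2 the "Bronze" row survives with a closed validity window.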
Prompt
You are a senior data engineer implementing slowly changing dimensions in
PySpark with Delta Lake. Generate the implementation for the dimension
described.
## Input
**Dimension:** {{dimension_name}}
**SCD type:** {{scd_type}}
**Source columns:** {{source_columns}}
**Business key:** {{business_key}}
**Tracked attributes:** {{tracked_attributes}}
## Output
Generate complete production-ready PySpark code for the chosen SCD type.
## Type 1 implementation (overwrite)
```python
"""
NB_Gold_dim_customer_type1
Purpose: Build dim_customer with Type 1 SCD (no history; latest values only)
"""
import logging

from pyspark.sql import functions as F
from delta.tables import DeltaTable

logger = logging.getLogger(__name__)

# Read source (`spark` is assumed to be the notebook's active SparkSession)
source_df = spark.read.format("delta").table("silver.customer")

# Standardize and enrich
dim_customer = (
    source_df
    .select(
        F.col("customer_id"),  # business key
        F.col("customer_name"),
        F.col("email"),
        F.col("segment"),
        F.col("country"),
        F.current_timestamp().alias("last_updated_at"),
    )
)
# MERGE pattern — overwrite changed rows, insert new
target_path = "abfss://lakehouse@evokedatalake.dfs.core.windows.net/gold/dim_customer"
# Ensure target exists
spark.sql(f"""
CREATE TABLE IF NOT EXISTS gold.dim_customer (
customer_id STRING,
customer_name STRING,
email STRING,
segment STRING,
country STRING,
last_updated_at TIMESTAMP
)
USING DELTA
LOCATION '{target_path}'
""")
target = DeltaTable.forPath(spark, target_path)

(
    target.alias("t")
    .merge(
        dim_customer.alias("s"),
        "t.customer_id = s.customer_id",
    )
    .whenMatchedUpdate(set={
        "customer_name": "s.customer_name",
        "email": "s.email",
        "segment": "s.segment",
        "country": "s.country",
        "last_updated_at": "s.last_updated_at",
    })
    .whenNotMatchedInsertAll()
    .execute()
)

logger.info(f"Type 1 merge complete: {dim_customer.count()} source rows")
```
## Type 2 implementation (full history)
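The key pattern: every row has `valid_from`, `valid_to`, `is_current`. When a tracked attribute changes, the existing row has `valid_to` set to the change timestamp and `is_current` set to `false`, and a new row is inserted with `valid_from` at that timestamp, `valid_to = NULL`, and `is_current = true`.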
```python
"""
NB_Gold_dim_customer_type2
Purpose: Build dim_customer with Type 2 SCD (full history tracking)
"""
import logging

from pyspark.sql import functions as F
from delta.tables import DeltaTable

logger = logging.getLogger(__name__)
# Define which columns are tracked (changes trigger new row)
tracked_columns = ["customer_name", "email", "segment", "country"]
# Read source — should be the latest snapshot
source_df = spark.read.format("delta").table("silver.customer")
# Compute hash of tracked columns for change detection.
# concat_ws skips NULLs, so ("a", NULL) and (NULL, "a") would hash identically;
# coalescing to a sentinel string keeps NULL positions distinct.
tracked_hash = F.sha2(
    F.concat_ws(
        "||",
        *[F.coalesce(F.col(c).cast("string"), F.lit("<NULL>")) for c in tracked_columns],
    ),
    256,
)

# Prepare source with metadata columns
source_with_meta = (
    source_df
    .select(
        F.col("customer_id"),
        *[F.col(c) for c in tracked_columns],
        tracked_hash.alias("_tracked_hash"),
        F.current_timestamp().alias("_change_detected_at"),
    )
)
# Target table schema
target_path = "abfss://lakehouse@evokedatalake.dfs.core.windows.net/gold/dim_customer"
spark.sql(f"""
CREATE TABLE IF NOT EXISTS gold.dim_customer (
customer_sk BIGINT GENERATED ALWAYS AS IDENTITY,
customer_id STRING NOT NULL,
customer_name STRING,
email STRING,
segment STRING,
country STRING,
valid_from TIMESTAMP NOT NULL,
valid_to TIMESTAMP,
is_current BOOLEAN NOT NULL,
_tracked_hash STRING NOT NULL
)
USING DELTA
LOCATION '{target_path}'
""")
# Read current target state (only the key and hash are needed for comparison)
target = DeltaTable.forPath(spark, target_path)
current_target = (
    spark.read.format("delta").load(target_path)
    .filter(F.col("is_current"))
    .select("customer_id", F.col("_tracked_hash").alias("_target_hash"))
)

# Identify new rows (no match in target) and changed rows (match but hash differs);
# unchanged rows (match and same hash) are dropped.
# The result is materialized with localCheckpoint(): the plan is lazy, and
# re-evaluating it after the MERGE below would re-read the target, see the
# expired rows, and misclassify changed keys as new.
changes = (
    source_with_meta
    .join(current_target, "customer_id", "left")
    .filter(
        F.col("_target_hash").isNull()
        | (F.col("_tracked_hash") != F.col("_target_hash"))
    )
    .withColumn("_is_changed", F.col("_target_hash").isNotNull())
    .localCheckpoint()
)

changed_count = changes.filter("_is_changed").count()
new_count = changes.count() - changed_count
logger.info(f"New: {new_count}, Changed: {changed_count}")

# Step 1: For CHANGED records, expire the existing current row.
# valid_to reuses _change_detected_at so it equals the new row's valid_from.
if changed_count > 0:
    expire_rows = (
        changes.filter("_is_changed")
        .select("customer_id", "_change_detected_at")
        .distinct()
    )
    (
        target.alias("t")
        .merge(
            expire_rows.alias("e"),
            "t.customer_id = e.customer_id AND t.is_current = true",
        )
        .whenMatchedUpdate(set={
            "valid_to": "e._change_detected_at",
            "is_current": F.lit(False),
        })
        .execute()
    )

# Step 2: INSERT new versions for both new and changed records
to_insert = changes.select(
    "customer_id",
    *tracked_columns,
    F.col("_change_detected_at").alias("valid_from"),
    F.lit(None).cast("timestamp").alias("valid_to"),
    F.lit(True).alias("is_current"),
    "_tracked_hash",
)
if new_count + changed_count > 0:
    to_insert.write.format("delta").mode("append").save(target_path)

logger.info(
    f"Type 2 SCD complete. "
    f"Inserted {new_count + changed_count} new versions, "
    f"expired {changed_count} previous versions."
)
```
### Querying Type 2 dimensions
For "current state":
```sql
SELECT * FROM gold.dim_customer WHERE is_current = true;
```
For "state as of a specific date" (the whole point of Type 2):
```sql
SELECT * FROM gold.dim_customer
WHERE valid_from <= '2024-06-15'
AND (valid_to > '2024-06-15' OR valid_to IS NULL);
```
Joining facts to Type 2 dim by transaction date:
```sql
SELECT
f.order_id,
f.order_date,
d.customer_name,
d.segment -- this is the segment AT THE TIME OF THE ORDER
FROM gold.fact_orders f
JOIN gold.dim_customer d
ON f.customer_id = d.customer_id
AND f.order_date >= d.valid_from
AND (f.order_date < d.valid_to OR d.valid_to IS NULL);
```
## Type 3 implementation (limited history)
```python
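"""
Type 3: Track previous value of one or two attributes.
Useful when you only need "previous" not full history.
"""
from pyspark.sql import functions as F
from delta.tables import DeltaTable

# Assumes `spark`, `source_df` (read from silver.customer as in the Type 1 example)
# and `target_path` (the storage location chosen for this table) are already defined.
# Schema includes current and previous columns:
# customer_id, customer_name, current_segment, previous_segment, segment_change_date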
spark.sql(f"""
CREATE TABLE IF NOT EXISTS gold.dim_customer_type3 (
customer_id STRING,
customer_name STRING,
current_segment STRING,
previous_segment STRING,
segment_change_date TIMESTAMP,
last_updated_at TIMESTAMP
)
USING DELTA
LOCATION '{target_path}'
""")
target = DeltaTable.forPath(spark, target_path)

(
    target.alias("t")
    .merge(
        source_df.alias("s"),
        "t.customer_id = s.customer_id",
    )
    # Segment changed: shift current to previous, set new current.
    # <=> is the NULL-safe equality operator, so a change to or from NULL is detected.
    .whenMatchedUpdate(
        condition="NOT (t.current_segment <=> s.segment)",
        set={
            "customer_name": "s.customer_name",
            "previous_segment": "t.current_segment",
            "current_segment": "s.segment",
            "segment_change_date": F.current_timestamp(),
            "last_updated_at": F.current_timestamp(),
        },
    )
    # Same segment: just update other fields (the last matched clause needs no condition)
    .whenMatchedUpdate(
        set={
            "customer_name": "s.customer_name",
            "last_updated_at": F.current_timestamp(),
        },
    )
    .whenNotMatchedInsert(values={
        "customer_id": "s.customer_id",
        "customer_name": "s.customer_name",
        "current_segment": "s.segment",
        "previous_segment": F.lit(None).cast("string"),
        "segment_change_date": F.lit(None).cast("timestamp"),
        "last_updated_at": F.current_timestamp(),
    })
    .execute()
)
```
## Type 6 implementation (hybrid)
Combines Type 1 (current value, fast lookups), Type 2 (full history), Type 3 (some historical columns).
```python
"""
Type 6: Hybrid pattern.
Each row has both 'current_*' and historical 'as_of_*' columns.
Latest row is identifiable by is_current.
"""
# Schema has both current_* and as_of_* attributes
spark.sql(f"""
CREATE TABLE IF NOT EXISTS gold.dim_customer_type6 (
customer_sk BIGINT GENERATED ALWAYS AS IDENTITY,
customer_id STRING NOT NULL,
customer_name STRING,
-- Type 2 historical attributes
as_of_segment STRING, -- segment as of this row's valid window
as_of_country STRING,
-- Type 1 current attributes (overwritten on every change)
current_segment STRING, -- always the latest segment, even on old rows
current_country STRING,
valid_from TIMESTAMP NOT NULL,
valid_to TIMESTAMP,
is_current BOOLEAN NOT NULL,
_tracked_hash STRING NOT NULL
)
USING DELTA
""")
# Type 6 merge has two phases:
# 1. Type 2 logic for new history rows
# 2. Type 1 update propagating current_* across ALL historical rows for that customer
# Phase 1: same as Type 2 (above) — insert new row, expire old current
# ... (Type 2 logic)
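# Phase 2: propagate current values across history.
# Delta Lake's UPDATE has no FROM clause, so the join-based update is written as a MERGE.
# Assumes silver.customer holds one row per customer_id.
spark.sql("""
    MERGE INTO gold.dim_customer_type6 AS dim
    USING (
        SELECT customer_id, segment, country
        FROM silver.customer
    ) AS src
    ON dim.customer_id = src.customer_id
    WHEN MATCHED AND NOT (
        dim.current_segment <=> src.segment
        AND dim.current_country <=> src.country
    ) THEN UPDATE SET
        dim.current_segment = src.segment,
        dim.current_country = src.country
""")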
```
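The two phases can be modelled without Spark to check expectations before running the real pipeline. The following is a minimal sketch under that assumption; `apply_type6` is a hypothetical helper, and only the segment attribute is modelled.

```python
from datetime import datetime


def apply_type6(rows, key, new_segment, changed_at):
    """Apply one segment change to an in-memory Type 6 history."""
    out = []
    for r in rows:
        if r["customer_id"] != key:
            out.append(r)
            continue
        # Phase 2 (Type 1 part): every row for the key gets the latest value
        updated = {**r, "current_segment": new_segment}
        if r["is_current"]:
            # Phase 1 (Type 2 part): expire the current row and open a new one
            out.append({**updated, "valid_to": changed_at, "is_current": False})
            out.append({**updated, "as_of_segment": new_segment,
                        "valid_from": changed_at, "valid_to": None,
                        "is_current": True})
        else:
            out.append(updated)
    return out


history = [
    {"customer_id": "C001", "as_of_segment": "Bronze", "current_segment": "Silver",
     "valid_from": datetime(2023, 1, 1), "valid_to": datetime(2024, 1, 1),
     "is_current": False},
    {"customer_id": "C001", "as_of_segment": "Silver", "current_segment": "Silver",
     "valid_from": datetime(2024, 1, 1), "valid_to": None, "is_current": True},
]

result = apply_type6(history, "C001", "Gold", datetime(2024, 6, 1))
for row in result:
    print(row["as_of_segment"], row["current_segment"], row["is_current"])
```

After the change, `as_of_segment` still reads Bronze, Silver, Gold across the three rows, while `current_segment` is Gold on all of them.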
### Querying Type 6 dimensions
For "current state":
```sql
SELECT customer_id, current_segment FROM gold.dim_customer_type6 WHERE is_current = true;
```
For "as-of-time":
```sql
SELECT customer_id, as_of_segment
FROM gold.dim_customer_type6
WHERE customer_id = 'C123'
AND valid_from <= '2024-06-15'
AND (valid_to > '2024-06-15' OR valid_to IS NULL);
```
For "show me the order with segment-at-the-time AND current-segment":
```sql
SELECT
f.order_id,
d.as_of_segment, -- what segment they were at order time
d.current_segment -- what segment they are now
FROM gold.fact_orders f
JOIN gold.dim_customer_type6 d
ON f.customer_id = d.customer_id
AND f.order_date >= d.valid_from
AND (f.order_date < d.valid_to OR d.valid_to IS NULL);
```
## Hard problems and patterns
### Late-arriving data
What if data arrives 3 days late and the dimension already has newer rows?
```python
from pyspark.sql import functions as F


# Find the version of the dimension that was valid at the late-arriving fact's date
def get_dim_at_date(dim_path, customer_id, as_of_date):
    return (
        spark.read.format("delta").load(dim_path)
        .filter(F.col("customer_id") == customer_id)
        .filter(F.col("valid_from") <= as_of_date)
        .filter((F.col("valid_to") > as_of_date) | F.col("valid_to").isNull())
        .first()  # returns None when no version covers as_of_date
    )
```
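For batch handling of late-arriving facts, join on the business key plus the fact's event date falling inside the row's validity window (`valid_from <= event date < valid_to`, or `valid_to IS NULL`), the same condition used in the fact-to-dimension query under "Querying Type 2 dimensions".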
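As a Spark-free illustration of that date-aware resolution, the sketch below resolves a batch of late facts against in-memory dimension versions. The helpers `build_version_index` and `resolve_version` and the sample rows are hypothetical; validity windows are treated as half-open, `[valid_from, valid_to)`.

```python
from bisect import bisect_right
from collections import defaultdict
from datetime import datetime


def build_version_index(dim_rows):
    """Group dimension versions by business key, sorted by valid_from."""
    index = defaultdict(list)
    for row in dim_rows:
        index[row["customer_id"]].append(row)
    for versions in index.values():
        versions.sort(key=lambda r: r["valid_from"])
    return index


def resolve_version(index, customer_id, event_time):
    """Return the version whose window contains event_time, or None."""
    versions = index.get(customer_id, [])
    starts = [v["valid_from"] for v in versions]
    pos = bisect_right(starts, event_time) - 1
    if pos < 0:
        return None  # event predates the first known version
    candidate = versions[pos]
    if candidate["valid_to"] is not None and event_time >= candidate["valid_to"]:
        return None  # event falls into a gap between versions
    return candidate


dim_rows = [
    {"customer_id": "C001", "segment": "Bronze",
     "valid_from": datetime(2024, 1, 1), "valid_to": datetime(2024, 6, 1)},
    {"customer_id": "C001", "segment": "Gold",
     "valid_from": datetime(2024, 6, 1), "valid_to": None},
]
late_facts = [
    {"order_id": "O1", "customer_id": "C001", "order_date": datetime(2024, 5, 20)},
    {"order_id": "O2", "customer_id": "C001", "order_date": datetime(2024, 6, 1)},
    {"order_id": "O3", "customer_id": "C001", "order_date": datetime(2023, 12, 1)},
]

index = build_version_index(dim_rows)
resolved = {}
for fact in late_facts:
    version = resolve_version(index, fact["customer_id"], fact["order_date"])
    resolved[fact["order_id"]] = version["segment"] if version else None

print(resolved)
```

The order placed before the change resolves to the Bronze version even though Gold is current, the order exactly on the boundary resolves to Gold, and the order that predates all history resolves to nothing, which a real pipeline would need to route to an "unknown member" row or a reject queue.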
### Changes to tracked attribute set
What if you decide to start tracking a column you weren't tracking?
- Backfill: assume the current value applied historically (loses information, simplest)
- Reconstruct: pull from source system history if available (complex)
- Document: note the cutoff date when tracking started
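The backfill option, combined with a documented cutoff, might look like the following Spark-free sketch. The helper name, the `loyalty_tier` column, and the `<column>_tracked_since` marker column are all hypothetical choices for illustration, not part of the template's schema.

```python
from datetime import datetime


def backfill_new_tracked_column(dim_rows, current_values, column, tracking_started_at):
    """Stamp each key's current value onto all of its history rows.

    Also records when real tracking began, so readers know that values on
    rows before that date are assumed rather than observed.
    """
    return [
        {**row,
         column: current_values.get(row["customer_id"]),
         f"{column}_tracked_since": tracking_started_at}
        for row in dim_rows
    ]


dim_rows = [
    {"customer_id": "C001", "segment": "Bronze", "is_current": False},
    {"customer_id": "C001", "segment": "Gold", "is_current": True},
]
current_values = {"C001": "Plus"}  # latest value per business key from the source
cutoff = datetime(2024, 6, 1)

backfilled = backfill_new_tracked_column(dim_rows, current_values, "loyalty_tier", cutoff)
for row in backfilled:
    print(row["segment"], row["loyalty_tier"], row["loyalty_tier_tracked_since"])
```

If the change-detection hash covers the new column, the stored `_tracked_hash` values on current rows would also need recomputing as part of the backfill; otherwise the next run would see every hash differ and create a new version for every key.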
### Performance at scale
Type 2 dim tables grow indefinitely. For dims with billions of versions:
- **Partition by `is_current`**: scan only current rows for hot queries
- **Z-order on business key + valid_from**: optimize range queries
- **Periodic OPTIMIZE**: compact small files
- **Archive truly old history**: move rows whose `valid_to` is more than 5 years in the past to a colder tier
### Surrogate keys
SCD Type 2/6 typically uses surrogate keys (auto-incremented integers) for fact joins:
- Generate via `GENERATED ALWAYS AS IDENTITY` (Delta supports this)
- Or assign during merge using monotonically increasing counter
- Facts join on surrogate key, not business key
```python
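from pyspark.sql.window import Window

# Add surrogate key during insert (only for tables without an identity column).
# row_number() yields dense keys; monotonically_increasing_id() is unique but leaves
# very large gaps. The un-partitioned window runs in a single task.
max_existing_sk = spark.read.format("delta").load(target_path).agg(F.max("customer_sk")).first()[0] or 0
to_insert_with_sk = to_insert.withColumn(
    "customer_sk",
    F.row_number().over(Window.orderBy("customer_id")) + F.lit(max_existing_sk),
)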
```
## Testing SCD implementations
```python
from pyspark.sql import functions as F

# Assumes `spark`, `target_path`, and `source_df` are provided by the test setup,
# and that `run_scd_logic(df)` wraps the Type 2 pipeline shown earlier.


# Test 1: Initial load — all rows should be is_current=true, valid_to=null
def test_initial_load():
    df = spark.read.format("delta").load(target_path)
    assert df.filter(F.col("is_current") == False).count() == 0
    assert df.filter(F.col("valid_to").isNotNull()).count() == 0


# Test 2: Re-running same data — no changes should occur
def test_idempotency():
    initial_count = spark.read.format("delta").load(target_path).count()
    run_scd_logic(source_df)  # Re-run with same data
    final_count = spark.read.format("delta").load(target_path).count()
    assert initial_count == final_count, "Idempotency violated"


# Test 3: Changing a tracked column creates new version
def test_change_creates_version():
    initial_count_for_customer = (
        spark.read.format("delta").load(target_path)
        .filter(F.col("customer_id") == "C001")
        .count()
    )
    # Modify customer C001's segment
    modified_source = source_df.withColumn(
        "segment",
        F.when(F.col("customer_id") == "C001", "Platinum").otherwise(F.col("segment")),
    )
    run_scd_logic(modified_source)
    final_count = (
        spark.read.format("delta").load(target_path)
        .filter(F.col("customer_id") == "C001")
        .count()
    )
    assert final_count == initial_count_for_customer + 1


# Test 4: Exactly one current row per business key
def test_one_current_per_key():
    df = spark.read.format("delta").load(target_path)
    multi_current = (
        df
        .filter(F.col("is_current") == True)
        .groupBy("customer_id")
        .count()
        .filter(F.col("count") > 1)
        .count()
    )
    assert multi_current == 0, f"Found {multi_current} customers with >1 current row"
```
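Beyond the four tests above, it can be useful to check that validity windows for each business key are contiguous and non-overlapping. The sketch below does this on plain dictionaries (for example, rows collected from a small test table); `find_window_violations` is a hypothetical helper, and the contiguity check assumes the pipeline closes a row and opens its successor with the same timestamp.

```python
from collections import defaultdict
from datetime import datetime


def find_window_violations(rows):
    """Return business keys whose history is not a clean chain of windows.

    A clean chain has exactly one open-ended row (valid_to is None), that row
    is the latest version, and each valid_to equals the next valid_from.
    """
    by_key = defaultdict(list)
    for row in rows:
        by_key[row["customer_id"]].append(row)

    bad = set()
    for key, versions in by_key.items():
        versions.sort(key=lambda r: r["valid_from"])
        open_rows = [v for v in versions if v["valid_to"] is None]
        if len(open_rows) != 1 or versions[-1]["valid_to"] is not None:
            bad.add(key)
            continue
        for prev, nxt in zip(versions, versions[1:]):
            if prev["valid_to"] != nxt["valid_from"]:
                bad.add(key)  # gap or overlap between consecutive versions
                break
    return bad


jan, jun, jul = datetime(2024, 1, 1), datetime(2024, 6, 1), datetime(2024, 7, 1)
rows = [
    # C001: clean chain
    {"customer_id": "C001", "valid_from": jan, "valid_to": jun},
    {"customer_id": "C001", "valid_from": jun, "valid_to": None},
    # C002: first window ends after the second one starts (overlap)
    {"customer_id": "C002", "valid_from": jan, "valid_to": jul},
    {"customer_id": "C002", "valid_from": jun, "valid_to": None},
    # C003: two open-ended rows
    {"customer_id": "C003", "valid_from": jan, "valid_to": None},
    {"customer_id": "C003", "valid_from": jun, "valid_to": None},
]

violations = find_window_violations(rows)
print(sorted(violations))
```

Overlaps matter because the date-range join would match a fact to two dimension rows and duplicate it; gaps matter because facts dated inside the gap would silently drop out of an inner join.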
## Anti-patterns to avoid
- **Type 2 without proper change detection.** If you re-insert every row every run, you've got pseudo-history with garbage rows.
- **No idempotency.** Pipeline re-runs should produce same final state.
- **Setting `valid_to` to a far-future sentinel date** (for example 9999-12-31). Use NULL for "currently valid"; the `valid_to IS NULL` branch in the join conditions handles it.
- **Type 1 when Type 2 is needed.** Lost history is lost forever; can't go back and reconstruct.
- **Type 2 when Type 1 is enough.** Massive table growth for no analytical benefit.
- **No surrogate keys.** Joining facts on business keys = brittle when business keys change.
- **No tests.** SCD bugs are subtle; tests catch them before they ship.
## Output files
For each dimension:
1. **`notebooks/gold/NB_dim_<entity>.py`** — main SCD logic
2. **`notebooks/gold/test_dim_<entity>.py`** — test suite
3. **`docs/dimensions/<entity>.md`** — documentation (which type, why, tracked columns, query patterns)
Tips
- Default to Type 2 for analytics. When unsure, Type 2 gives more flexibility downstream.
- Document the decision. Future maintainers need to know WHY you chose this type.
- Test the late-arriving data scenario. It's where SCD bugs hide.
- Don't track every column. Pick the columns that matter analytically; ignore noise.
- Plan for change. Tracked attribute sets evolve; document how to handle additions.
Common mistakes to avoid
- Treating SCD as configuration, not as design choice
- Type 2 without proper hash-based change detection (creates spurious versions)
- Surrogate keys generated non-monotonically (violates assumptions of downstream code)
- No is_current flag (forces every query to do max(valid_from) — slow)
- valid_to with future dates instead of NULL (breaks "currently valid" pattern)
- Joining facts to dim on business key instead of surrogate key (brittle to ID changes)
- Mishandling NULLs in change detection: a value that is NULL in the source and already NULL in the target must not produce a new version, while a change to or from NULL must
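The NULL pitfall in hash-based change detection can be reproduced without Spark. The sketch below imitates a separator-joined hash that skips NULLs (which is how Spark's `concat_ws` treats them) and compares it with a variant that substitutes a sentinel; both helper names and the sentinel value are hypothetical.

```python
import hashlib

NULL_TOKEN = "<NULL>"  # hypothetical sentinel; pick one that cannot occur in real data


def naive_hash(values):
    """Join non-NULL values with a separator, skipping NULLs entirely."""
    joined = "||".join(str(v) for v in values if v is not None)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


def null_safe_hash(values):
    """Replace NULLs with a sentinel so their position stays visible in the hash."""
    joined = "||".join(NULL_TOKEN if v is None else str(v) for v in values)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


# Tracked columns: (customer_name, email, segment)
before = ["Ann", None, "Gold"]   # email is NULL
after = ["Ann", "Gold", None]    # email became "Gold", segment became NULL

print(naive_hash(before) == naive_hash(after))          # real change is missed
print(null_safe_hash(before) == null_safe_hash(after))  # real change is detected
print(null_safe_hash(before) == null_safe_hash(list(before)))  # NULL stays NULL: no new version
```

The naive variant hashes both rows as `Ann||Gold`, so a genuine change goes undetected; the sentinel variant distinguishes them while still producing a stable hash when a NULL simply stays NULL.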