Playbook

PySpark Transformation Standards

Standards for PySpark transformations in Bronze/Silver/Gold pipelines: idempotency, partitioning, Delta Lake patterns, and code organization.

PySpark Transformation Standards

Standards for PySpark transformations in modern data platforms. Covers Bronze (raw landing), Silver (cleansed), and Gold (business-ready) layers with idempotency, partitioning, Delta Lake patterns, and code organization.

When to use

  • Setting up a new PySpark codebase (Fabric notebooks, Databricks, Synapse Spark)
  • Standardizing across teams that have diverged
  • During code review of PySpark transformations
  • Training new data engineers on patterns

The medallion architecture (Bronze / Silver / Gold)

Each layer has different rules and patterns:

Bronze layer

  • Purpose: Land raw data unchanged from source
  • Schema: As-source (preserve exact field names, types, encoding)
  • Quality: No cleansing; capture as-is
  • History: Append-only; partition by ingestion date
  • Format: Delta Lake (preferred) or Parquet

Silver layer

  • Purpose: Cleansed, conformed, validated data
  • Schema: Standardized (common naming, common types)
  • Quality: Deduplicated, validated, type-corrected
  • History: Type 2 SCD where business needs require
  • Format: Delta Lake with MERGE patterns

Gold layer

  • Purpose: Business-ready data marts and aggregations
  • Schema: Dimensional (star/snowflake) or analytical
  • Quality: Tested for business rules, ready for consumption
  • History: As required by use case
  • Format: Delta Lake or Synapse / Fabric Warehouse tables

Prompt

You are a senior data engineer writing PySpark transformations for a
modern data platform. Generate idiomatic, production-quality code following
the standards below.

## Input

**Project context:** {{project_context}}
**Target layer:** {{target_layer}}
**Source format:** {{source_format}}
**Target format:** {{target_format}}

## Output

Generate complete, production-ready PySpark code following these standards.

## Bronze layer template

```python
"""
Notebook: NB_Bronze_<source>_<table>
Purpose: Land raw data from <source>.<table> into Bronze layer
Owner: data-engineering@evoke.com
Schedule: Daily 02:00 UTC (via ADF master pipeline)

Inputs:
    - Source: <source system, table>
    - Watermark: passed as parameter from ADF

Outputs:
    - <ADLS path>/bronze/<source>/<table>/
    - Partitioned by ingestion_date

SLA: 30 minutes
Notification: data-engineering-alerts@evoke.com on failure
"""

# COMMAND ----------
# MAGIC %md
# MAGIC ## Imports and configuration

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import *
from datetime import datetime
import logging

logger = logging.getLogger(__name__)

# COMMAND ----------
# MAGIC %md
# MAGIC ## Parameters

dbutils.widgets.text("source_system", "")
dbutils.widgets.text("source_table", "")
dbutils.widgets.text("watermark_value", "")
dbutils.widgets.text("run_id", "")
dbutils.widgets.text("environment", "dev")

source_system = dbutils.widgets.get("source_system")
source_table = dbutils.widgets.get("source_table")
watermark_value = dbutils.widgets.get("watermark_value")
run_id = dbutils.widgets.get("run_id")
environment = dbutils.widgets.get("environment")

# Validate parameters
if not source_system or not source_table:
    raise ValueError("source_system and source_table are required parameters")

ingestion_date = datetime.utcnow().strftime("%Y-%m-%d")
ingestion_timestamp = datetime.utcnow().isoformat()

logger.info(f"Starting bronze ingestion: {source_system}.{source_table} run_id={run_id}")

# COMMAND ----------
# MAGIC %md
# MAGIC ## Read source

# Read pattern depends on source — use the appropriate one
# For SQL source via JDBC:
source_df = (spark.read
    .format("jdbc")
    .option("url", get_secret("jdbc_url_" + source_system))
    .option("dbtable", f"({source_query}) AS src")
    .option("user", get_secret("user_" + source_system))
    .option("password", get_secret("password_" + source_system))
    .option("fetchsize", "10000")
    .option("numPartitions", "4")
    .load()
)

# For file-based source (CSV, JSON, Parquet):
# source_df = (spark.read
#     .format("csv")
#     .option("header", "true")
#     .option("inferSchema", "false")  # Always specify schema explicitly in production
#     .schema(source_schema)  # Define schema; never inferSchema in production
#     .load(source_path)
# )

# COMMAND ----------
# MAGIC %md
# MAGIC ## Add Bronze metadata columns

# Standard Bronze metadata; every Bronze table has these
bronze_df = (source_df
    .withColumn("_ingestion_date", F.lit(ingestion_date).cast("date"))
    .withColumn("_ingestion_timestamp", F.lit(ingestion_timestamp).cast("timestamp"))
    .withColumn("_source_system", F.lit(source_system))
    .withColumn("_source_table", F.lit(source_table))
    .withColumn("_run_id", F.lit(run_id))
    .withColumn("_record_hash", F.sha2(F.concat_ws("||",
        *[F.col(c).cast("string") for c in source_df.columns]), 256))
)

# COMMAND ----------
# MAGIC %md
# MAGIC ## Write to Bronze

bronze_path = f"abfss://lakehouse@evokedatalake{environment}.dfs.core.windows.net/bronze/{source_system}/{source_table}"

(bronze_df.write
    .format("delta")
    .mode("append")  # Bronze is append-only
    .partitionBy("_ingestion_date")
    .option("mergeSchema", "true")  # Allow schema evolution
    .save(bronze_path)
)

# COMMAND ----------
# MAGIC %md
# MAGIC ## Log results

row_count = bronze_df.count()
logger.info(f"Loaded {row_count} rows into {bronze_path}")

# Return result for ADF to capture
result = {
    "row_count": row_count,
    "ingestion_date": ingestion_date,
    "target_path": bronze_path,
    "status": "success"
}
dbutils.notebook.exit(json.dumps(result))
```

## Silver layer template

```python
"""
Notebook: NB_Silver_<entity>
Purpose: Cleanse and conform <entity> from Bronze to Silver
Owner: data-engineering@evoke.com

Inputs:
    - bronze.<entity> (one or more sources)

Outputs:
    - silver.<entity>
    - Conformed schema, deduplicated, validated

Patterns applied:
    - Type 2 SCD if business requires history
    - MERGE for upserts
    - Data quality validation before write

SLA: 60 minutes
"""

# COMMAND ----------
# Read from Bronze (latest ingestion date by default)

bronze_customers = (spark.read
    .format("delta")
    .load("abfss://lakehouse@evokedatalake.dfs.core.windows.net/bronze/salesforce/account")
    .filter(F.col("_ingestion_date") == F.current_date())
)

# COMMAND ----------
# Standardize schema

silver_customers = (bronze_customers
    # Rename columns to standard names (camelCase or snake_case — pick ONE org-wide)
    .withColumnRenamed("Id", "customer_id")
    .withColumnRenamed("Name", "customer_name")
    .withColumnRenamed("Email__c", "email")
    .withColumnRenamed("CreatedDate", "created_at")

    # Cast to standard types
    .withColumn("customer_id", F.col("customer_id").cast("string"))
    .withColumn("created_at", F.col("created_at").cast("timestamp"))

    # Cleanse (NEVER modify original Bronze; always produce a new column or
    # replace one in flight to Silver)
    .withColumn("email", F.lower(F.trim(F.col("email"))))
    .withColumn("customer_name", F.trim(F.col("customer_name")))

    # Add Silver metadata
    .withColumn("_silver_load_timestamp", F.current_timestamp())
    .withColumn("_source_system", F.col("_source_system"))  # carry forward
)

# COMMAND ----------
# Validate before writing

# Data quality checks — fail loudly if violated
null_count = silver_customers.filter(F.col("customer_id").isNull()).count()
if null_count > 0:
    raise ValueError(f"Found {null_count} customer records with NULL customer_id")

duplicate_count = (silver_customers
    .groupBy("customer_id")
    .count()
    .filter(F.col("count") > 1)
    .count()
)
if duplicate_count > 0:
    raise ValueError(f"Found {duplicate_count} duplicate customer_ids")

# Email format check (warn, don't fail)
invalid_email_count = (silver_customers
    .filter(F.col("email").isNotNull())
    .filter(~F.col("email").rlike(r"^[^@\s]+@[^@\s]+\.[^@\s]+$"))
    .count()
)
if invalid_email_count > 0:
    logger.warning(f"Found {invalid_email_count} records with invalid email format")

# COMMAND ----------
# Write Silver via MERGE (upsert)

silver_path = "abfss://lakehouse@evokedatalake.dfs.core.windows.net/silver/customer"

# Create target if not exists
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.customer
    USING DELTA
    LOCATION '{silver_path}'
    AS SELECT * FROM silver_customers WHERE 1=0
""")

# MERGE pattern (upsert)
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, silver_path)

(target.alias("target")
    .merge(
        silver_customers.alias("source"),
        "target.customer_id = source.customer_id"
    )
    .whenMatchedUpdate(
        condition="target._record_hash != source._record_hash",  # only update if changed
        set={col: f"source.{col}" for col in silver_customers.columns}
    )
    .whenNotMatchedInsert(
        values={col: f"source.{col}" for col in silver_customers.columns}
    )
    .execute()
)

logger.info(f"Silver merge complete: {silver_customers.count()} source rows processed")
```

## Gold layer template

```python
"""
Notebook: NB_Gold_dim_customer
Purpose: Build dimensional customer table for Power BI consumption
Owner: data-engineering@evoke.com

Patterns:
    - Type 2 SCD on slowly-changing attributes (address, segment)
    - Surrogate keys for dimensional joins
    - Business rules applied
"""

# Build dim_customer with Type 2 SCD on segment changes
# Refer to SCD template for full pattern

# Aggregations into fact tables
fact_orders_daily = (silver_orders
    .groupBy(
        F.col("order_date"),
        F.col("customer_id"),
        F.col("product_category")
    )
    .agg(
        F.sum("order_amount").alias("total_amount"),
        F.count("order_id").alias("order_count"),
        F.avg("order_amount").alias("avg_order_amount")
    )
)

# Write to Synapse / Fabric Warehouse via Polybase / COPY INTO
(fact_orders_daily.write
    .format("com.microsoft.sqlserver.jdbc.spark")
    .mode("overwrite")
    .option("url", synapse_url)
    .option("dbtable", "dbo.fact_orders_daily")
    .option("user", get_secret("synapse_user"))
    .option("password", get_secret("synapse_password"))
    .option("tempDir", "abfss://staging@evokedatalake.dfs.core.windows.net/tempload")
    .save()
)
```

## Standards and patterns

### Idempotency (non-negotiable)

Every notebook must be safe to re-run:

- **Bronze:** append with date partitioning; re-run replaces same partition
- **Silver:** MERGE / upsert on primary key; produces same final state
- **Gold:** transformations are deterministic functions of Silver state

Never use plain INSERT in Silver/Gold (creates duplicates on re-run).

### Partitioning strategy

- **Bronze:** partition by ingestion_date — appropriate for time-series append
- **Silver:** partition by business date column (order_date, event_date) if highly date-skewed; otherwise no partitioning under 100M rows
- **Gold:** partition aligned to query patterns (most queries filter by date → partition by date)

Don't over-partition. &lt;1GB per partition is bad. Aim for 500MB-2GB per partition.

### Schema evolution

- **Bronze:** allow schema evolution (`mergeSchema=true`); source schemas drift
- **Silver:** explicit schema; rejects unexpected columns. Schema changes are deliberate.
- **Gold:** strict schema; downstream consumers depend on it

### Type handling

- **Strings:** trim by default; use `lower()` for case-insensitive matching keys
- **Decimals:** specify precision explicitly (NEVER use float/double for money)
- **Dates:** cast to date or timestamp explicitly; don't rely on inferSchema
- **Booleans:** standardize on `true`/`false` (not "Y"/"N", "1"/"0", "yes"/"no")
- **Nulls:** handle explicitly (`coalesce`, `fillna`); don't let nulls cascade unexpectedly

### Performance patterns

#### Reading
- **Predicate pushdown:** filter early; Delta and Parquet support it
- **Column pruning:** select only columns you need
- **Partition pruning:** filter on partition columns first
- **Avoid `inferSchema=true` in production** (extra read pass)

#### Joining
- **Broadcast small tables:** `F.broadcast(small_df)` for tables &lt;100MB
- **Repartition for large joins:** match partition count and partition column when possible
- **Avoid skew:** if one key has 90% of rows, salt the join key
- **Sort-merge join:** default for large+large; ensure data is sorted

#### Aggregation
- **Aggregate before joining** when possible (smaller sides)
- **Use approx functions** when exact precision isn't required (`approx_count_distinct`)
- **Window functions:** explicit partitioning to avoid full shuffle

#### Writing
- **Repartition before write:** `df.repartition(N)` to control output file count
- **Optimize write:** `optimizeWrite=true` (Databricks/Fabric) auto-tunes
- **Z-order** on commonly-queried columns (Delta): `OPTIMIZE table ZORDER BY (col)`

### Logging

Every notebook logs structured events:

```python
log_event = {
    "notebook": notebook_name,
    "run_id": run_id,
    "phase": "bronze_ingest",
    "source_system": source_system,
    "source_table": source_table,
    "row_count": row_count,
    "duration_seconds": duration,
    "status": "success",
    "error_message": None
}
spark.createDataFrame([log_event]).write.format("delta").mode("append").saveAsTable("meta.notebook_run_log")
```

### Error handling

```python
try:
    # main logic
    ...
    log_event["status"] = "success"
except Exception as e:
    log_event["status"] = "failed"
    log_event["error_message"] = str(e)
    raise  # re-raise so ADF marks the activity failed
finally:
    log_to_metadata(log_event)  # always log, success or failure
```

### Secrets handling

NEVER hardcode credentials. Use:
- **Databricks:** `dbutils.secrets.get(scope, key)`
- **Fabric / Synapse Notebooks:** Key Vault-linked services + token-based access
- **Service principals** with managed identity preferred

```python
def get_secret(key: str) -> str:
    return dbutils.secrets.get(scope="evoke-data-secrets", key=key)
```

### Code organization within a notebook

Order matters:
1. Docstring header (purpose, owner, SLA)
2. Imports
3. Parameter declarations (widgets)
4. Configuration constants
5. Helper functions (defined locally)
6. Read sources
7. Transform
8. Validate (data quality checks)
9. Write target
10. Log + exit with result

### Reusable utilities

Don't duplicate boilerplate. Build a shared library:

```python
# evoke_data_utils package, imported via wheel or %run
from evoke_data_utils import (
    standard_bronze_metadata,  # adds standard _ingestion_*, _source_*, _run_id columns
    standard_silver_metadata,
    log_run_event,
    get_secret,
    validate_no_nulls,
    validate_unique,
    standard_merge,
)
```

Versioned, tested, used across all notebooks. This is the difference between
spaghetti notebooks and a maintainable codebase.

## Anti-patterns to avoid

- **`.collect()` on large DataFrames.** Pulls all data to driver; OOM on >small data.
- **`.toPandas()` on large DataFrames.** Same problem.
- **`for row in df.collect()`.** Single-threaded loop on driver.
- **`.cache()` without `.unpersist()`.** Memory leak across notebook lifecycle.
- **`inferSchema=true` in production.** Slow, unpredictable.
- **Wide transformations followed by narrow.** Wastes shuffles.
- **No validation before writing.** Garbage in, garbage out.
- **Hardcoded paths.** Use parameters and config.
- **Ignoring schema drift.** Bronze should evolve; Silver should not silently.
- **Spark on data &lt;1GB.** Use pandas / DuckDB / SQL instead. Spark startup overhead isn't worth it.

## Output format

For each notebook, generate:

1. Complete `.py` or `.ipynb` content with all sections above
2. Sample test data setup
3. Sample assertions for unit testing the transformation
4. Documentation about parameters and outputs

Tips

  • Start with the standards. Don't refactor later. Notebooks written without standards are painful to standardize.
  • Build the shared utilities library first. It's the multiplier for everything else.
  • Test transformations in isolation. Pure functions on DataFrames are testable; notebooks coupled to ADF aren't.
  • Profile before optimizing. PySpark performance is rarely intuitive; measure.
  • Watch the small-data trap. Spark for tables <1GB is overkill. Use SQL/pandas/DuckDB for those.

Common mistakes to avoid

  • Treating notebooks as scripts (no structure, no documentation, no error handling)
  • Writing without idempotency
  • Hardcoded paths and credentials
  • No validation before writes
  • Using float/double for money
  • Schema inference in production
  • .collect() / .toPandas() on large data
  • No logging or observability
  • Copy-paste boilerplate instead of shared utilities
  • Spark for data that doesn't need it

Related assets

Command Palette

Search for a command to run...