PySpark Transformation Standards
Standards for PySpark transformations in modern data platforms. Covers Bronze (raw landing), Silver (cleansed), and Gold (business-ready) layers with idempotency, partitioning, Delta Lake patterns, and code organization.
When to use
- Setting up a new PySpark codebase (Fabric notebooks, Databricks, Synapse Spark)
- Standardizing across teams that have diverged
- During code review of PySpark transformations
- Training new data engineers on patterns
The medallion architecture (Bronze / Silver / Gold)
Each layer has different rules and patterns:
Bronze layer
- Purpose: Land raw data unchanged from source
- Schema: As-source (preserve exact field names, types, encoding)
- Quality: No cleansing; capture as-is
- History: Append-only; partition by ingestion date
- Format: Delta Lake (preferred) or Parquet
Silver layer
- Purpose: Cleansed, conformed, validated data
- Schema: Standardized (common naming, common types)
- Quality: Deduplicated, validated, type-corrected
- History: Type 2 SCD where business needs require
- Format: Delta Lake with MERGE patterns
Gold layer
- Purpose: Business-ready data marts and aggregations
- Schema: Dimensional (star/snowflake) or analytical
- Quality: Tested for business rules, ready for consumption
- History: As required by use case
- Format: Delta Lake or Synapse / Fabric Warehouse tables
Prompt
You are a senior data engineer writing PySpark transformations for a
modern data platform. Generate idiomatic, production-quality code following
the standards below.
## Input
**Project context:** {{project_context}}
**Target layer:** {{target_layer}}
**Source format:** {{source_format}}
**Target format:** {{target_format}}
## Output
Generate complete, production-ready PySpark code following these standards.
## Bronze layer template
```python
"""
Notebook: NB_Bronze_<source>_<table>
Purpose: Land raw data from <source>.<table> into Bronze layer
Owner: data-engineering@evoke.com
Schedule: Daily 02:00 UTC (via ADF master pipeline)
Inputs:
- Source: <source system, table>
- Watermark: passed as parameter from ADF
Outputs:
- <ADLS path>/bronze/<source>/<table>/
- Partitioned by ingestion_date
SLA: 30 minutes
Notification: data-engineering-alerts@evoke.com on failure
"""
# COMMAND ----------
# MAGIC %md
# MAGIC ## Imports and configuration
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import StructType  # import the specific types you need; avoid `import *`
from datetime import datetime, timezone
import json  # used by dbutils.notebook.exit at the end
import logging
logger = logging.getLogger(__name__)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Parameters
dbutils.widgets.text("source_system", "")
dbutils.widgets.text("source_table", "")
dbutils.widgets.text("watermark_value", "")
dbutils.widgets.text("run_id", "")
dbutils.widgets.text("environment", "dev")
source_system = dbutils.widgets.get("source_system")
source_table = dbutils.widgets.get("source_table")
watermark_value = dbutils.widgets.get("watermark_value")
run_id = dbutils.widgets.get("run_id")
environment = dbutils.widgets.get("environment")
# Validate parameters
if not source_system or not source_table:
    raise ValueError("source_system and source_table are required parameters")
# Take one UTC reading so the date and the timestamp can never disagree at midnight
run_started_at = datetime.now(timezone.utc)
ingestion_date = run_started_at.strftime("%Y-%m-%d")
ingestion_timestamp = run_started_at.isoformat()
logger.info(f"Starting bronze ingestion: {source_system}.{source_table} run_id={run_id}")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Read source
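# Read pattern depends on the source; use the appropriate one.
# For a SQL source via JDBC. get_secret is the shared helper (see Secrets handling);
# source_query is the incremental extract filtered on watermark_value and must be
# defined before this cell.
source_df = (spark.read
    .format("jdbc")
    .option("url", get_secret("jdbc_url_" + source_system))
    .option("dbtable", f"({source_query}) AS src")
    .option("user", get_secret("user_" + source_system))
    .option("password", get_secret("password_" + source_system))
    .option("fetchsize", "10000")
    # numPartitions parallelizes the read only together with partitionColumn,
    # lowerBound and upperBound; on its own the read runs as a single partition.
    .option("numPartitions", "4")
    .load()
)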
# For file-based source (CSV, JSON, Parquet):
# source_df = (spark.read
# .format("csv")
# .option("header", "true")
# .option("inferSchema", "false") # Always specify schema explicitly in production
# .schema(source_schema) # Define schema; never inferSchema in production
# .load(source_path)
# )
# COMMAND ----------
# MAGIC %md
# MAGIC ## Add Bronze metadata columns
# Standard Bronze metadata; every Bronze table has these
bronze_df = (source_df
    .withColumn("_ingestion_date", F.lit(ingestion_date).cast("date"))
    .withColumn("_ingestion_timestamp", F.lit(ingestion_timestamp).cast("timestamp"))
    .withColumn("_source_system", F.lit(source_system))
    .withColumn("_source_table", F.lit(source_table))
    .withColumn("_run_id", F.lit(run_id))
    # Row hash for change detection. concat_ws silently skips NULLs, so map them
    # to a sentinel first; otherwise ("a", NULL) and (NULL, "a") hash the same.
    .withColumn("_record_hash", F.sha2(F.concat_ws("||",
        *[F.coalesce(F.col(c).cast("string"), F.lit("<NULL>")) for c in source_df.columns]), 256))
)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Write to Bronze
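bronze_path = f"abfss://lakehouse@evokedatalake{environment}.dfs.core.windows.net/bronze/{source_system}/{source_table}"
(bronze_df.write
    .format("delta")
    # Append-only history, but idempotent: a re-run overwrites only its own
    # _ingestion_date partition instead of appending duplicate rows
    # (assumes one load per source table per day)
    .mode("overwrite")
    .option("replaceWhere", f"_ingestion_date = '{ingestion_date}'")
    .partitionBy("_ingestion_date")
    .option("mergeSchema", "true")  # allow schema evolution
    .save(bronze_path)
)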
# COMMAND ----------
# MAGIC %md
# MAGIC ## Log results
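# Take the row count from the Delta commit just written; bronze_df.count() would
# re-execute the source read (for JDBC, the whole query) a second time.
from delta.tables import DeltaTable
last_commit = DeltaTable.forPath(spark, bronze_path).history(1).collect()[0]
row_count = int(last_commit["operationMetrics"]["numOutputRows"])
logger.info(f"Loaded {row_count} rows into {bronze_path}")
# Return result for ADF to capture
result = {
    "row_count": row_count,
    "ingestion_date": ingestion_date,
    "target_path": bronze_path,
    "status": "success"
}
dbutils.notebook.exit(json.dumps(result))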
```
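The JDBC read interpolates `source_query`, which the template never defines. One way to build it from the `watermark_value` widget is sketched below; `build_incremental_query` and the `LastModifiedDate` column in the usage note are hypothetical, and the strict `>` comparison assumes the source never commits rows late with an older timestamp.

```python
import re

# Hypothetical helper: the template does not say how source_query is built.
# Accepts plain or schema-qualified identifiers such as dbo.Account.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*$")


def build_incremental_query(table: str, watermark_column: str, watermark_value: str) -> str:
    """Return a SELECT that extracts rows changed after the watermark.

    An empty watermark (the widget default) means a full extract.
    """
    for name in (table, watermark_column):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"Unsafe SQL identifier: {name!r}")
    if not watermark_value:
        return f"SELECT * FROM {table}"
    # Double any single quotes so the value stays inside the string literal
    literal = watermark_value.replace("'", "''")
    return f"SELECT * FROM {table} WHERE {watermark_column} > '{literal}'"
```

In the notebook this would run before the read cell, e.g. `source_query = build_incremental_query(source_table, "LastModifiedDate", watermark_value)`. Validating identifiers matters because `dbtable` is spliced directly into the SQL sent to the source.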
## Silver layer template
```python
"""
Notebook: NB_Silver_<entity>
Purpose: Cleanse and conform <entity> from Bronze to Silver
Owner: data-engineering@evoke.com
Inputs:
- bronze.<entity> (one or more sources)
Outputs:
- silver.<entity>
- Conformed schema, deduplicated, validated
Patterns applied:
- Type 2 SCD if business requires history
- MERGE for upserts
- Data quality validation before write
SLA: 60 minutes
"""
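# COMMAND ----------
from pyspark.sql import functions as F
import logging
logger = logging.getLogger(__name__)
# COMMAND ----------
# Read one Bronze ingestion date (today by default). In production, pass the date
# as a parameter so re-runs and backfills read the same partition; the path is
# hardcoded here only for illustration.
bronze_customers = (spark.read
    .format("delta")
    .load("abfss://lakehouse@evokedatalake.dfs.core.windows.net/bronze/salesforce/account")
    .filter(F.col("_ingestion_date") == F.current_date())
)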
# COMMAND ----------
# Standardize schema
silver_customers = (bronze_customers
    # Rename columns to standard names (camelCase or snake_case; pick ONE org-wide)
    .withColumnRenamed("Id", "customer_id")
    .withColumnRenamed("Name", "customer_name")
    .withColumnRenamed("Email__c", "email")
    .withColumnRenamed("CreatedDate", "created_at")
    # Cast to standard types
    .withColumn("customer_id", F.col("customer_id").cast("string"))
    .withColumn("created_at", F.col("created_at").cast("timestamp"))
    # Cleanse in flight on the way to Silver; never write back to Bronze
    .withColumn("email", F.lower(F.trim(F.col("email"))))
    .withColumn("customer_name", F.trim(F.col("customer_name")))
    # Add Silver metadata
    .withColumn("_silver_load_timestamp", F.current_timestamp())
    # Explicit Silver schema: drop unmapped Bronze columns, carry forward
    # _source_system and _record_hash (the MERGE below compares _record_hash)
    .select(
        "customer_id", "customer_name", "email", "created_at",
        "_source_system", "_record_hash", "_silver_load_timestamp",
    )
)
# COMMAND ----------
# Validate before writing
# Data quality checks: fail loudly if violated
null_count = silver_customers.filter(F.col("customer_id").isNull()).count()
if null_count > 0:
    raise ValueError(f"Found {null_count} customer records with NULL customer_id")
duplicate_count = (silver_customers
    .groupBy("customer_id")
    .count()
    .filter(F.col("count") > 1)
    .count()
)
if duplicate_count > 0:
    raise ValueError(f"Found {duplicate_count} duplicate customer_ids")
# Email format check (warn, don't fail)
invalid_email_count = (silver_customers
    .filter(F.col("email").isNotNull())
    .filter(~F.col("email").rlike(r"^[^@\s]+@[^@\s]+\.[^@\s]+$"))
    .count()
)
if invalid_email_count > 0:
    logger.warning(f"Found {invalid_email_count} records with invalid email format")
# COMMAND ----------
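# Write Silver via MERGE (upsert)
silver_path = "abfss://lakehouse@evokedatalake.dfs.core.windows.net/silver/customer"
# Create the target if it does not exist. spark.sql cannot see a Python
# DataFrame until it is registered as a temp view.
silver_customers.createOrReplaceTempView("silver_customers")
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.customer
    USING DELTA
    LOCATION '{silver_path}'
    AS SELECT * FROM silver_customers WHERE 1=0
""")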
# MERGE pattern (upsert)
from delta.tables import DeltaTable
target = DeltaTable.forPath(spark, silver_path)
(target.alias("target")
    .merge(
        silver_customers.alias("source"),
        "target.customer_id = source.customer_id"
    )
    .whenMatchedUpdate(
        condition="target._record_hash != source._record_hash",  # only update if changed
        set={col: f"source.{col}" for col in silver_customers.columns}
    )
    .whenNotMatchedInsert(
        values={col: f"source.{col}" for col in silver_customers.columns}
    )
    .execute()
)
logger.info(f"Silver merge complete: {silver_customers.count()} source rows processed")
```
## Gold layer template
```python
"""
Notebook: NB_Gold_dim_customer
Purpose: Build dimensional customer table for Power BI consumption
Owner: data-engineering@evoke.com
Patterns:
- Type 2 SCD on slowly-changing attributes (address, segment)
- Surrogate keys for dimensional joins
- Business rules applied
"""
from pyspark.sql import functions as F
# Build dim_customer with Type 2 SCD on segment changes
# Refer to SCD template for full pattern
# Aggregations into fact tables (silver_orders is the Silver orders DataFrame,
# read earlier in the notebook)
fact_orders_daily = (silver_orders
    .groupBy(
        F.col("order_date"),
        F.col("customer_id"),
        F.col("product_category")
    )
    .agg(
        F.sum("order_amount").alias("total_amount"),
        F.count("order_id").alias("order_count"),
        F.avg("order_amount").alias("avg_order_amount")
    )
)
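# Write to Synapse via PolyBase / COPY INTO: the Databricks Azure Synapse connector
# stages data in tempDir, then the warehouse bulk-loads it. (Synapse and Fabric
# Spark notebooks ship their own warehouse connectors instead.)
(fact_orders_daily.write
    .format("com.databricks.spark.sqldw")
    .mode("overwrite")  # full refresh keeps re-runs idempotent
    .option("url", synapse_url)
    .option("dbTable", "dbo.fact_orders_daily")
    .option("user", get_secret("synapse_user"))
    .option("password", get_secret("synapse_password"))
    .option("tempDir", "abfss://staging@evokedatalake.dfs.core.windows.net/tempload")
    # Also set how the warehouse reads tempDir (e.g. useAzureMSI or
    # forwardSparkAzureStorageCredentials); see the connector documentation
    .save()
)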
```
## Standards and patterns
### Idempotency (non-negotiable)
Every notebook must be safe to re-run:
- **Bronze:** append-only history partitioned by ingestion date; a re-run overwrites only its own date partition (e.g. Delta `replaceWhere`) instead of appending duplicates
- **Silver:** MERGE / upsert on primary key; produces same final state
- **Gold:** transformations are deterministic functions of Silver state
Never use plain INSERT in Silver/Gold (creates duplicates on re-run).
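Why MERGE satisfies the re-run rule while plain INSERT does not can be shown with a toy model in plain Python (dictionaries and lists standing in for tables; this is an illustration, not Spark code):

```python
from typing import Dict, Iterable, List


def insert_all(table: List[dict], rows: Iterable[dict]) -> List[dict]:
    """Plain INSERT: each run appends, so re-running duplicates every row."""
    return table + list(rows)


def upsert(table: Dict[str, dict], rows: Iterable[dict], key: str) -> Dict[str, dict]:
    """MERGE-style upsert: each row replaces or adds the entry for its key."""
    result = dict(table)
    for row in rows:
        result[row[key]] = row
    return result
```

Running the same batch twice through `upsert` leaves the same state as running it once; `insert_all` doubles the rows.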
### Partitioning strategy
- **Bronze:** partition by ingestion_date — appropriate for time-series append
- **Silver:** partition by business date column (order_date, event_date) if highly date-skewed; otherwise no partitioning under 100M rows
- **Gold:** partition aligned to query patterns (most queries filter by date → partition by date)
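Don't over-partition: many small partitions mean many small files and slow metadata operations. Aim for roughly 500MB-2GB per partition; if partitions would fall far below that, partition more coarsely or not at all.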
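The partition-size guideline can be checked before choosing a partition column. The sketch below assumes you already know the table's size (for Delta, `DESCRIBE DETAIL` reports `sizeInBytes`) and the column's distinct-value count; the function names are hypothetical and the defaults simply mirror the 500MB-2GB guideline.

```python
MIB = 1024 * 1024


def average_partition_mib(total_bytes: int, distinct_values: int) -> float:
    """Average size of one partition if the table were split on a column."""
    if distinct_values <= 0:
        raise ValueError("distinct_values must be positive")
    return total_bytes / distinct_values / MIB


def partition_verdict(total_bytes: int, distinct_values: int,
                      min_mib: float = 500, max_mib: float = 2048) -> str:
    """Compare the average partition size with the target range."""
    avg = average_partition_mib(total_bytes, distinct_values)
    if avg < min_mib:
        return "too small: partition more coarsely or not at all"
    if avg > max_mib:
        return "too large: consider a finer partition column"
    return "ok"
```

For instance, a roughly 36GB table partitioned by day over one year averages about 100MB per partition, well short of the range.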
### Schema evolution
- **Bronze:** allow schema evolution (`mergeSchema=true`); source schemas drift
- **Silver:** explicit schema; rejects unexpected columns. Schema changes are deliberate.
- **Gold:** strict schema; downstream consumers depend on it
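For Silver and Gold, an explicit schema can be enforced by comparing `df.dtypes` with a declared contract before writing. The check below is a hypothetical sketch; it works on the `(name, type)` pairs that `DataFrame.dtypes` returns, so it can be unit-tested without a Spark session.

```python
from typing import Dict, List, Tuple


def check_schema(actual_dtypes: List[Tuple[str, str]], expected: Dict[str, str]) -> None:
    """Raise on missing, unexpected, or retyped columns.

    actual_dtypes has the shape of DataFrame.dtypes, e.g. [("id", "string")].
    """
    actual = dict(actual_dtypes)
    missing = sorted(set(expected) - set(actual))
    unexpected = sorted(set(actual) - set(expected))
    retyped = sorted(c for c in expected if c in actual and actual[c] != expected[c])
    problems = []
    if missing:
        problems.append(f"missing columns {missing}")
    if unexpected:
        problems.append(f"unexpected columns {unexpected}")
    if retyped:
        changes = ", ".join(f"{c}: {expected[c]} -> {actual[c]}" for c in retyped)
        problems.append(f"type changes {changes}")
    if problems:
        raise ValueError("; ".join(problems))
```

A call such as `check_schema(silver_customers.dtypes, EXPECTED_CUSTOMER_SCHEMA)` just before the MERGE would make schema changes deliberate; `EXPECTED_CUSTOMER_SCHEMA` is a hypothetical dict kept alongside the notebook.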
### Type handling
- **Strings:** trim by default; use `lower()` for case-insensitive matching keys
- **Decimals:** specify precision explicitly (NEVER use float/double for money)
- **Dates:** cast to date or timestamp explicitly; don't rely on inferSchema
- **Booleans:** standardize on `true`/`false` (not "Y"/"N", "1"/"0", "yes"/"no")
- **Nulls:** handle explicitly (`coalesce`, `fillna`); don't let nulls cascade unexpectedly
### Performance patterns
#### Reading
- **Predicate pushdown:** filter early; Delta and Parquet support it
- **Column pruning:** select only columns you need
- **Partition pruning:** filter on partition columns first
- **Avoid `inferSchema=true` in production** (extra read pass)
#### Joining
- **Broadcast small tables:** `F.broadcast(small_df)` for tables <100MB
- **Repartition for large joins:** match partition count and partition column when possible
- **Avoid skew:** if one key has 90% of rows, salt the join key
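- **Sort-merge join:** default for large+large; Spark shuffles and sorts both sides itself, so shrink the inputs (filter, select) before the join rather than pre-sorting them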
#### Aggregation
- **Aggregate before joining** when possible (smaller sides)
- **Use approx functions** when exact precision isn't required (`approx_count_distinct`)
- **Window functions:** always set `partitionBy`; a window without it moves every row into a single partition
#### Writing
- **Repartition before write:** `df.repartition(N)` to control output file count
- **Optimize write:** `optimizeWrite=true` (Databricks/Fabric) auto-tunes output file sizes
- **Z-order** on commonly-queried columns (Delta): `OPTIMIZE table ZORDER BY (col)`
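`df.repartition(N)` needs an N. A simple way to pick one is to divide the expected output size by a target file size; the 256MB target and the function name below are assumptions, not values from this standard.

```python
import math

MIB = 1024 * 1024


def output_file_count(estimated_bytes: int, target_file_mib: int = 256) -> int:
    """Number of output files so each lands near target_file_mib (at least one)."""
    if estimated_bytes < 0:
        raise ValueError("estimated_bytes must be non-negative")
    return max(1, math.ceil(estimated_bytes / (target_file_mib * MIB)))
```

For an unpartitioned write of about 1GB this yields `df.repartition(4)`. The size estimate can come from `DESCRIBE DETAIL` on a previous load of the same table.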
### Logging
Every notebook logs structured events:
```python
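# duration: elapsed seconds as a float; notebook_name, row_count etc. are set earlier
log_event = {
    "notebook": notebook_name,
    "run_id": run_id,
    "phase": "bronze_ingest",
    "source_system": source_system,
    "source_table": source_table,
    "row_count": row_count,
    "duration_seconds": duration,
    "status": "success",
    "error_message": None,
}
# Explicit schema: Spark cannot infer a column type from a value that is None
log_schema = (
    "notebook string, run_id string, phase string, source_system string, "
    "source_table string, row_count long, duration_seconds double, "
    "status string, error_message string"
)
(spark.createDataFrame([log_event], schema=log_schema)
    .write.format("delta").mode("append").saveAsTable("meta.notebook_run_log"))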
```
### Error handling
```python
try:
    # main logic
    ...
    log_event["status"] = "success"
except Exception as e:
    log_event["status"] = "failed"
    log_event["error_message"] = str(e)
    raise  # re-raise so ADF marks the activity failed
finally:
    log_to_metadata(log_event)  # always log, success or failure
```
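The try/except/finally pattern above, plus the timing needed for `duration_seconds`, can be packaged once as a context manager so each notebook does not repeat it. This is a hypothetical sketch: `run_logged` is not an existing API, and `sink` stands in for whatever writes to `meta.notebook_run_log` (such as `log_to_metadata`).

```python
import time
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator


@contextmanager
def run_logged(base_event: Dict[str, Any],
               sink: Callable[[Dict[str, Any]], None]) -> Iterator[Dict[str, Any]]:
    """Time a block, record success or failure, always pass the event to sink, re-raise errors."""
    event = dict(base_event, status="running", error_message=None)
    started = time.monotonic()
    try:
        yield event  # the body can add fields such as row_count
        event["status"] = "success"
    except Exception as e:
        event["status"] = "failed"
        event["error_message"] = str(e)
        raise  # keep the failure visible to ADF
    finally:
        event["duration_seconds"] = round(time.monotonic() - started, 3)
        sink(event)
```

In a notebook, `with run_logged({"notebook": "NB_Silver_customer", "run_id": run_id}, log_to_metadata) as event:` would wrap the main logic, with the body setting `event["row_count"]`.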
### Secrets handling
NEVER hardcode credentials. Use:
- **Databricks:** `dbutils.secrets.get(scope, key)`
- **Fabric / Synapse Notebooks:** Key Vault-linked services + token-based access
- **Service principals** with managed identity preferred
```python
def get_secret(key: str) -> str:
    return dbutils.secrets.get(scope="evoke-data-secrets", key=key)
```
### Code organization within a notebook
Order matters:
1. Docstring header (purpose, owner, SLA)
2. Imports
3. Parameter declarations (widgets)
4. Configuration constants
5. Helper functions (defined locally)
6. Read sources
7. Transform
8. Validate (data quality checks)
9. Write target
10. Log + exit with result
### Reusable utilities
Don't duplicate boilerplate. Build a shared library:
```python
# evoke_data_utils package, imported via wheel or %run
from evoke_data_utils import (
standard_bronze_metadata, # adds standard _ingestion_*, _source_*, _run_id columns
standard_silver_metadata,
log_run_event,
get_secret,
validate_no_nulls,
validate_unique,
standard_merge,
)
```
Versioned, tested, used across all notebooks. This is the difference between
spaghetti notebooks and a maintainable codebase.
## Anti-patterns to avoid
- **`.collect()` on large DataFrames.** Pulls all data to the driver; causes OOM on anything but small results.
- **`.toPandas()` on large DataFrames.** Same problem.
- **`for row in df.collect()`.** Single-threaded loop on driver.
- **`.cache()` without `.unpersist()`.** Memory leak across notebook lifecycle.
- **`inferSchema=true` in production.** Slow, unpredictable.
- **Filtering or projecting after wide transformations.** Apply narrow operations (filter, select) before joins and aggregations so the shuffle moves less data.
- **No validation before writing.** Garbage in, garbage out.
- **Hardcoded paths.** Use parameters and config.
- **Ignoring schema drift.** Bronze should evolve; Silver should not silently.
- **Spark on data <1GB.** Use pandas / DuckDB / SQL instead. Spark startup overhead isn't worth it.
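Paths can be built from parameters instead of hardcoded. The hypothetical helper below reproduces the layout of the Bronze template's path (container `lakehouse`, storage account `evokedatalake<environment>`); adjust the defaults to your own config.

```python
import re

_ENVIRONMENT = re.compile(r"^[a-z0-9]+$")  # storage account names are lowercase alphanumeric
_SEGMENT = re.compile(r"^[A-Za-z0-9_\-]+$")
LAYERS = ("bronze", "silver", "gold")


def lake_path(environment: str, layer: str, *parts: str,
              container: str = "lakehouse", account_prefix: str = "evokedatalake") -> str:
    """Build an ADLS Gen2 path for a medallion layer from parameters."""
    if not _ENVIRONMENT.match(environment):
        raise ValueError(f"Invalid environment {environment!r}")
    if layer not in LAYERS:
        raise ValueError(f"Unknown layer {layer!r}; expected one of {LAYERS}")
    for segment in parts:
        if not _SEGMENT.match(segment):
            raise ValueError(f"Invalid path segment {segment!r}")
    suffix = "/".join((layer, *parts))
    return f"abfss://{container}@{account_prefix}{environment}.dfs.core.windows.net/{suffix}"
```

`lake_path(environment, "bronze", source_system, source_table)` returns the same string as the Bronze template's `bronze_path`.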
## Output format
For each notebook, generate:
1. Complete `.py` or `.ipynb` content with all sections above
2. Sample test data setup
3. Sample assertions for unit testing the transformation
4. Documentation about parameters and outputs
Tips
- Start with the standards. Don't refactor later. Notebooks written without standards are painful to standardize.
- Build the shared utilities library first. It's the multiplier for everything else.
- Test transformations in isolation. Pure functions on DataFrames are testable; notebooks coupled to ADF aren't.
- Profile before optimizing. PySpark performance is rarely intuitive; measure.
- Watch the small-data trap. Spark for tables <1GB is overkill. Use SQL/pandas/DuckDB for those.
Common mistakes to avoid
- Treating notebooks as scripts (no structure, no documentation, no error handling)
- Writing without idempotency
- Hardcoded paths and credentials
- No validation before writes
- Using float/double for money
- Schema inference in production
- `.collect()` / `.toPandas()` on large data
- No logging or observability
- Copy-paste boilerplate instead of shared utilities
- Spark for data that doesn't need it