Table of Contents
- Introduction to Databricks and the Lakehouse Platform
- Apache Spark Fundamentals
- Delta Lake Deep Dive
- ELT with Apache Spark and Delta Lake
- Incremental Data Processing
- Production Pipelines with Delta Live Tables (DLT)
- Databricks Workflows and Job Orchestration
- Data Governance with Unity Catalog
- Databricks SQL
- Security and Access Control
- Performance Optimization
- Exam Tips and Practice Questions
Chapter 1: Introduction to Databricks and the Lakehouse Platform
1.1 What is Databricks?
Databricks is a unified analytics platform built on top of Apache Spark that combines data engineering, data science, machine learning, and business intelligence into one cohesive ecosystem. Founded in 2013 by the original creators of Apache Spark, Databricks has grown into an industry-leading platform that runs on all major cloud providers — AWS, Azure, and Google Cloud Platform.
At its core, Databricks provides:
- Collaborative notebooks for data exploration and development
- Managed Apache Spark clusters for distributed computing
- Delta Lake as the underlying storage layer
- MLflow for machine learning lifecycle management
- Databricks SQL for business intelligence and analytics
- Delta Live Tables for declarative ETL pipelines
- Unity Catalog for unified data governance
1.2 The Data Lakehouse Architecture
Before Databricks introduced the Lakehouse concept, organizations were forced to choose between two suboptimal architectures:
Traditional Data Warehouse
- Structured data only
- High cost for storage
- Limited scalability
- Excellent ACID compliance and performance
- Works great for SQL workloads but struggles with unstructured data
Traditional Data Lake
- Supports all data types (structured, semi-structured, unstructured)
- Low-cost storage (typically object storage like S3, ADLS, GCS)
- Highly scalable
- Poor ACID compliance
- No support for transactions
- Data quality issues
- Performance challenges
The Lakehouse
The Lakehouse paradigm brings the best of both worlds by combining:
- Low-cost cloud storage from data lakes
- ACID transactions from data warehouses
- Schema enforcement and schema evolution
- Support for BI tools directly on the lake
- Streaming and batch processing in one platform
- Openness — data stored in open formats like Parquet and JSON
The Lakehouse is implemented through Delta Lake, which sits on top of cloud object storage and provides the reliability and performance features typically found only in data warehouses.
1.3 Databricks Architecture Components
Control Plane
The control plane is managed by Databricks and includes:
- Databricks web application (UI)
- Cluster manager — manages the lifecycle of compute clusters
- Job scheduler — orchestrates workflow execution
- Notebook server — manages collaborative development
- Metadata store — stores table definitions, ACLs, and configuration
Data Plane
The data plane runs within the customer's cloud account and includes:
- Cluster compute resources (EC2 on AWS, VMs on Azure/GCP)
- Cloud object storage (S3, ADLS Gen2, GCS)
- Network resources (VPC, subnets, security groups)
This separation ensures that your data never leaves your cloud account, providing security and compliance guarantees.
1.4 Databricks Workspace
The Databricks Workspace is the collaborative environment where users interact with the platform. Key components include:
Notebooks
Notebooks are the primary development interface in Databricks. They support multiple languages:
- Python (%python)
- Scala (%scala)
- SQL (%sql)
- R (%r)
- Markdown (%md)
- Shell commands (%sh)
- File system operations (%fs)
You can switch languages within a single notebook using magic commands, making it incredibly flexible.
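For example, a single notebook can mix cells like the following (a minimal sketch — each block represents a separate cell, and the paths are placeholders):
# Cell 1 — the notebook's default language (Python)
df = spark.range(10)
df.createOrReplaceTempView("numbers")
# Cell 2 — switch this cell to SQL with a magic command
%sql
SELECT * FROM numbers WHERE id > 5
# Cell 3 — file system magic command
%fs ls /databricks-datasets/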
Repos
Databricks Repos provides Git integration, allowing teams to:
- Connect to GitHub, GitLab, Azure DevOps, or Bitbucket
- Version control notebooks and code
- Implement CI/CD pipelines
- Collaborate across teams
Clusters
Clusters are the compute resources that execute your code. There are two primary types:
All-Purpose Clusters: Used for interactive development and collaboration. They can be shared across multiple users and notebooks. These are more expensive but provide maximum flexibility.
Job Clusters: Ephemeral clusters created specifically for a job and terminated when the job completes. These are more cost-effective for production workloads.
Cluster Modes:
- Standard Mode: Single-user or shared cluster with full Spark capabilities
- High Concurrency Mode: Optimized for concurrent usage with fine-grained resource sharing
- Single Node Mode: Driver-only cluster for lightweight workloads and local Spark
Cluster Configuration Options:
- Databricks Runtime (DBR) version
- Node types and sizes
- Autoscaling configuration
- Auto-termination settings
- Spot/Preemptible instance usage
- Custom libraries and init scripts
- Environment variables
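As a hedged sketch of how these options come together, the Databricks SDK for Python can create a cluster roughly as follows (node types, runtime versions, and exact SDK field names vary by cloud and SDK version — treat the values as illustrative assumptions, not recommendations):
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import compute

w = WorkspaceClient()
# All-purpose cluster with autoscaling and auto-termination (illustrative values)
w.clusters.create(
    cluster_name="de-associate-demo",
    spark_version="13.3.x-scala2.12",    # Databricks Runtime version
    node_type_id="i3.xlarge",            # cloud-specific node type
    autoscale=compute.AutoScale(min_workers=1, max_workers=4),
    autotermination_minutes=60           # shut down after 60 idle minutes
)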
Chapter 2: Apache Spark Fundamentals
2.1 Understanding Apache Spark
Apache Spark is the distributed computing engine that powers Databricks. Understanding Spark's core concepts is fundamental to the Data Engineer Associate exam.
Spark Architecture
Driver Node
The driver is the brain of a Spark application. It:
- Runs the main() function
- Creates the SparkContext/SparkSession
- Converts user code into execution tasks
- Schedules jobs and stages
- Communicates with the cluster manager
- Keeps track of metadata about distributed data
Executor Nodes
Executors are the worker processes that run on cluster nodes. They:
- Execute the tasks assigned by the driver
- Store data in memory or disk (RDD/DataFrame partitions)
- Return results back to the driver
- Each executor has multiple cores, each of which can run one task at a time
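A quick way to see the resulting task slots from a notebook (a small sketch; the numbers depend on the cluster):
# Total task slots across the executors (roughly executors x cores per executor);
# also the default partition count for spark.range() and sc.parallelize()
print(spark.sparkContext.defaultParallelism)
# Cores requested per executor, if explicitly configured on the cluster
print(spark.conf.get("spark.executor.cores", "not set"))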
Cluster Manager
In Databricks, the cluster manager is handled automatically. Spark supports:
- Standalone
- YARN (Hadoop)
- Mesos
- Kubernetes
- Databricks (proprietary)
Spark Execution Model
When an action runs, Spark breaks the work down into the following units:
- Job: Triggered by an action (collect(), show(), write(), etc.)
- Stage: A set of tasks that can be executed in parallel without data shuffling. Stage boundaries occur at shuffle operations.
- Task: The smallest unit of work, operating on a single partition of data
Application
└── Job (triggered by action)
└── Stage (divided by shuffles)
└── Task (one per partition)
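A small example to make this concrete: the groupBy below requires a shuffle, so the single job triggered by the action is split into two stages, each consisting of one task per partition.
# Transformations only build a logical plan - nothing runs yet
df = spark.range(1_000_000)
bucketed = df.groupBy((df.id % 10).alias("bucket")).count()   # shuffle => stage boundary
# The action triggers one job; the Spark UI shows it split into two stages
bucketed.show()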
2.2 SparkSession
The SparkSession is the entry point for all Spark functionality in modern Spark (2.0+). In Databricks, it's automatically available as spark.
# In Databricks, SparkSession is pre-created as 'spark'
# But you can access it like this:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("MyApplication") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# Check Spark version
print(spark.version)
# Access SparkContext
sc = spark.sparkContext
print(sc.appName)
2.3 DataFrames and the DataFrame API
DataFrames are the primary data abstraction in modern Spark. A DataFrame is a distributed collection of data organized into named columns, conceptually similar to a table in a relational database.
Creating DataFrames
# From a Python list
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)
# From a CSV file
df_csv = spark.read \
.option("header", "true") \
.option("inferSchema", "true") \
.csv("/path/to/file.csv")
# From a JSON file
df_json = spark.read.json("/path/to/file.json")
# From a Parquet file
df_parquet = spark.read.parquet("/path/to/file.parquet")
# From a Delta table
df_delta = spark.read.format("delta").load("/path/to/delta/table")
# Or using SQL
df_sql = spark.read.table("database.tablename")
# From a JDBC source
df_jdbc = spark.read \
.format("jdbc") \
.option("url", "jdbc:postgresql://host:5432/database") \
.option("dbtable", "schema.tablename") \
.option("user", "username") \
.option("password", "password") \
.load()
DataFrame Transformations
Transformations are lazy — they define a computation but don't execute it until an action is called.
from pyspark.sql import functions as F
from pyspark.sql.types import *
# Select specific columns
df.select("name", "age")
df.select(F.col("name"), F.col("age"))
# Filter/Where
df.filter(F.col("age") > 25)
df.where("age > 25")
# Add new columns
df.withColumn("birth_year", 2024 - F.col("age"))
# Rename columns
df.withColumnRenamed("name", "full_name")
# Drop columns
df.drop("unnecessary_column")
# Sort/OrderBy
df.sort("age")
df.orderBy(F.col("age").desc())
# Distinct values
df.distinct()
df.dropDuplicates(["name"])
# Limit rows
df.limit(100)
# Group By and Aggregations
df.groupBy("department") \
.agg(
F.count("*").alias("employee_count"),
F.avg("salary").alias("avg_salary"),
F.max("salary").alias("max_salary"),
F.min("salary").alias("min_salary"),
F.sum("salary").alias("total_salary")
)
# Joins
df1.join(df2, df1["id"] == df2["id"], "inner")
df1.join(df2, df1["id"] == df2["id"], "left")
df1.join(df2, df1["id"] == df2["id"], "right")
df1.join(df2, df1["id"] == df2["id"], "full")
df1.join(df2, df1["id"] == df2["id"], "left_semi")
df1.join(df2, df1["id"] == df2["id"], "left_anti")
df1.join(df2, df1["id"] == df2["id"], "cross")
# Union
df1.union(df2) # Combines by position
df1.unionByName(df2) # Combines by column name
DataFrame Actions
Actions trigger computation and return results.
# Collect all rows to driver (careful with large datasets!)
rows = df.collect()
# Show first N rows
df.show(20)
df.show(20, truncate=False)
# Count rows
count = df.count()
# Get first row
first_row = df.first()
first_5 = df.take(5)
# Describe statistics
df.describe().show()
df.summary().show()
# Write to storage
df.write.parquet("/path/to/output")
df.write.format("delta").save("/path/to/delta/table")
df.write.saveAsTable("database.tablename")
2.4 Spark SQL
Spark SQL allows you to run SQL queries against DataFrames and Hive tables.
# Register DataFrame as temporary view
df.createOrReplaceTempView("employees")
# Run SQL query
result = spark.sql("""
SELECT department,
COUNT(*) as emp_count,
AVG(salary) as avg_salary
FROM employees
WHERE salary > 50000
GROUP BY department
ORDER BY avg_salary DESC
""")
# Create global temporary view (accessible across sessions)
df.createOrReplaceGlobalTempView("global_employees")
spark.sql("SELECT * FROM global_temp.global_employees")
2.5 Data Types and Schema
Understanding Spark data types is critical for data engineering.
from pyspark.sql.types import *
# Define explicit schema
schema = StructType([
StructField("id", IntegerType(), nullable=False),
StructField("name", StringType(), nullable=True),
StructField("salary", DoubleType(), nullable=True),
StructField("hire_date", DateType(), nullable=True),
StructField("is_active", BooleanType(), nullable=True),
StructField("address", StructType([
StructField("street", StringType(), nullable=True),
StructField("city", StringType(), nullable=True),
StructField("zip", StringType(), nullable=True)
]), nullable=True),
StructField("skills", ArrayType(StringType()), nullable=True),
StructField("metadata", MapType(StringType(), StringType()), nullable=True)
])
# Create DataFrame with explicit schema
df = spark.createDataFrame(data, schema)
# Check schema
df.printSchema()
df.schema
# Cast column types
df.withColumn("salary", F.col("salary").cast(LongType()))
df.withColumn("hire_date", F.col("hire_date").cast("date"))
2.6 Working with Complex Types
# Working with Arrays
df = spark.createDataFrame([
(1, ["Python", "SQL", "Spark"]),
(2, ["Java", "Scala"])
], ["id", "skills"])
# Explode array into rows
df.select("id", F.explode("skills").alias("skill"))
# Array functions
df.select(
"id",
F.size("skills").alias("skill_count"),
F.array_contains("skills", "Python").alias("knows_python"),
F.array_distinct("skills"),
F.sort_array("skills")
)
# Working with Structs
from pyspark.sql import Row
df = spark.createDataFrame([
(1, Row(first_name="John", last_name="Doe", age=30)),
], ["id", "person"])
df.select("id", "person.first_name", "person.last_name")
# Working with Maps
df = spark.createDataFrame([
(1, {"key1": "value1", "key2": "value2"})
], ["id", "metadata"])
df.select(
"id",
F.map_keys("metadata").alias("keys"),
F.map_values("metadata").alias("values"),
df["metadata"]["key1"].alias("key1_value")
)
2.7 Window Functions
Window functions are essential for analytical computations.
from pyspark.sql.window import Window
# Define window specification
window_spec = Window.partitionBy("department").orderBy("salary")
# Ranking functions
df.withColumn("rank", F.rank().over(window_spec))
df.withColumn("dense_rank", F.dense_rank().over(window_spec))
df.withColumn("row_number", F.row_number().over(window_spec))
df.withColumn("percent_rank", F.percent_rank().over(window_spec))
df.withColumn("ntile", F.ntile(4).over(window_spec))
# Analytic functions
df.withColumn("lag_salary", F.lag("salary", 1).over(window_spec))
df.withColumn("lead_salary", F.lead("salary", 1).over(window_spec))
# Aggregate functions with window
window_agg = Window.partitionBy("department")
df.withColumn("dept_avg_salary", F.avg("salary").over(window_agg))
df.withColumn("dept_total_salary", F.sum("salary").over(window_agg))
# Running totals
window_running = Window.partitionBy("department") \
.orderBy("hire_date") \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("running_salary_total", F.sum("salary").over(window_running))
2.8 User-Defined Functions (UDFs)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType
# Python UDF (not optimized by Catalyst)
def categorize_salary(salary):
if salary < 50000:
return "Low"
elif salary < 100000:
return "Medium"
else:
return "High"
# Register UDF
categorize_udf = udf(categorize_salary, StringType())
# Use UDF
df.withColumn("salary_category", categorize_udf(F.col("salary")))
# Lambda UDF
double_salary = udf(lambda x: x * 2, IntegerType())
# Pandas UDF (vectorized - much faster!)
from pyspark.sql.functions import pandas_udf
import pandas as pd
@pandas_udf(StringType())
def categorize_salary_pandas(salary: pd.Series) -> pd.Series:
return salary.apply(lambda x: "Low" if x < 50000 else "Medium" if x < 100000 else "High")
df.withColumn("salary_category", categorize_salary_pandas(F.col("salary")))
Chapter 3: Delta Lake Deep Dive
3.1 What is Delta Lake?
Delta Lake is an open-source storage layer that brings ACID transactions, scalable metadata handling, and unified streaming/batch data processing to data lakes. It was created by Databricks and donated to the Linux Foundation.
Delta Lake stores data as Parquet files along with a transaction log (Delta Log) that tracks all changes to the table.
3.2 Delta Lake Architecture
Delta Log (Transaction Log)
The Delta Log is a directory called _delta_log located at the root of the Delta table. It contains:
- JSON log files: Each transaction creates a new JSON file (00000000000000000000.json, 00000000000000000001.json, etc.)
- Checkpoint files: By default, every 10 commits Delta Lake writes a Parquet checkpoint file that consolidates the log for faster reads
Each log entry contains:
- Add actions: New files added to the table
- Remove actions: Files removed from the table
- Metadata actions: Schema changes, table properties
- Protocol actions: Delta protocol version upgrades
- CommitInfo actions: Information about the operation
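To see this structure on disk, you can list the _delta_log directory and read one of the commit files directly (a sketch; the table path is a placeholder):
# List the transaction log of a Delta table
for f in dbutils.fs.ls("/path/to/delta/table/_delta_log/"):
    print(f.name)   # e.g. 00000000000000000000.json, 00000000000000000010.checkpoint.parquet
# Each JSON commit file is plain JSON describing add/remove/metaData/commitInfo actions
commit_0 = spark.read.json("/path/to/delta/table/_delta_log/00000000000000000000.json")
commit_0.printSchema()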
How Delta Lake Achieves ACID
Atomicity: Either all the changes in a transaction are committed or none are. This is achieved through the atomic JSON write to the transaction log.
Consistency: Schema enforcement ensures that writes conform to the table schema. Constraint checking ensures data integrity rules.
Isolation: Optimistic concurrency control with snapshot isolation. Each transaction reads from a consistent snapshot and conflicts are detected at commit time.
Durability: Once a transaction is committed to the log, it's permanent. The underlying cloud storage provides durability guarantees.
3.3 Creating Delta Tables
# Method 1: Write DataFrame as Delta
df.write.format("delta").save("/path/to/delta/table")
# Method 2: Write with partitioning
df.write \
.format("delta") \
.partitionBy("year", "month") \
.save("/path/to/delta/table")
# Method 3: Save as table (creates metadata in metastore)
df.write.format("delta").saveAsTable("database.table_name")
# Method 4: Create table using SQL
spark.sql("""
CREATE TABLE IF NOT EXISTS sales (
id BIGINT,
customer_id BIGINT,
product_id BIGINT,
amount DOUBLE,
sale_date DATE
)
USING DELTA
PARTITIONED BY (sale_date)
LOCATION '/path/to/delta/sales'
COMMENT 'Sales transactions table'
TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
# Method 5: CREATE TABLE AS SELECT (CTAS)
spark.sql("""
CREATE TABLE new_sales
USING DELTA
AS SELECT * FROM old_sales WHERE year = 2024
""")
3.4 Reading Delta Tables
# Read Delta table from path
df = spark.read.format("delta").load("/path/to/delta/table")
# Read Delta table from catalog
df = spark.read.table("database.table_name")
# Read a specific version (Time Travel)
df_v2 = spark.read.format("delta") \
.option("versionAsOf", 2) \
.load("/path/to/delta/table")
# Read at a specific timestamp
df_ts = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-01") \
.load("/path/to/delta/table")
# Using SQL for time travel
spark.sql("""
SELECT * FROM sales VERSION AS OF 3
""")
spark.sql("""
SELECT * FROM sales TIMESTAMP AS OF '2024-01-01 00:00:00'
""")
3.5 Updating Delta Tables
INSERT Operations
# Append data (default mode)
new_data.write.mode("append").format("delta").save("/path/to/delta/table")
# Overwrite entire table
new_data.write.mode("overwrite").format("delta").save("/path/to/delta/table")
# Insert using SQL
spark.sql("""
INSERT INTO sales VALUES (1, 100, 200, 99.99, '2024-01-15')
""")
spark.sql("""
INSERT INTO sales SELECT * FROM staging_sales
""")
# Insert Overwrite (replaces data matching partition)
spark.sql("""
INSERT OVERWRITE sales
SELECT * FROM staging_sales WHERE sale_date = '2024-01-15'
""")
UPDATE Operations
from delta.tables import DeltaTable
# Load the Delta table
delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")
# Update with condition
delta_table.update(
condition = F.col("customer_id") == 100,
set = {"amount": F.col("amount") * 1.1}
)
# Update using SQL
spark.sql("""
UPDATE sales
SET amount = amount * 1.1
WHERE customer_id = 100
""")
DELETE Operations
# Delete with condition
delta_table.delete(condition = F.col("sale_date") < "2020-01-01")
# Delete using SQL
spark.sql("""
DELETE FROM sales
WHERE sale_date < '2020-01-01'
""")
3.6 MERGE (UPSERT) Operations
MERGE is one of the most powerful features of Delta Lake, enabling complex upsert operations.
from delta.tables import DeltaTable
# Target table
target = DeltaTable.forPath(spark, "/path/to/delta/table")
# Source DataFrame
source = spark.read.table("staging_updates")
# Perform MERGE
target.alias("target") \
.merge(
source.alias("source"),
"target.id = source.id"
) \
.whenMatchedUpdate(set={
"amount": F.col("source.amount"),
"updated_at": F.current_timestamp()
}) \
.whenNotMatchedInsert(values={
"id": F.col("source.id"),
"customer_id": F.col("source.customer_id"),
"amount": F.col("source.amount"),
"sale_date": F.col("source.sale_date"),
"created_at": F.current_timestamp()
}) \
.whenNotMatchedBySourceDelete() \
.execute()
-- MERGE using SQL
MERGE INTO sales AS target
USING staging_updates AS source
ON target.id = source.id
WHEN MATCHED AND source.action = 'UPDATE' THEN
UPDATE SET target.amount = source.amount,
target.updated_at = current_timestamp()
WHEN MATCHED AND source.action = 'DELETE' THEN
DELETE
WHEN NOT MATCHED THEN
INSERT (id, customer_id, amount, sale_date)
VALUES (source.id, source.customer_id, source.amount, source.sale_date)
3.7 Delta Lake Schema Management
Schema Enforcement
Delta Lake enforces schema by default — if you try to write data with an incompatible schema, it will throw an error.
# This will FAIL if 'new_column' doesn't exist in the schema
new_data_with_extra_column.write \
.mode("append") \
.format("delta") \
.save("/path/to/delta/table")
# Error: A schema mismatch detected when writing to the Delta table
Schema Evolution
You can enable schema evolution to automatically add new columns.
# Enable schema evolution (mergeSchema)
new_data_with_extra_column.write \
.mode("append") \
.option("mergeSchema", "true") \
.format("delta") \
.save("/path/to/delta/table")
# Overwrite and update schema
new_data.write \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.format("delta") \
.save("/path/to/delta/table")
ALTER TABLE Operations
# Add column
spark.sql("ALTER TABLE sales ADD COLUMN discount DOUBLE")
# Rename column (requires column mapping)
spark.sql("ALTER TABLE sales RENAME COLUMN amount TO total_amount")
# Drop column (requires column mapping)
spark.sql("ALTER TABLE sales DROP COLUMN old_column")
# Change column type (only a limited set of type changes is supported; others require rewriting the table)
spark.sql("ALTER TABLE sales ALTER COLUMN amount TYPE DECIMAL(10,2)")
# Add table comment
spark.sql("COMMENT ON TABLE sales IS 'Main sales transactions'")
# Add column comment
spark.sql("ALTER TABLE sales ALTER COLUMN amount COMMENT 'Transaction amount in USD'")
3.8 Delta Lake Time Travel
Time Travel is one of Delta Lake's most valuable features, allowing you to query historical versions of your data.
# Query version 0 (initial version)
spark.read.format("delta") \
.option("versionAsOf", 0) \
.load("/path/to/delta/table")
# Query at timestamp
spark.read.format("delta") \
.option("timestampAsOf", "2024-01-01T00:00:00.000Z") \
.load("/path/to/delta/table")
# View table history
delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")
delta_table.history().show()
delta_table.history(10).show() # Last 10 operations
# SQL equivalent
spark.sql("DESCRIBE HISTORY sales").show()
spark.sql("DESCRIBE HISTORY sales LIMIT 5").show()
Use Cases for Time Travel:
- Auditing: Track changes to data over time
- Rollback: Restore data to a previous state
- Debugging: Reproduce issues by querying historical data
- Regulatory compliance: Access point-in-time data for compliance
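For the rollback use case in particular, Delta Lake provides the RESTORE command, which rewrites the current table state to match an earlier version or timestamp (the restore itself is recorded as a new commit, so history is preserved):
# Restore by version number
spark.sql("RESTORE TABLE sales TO VERSION AS OF 3")
# ...or by timestamp
spark.sql("RESTORE TABLE sales TO TIMESTAMP AS OF '2024-01-01'")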
3.9 OPTIMIZE and VACUUM
OPTIMIZE
Over time, Delta tables accumulate many small files due to frequent small writes. OPTIMIZE compacts these small files into larger, more efficient files.
# Optimize entire table
spark.sql("OPTIMIZE sales")
# Optimize with Z-ORDER (co-locate related data)
spark.sql("OPTIMIZE sales ZORDER BY (customer_id, sale_date)")
# Optimize specific partition
spark.sql("OPTIMIZE sales WHERE sale_date = '2024-01-15'")
Z-Ordering is a technique that co-locates related information in the same set of files. This improves query performance by reducing the amount of data that needs to be read. It's particularly effective for high-cardinality columns that are frequently used in filters.
VACUUM
VACUUM removes files that are no longer referenced by the Delta log and are older than the retention period.
# Vacuum with default retention (7 days)
spark.sql("VACUUM sales")
# Vacuum with custom retention (in hours)
spark.sql("VACUUM sales RETAIN 168 HOURS") # 7 days
# DRY RUN - shows files to be deleted without actually deleting
spark.sql("VACUUM sales DRY RUN")
# WARNING: This disables time travel beyond the retention period
# To vacuum with less than 7 days (use carefully!):
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql("VACUUM sales RETAIN 0 HOURS")
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")
Important: You cannot time travel to versions that have been vacuumed. Default retention is 7 days (168 hours).
3.10 Delta Table Properties and Features
# View table properties
spark.sql("DESCRIBE EXTENDED sales")
spark.sql("SHOW TBLPROPERTIES sales")
# Set table properties
spark.sql("""
ALTER TABLE sales
SET TBLPROPERTIES (
'delta.logRetentionDuration' = 'interval 30 days',
'delta.deletedFileRetentionDuration' = 'interval 7 days',
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
# Enable column mapping (required for renaming/dropping columns)
spark.sql("""
ALTER TABLE sales
SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
""")
# Enable Change Data Feed
spark.sql("""
ALTER TABLE sales
SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")
3.11 Change Data Feed (CDF)
Change Data Feed captures row-level changes (inserts, updates, deletes) made to a Delta table.
# Enable CDF on table creation
spark.sql("""
CREATE TABLE sales (
id BIGINT,
amount DOUBLE,
updated_at TIMESTAMP
)
USING DELTA
TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")
# Read changes from a specific version
changes = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 1) \
.table("sales")
# Read changes from a specific timestamp
changes = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", "2024-01-01") \
.table("sales")
# CDF adds these special columns:
# _change_type: 'insert', 'update_preimage', 'update_postimage', 'delete'
# _commit_version: The version of the commit
# _commit_timestamp: The timestamp of the commit
changes.filter("_change_type = 'insert'").show()
changes.filter("_change_type IN ('update_preimage', 'update_postimage')").show()
3.12 Constraints
Delta Lake supports column-level and table-level constraints.
# NOT NULL constraint
spark.sql("""
ALTER TABLE sales
ALTER COLUMN id SET NOT NULL
""")
# CHECK constraint
spark.sql("""
ALTER TABLE sales
ADD CONSTRAINT valid_amount CHECK (amount > 0)
""")
# View constraints
spark.sql("DESCRIBE EXTENDED sales")
# Drop constraint
spark.sql("ALTER TABLE sales DROP CONSTRAINT valid_amount")
Chapter 4: ELT with Apache Spark and Delta Lake
4.1 ELT vs ETL
Traditional ETL (Extract, Transform, Load):
- Data is extracted from source
- Transformed outside the target system
- Loaded into the destination
Modern ELT (Extract, Load, Transform):
- Data is extracted from source
- Loaded raw into the data lake
- Transformed within the data platform
In the Databricks Lakehouse, ELT is preferred because:
- Raw data is preserved for reprocessing
- Transformations leverage distributed compute power
- Single platform reduces complexity
- Cost optimization through separation of storage and compute
4.2 Medallion Architecture
The Medallion Architecture (Bronze, Silver, Gold) is the recommended approach for organizing data in the Lakehouse.
Source Systems → [Bronze] → [Silver] → [Gold] → Analytics/ML
Bronze Layer (Raw)
- Raw data ingested as-is from source systems
- No transformations (maybe some basic typing)
- Schema may not be enforced
- Keeps historical record of all raw data
- Data is append-only
# Ingesting raw JSON data into Bronze
raw_data = spark.read \
.format("json") \
.option("multiLine", "true") \
.load("/raw/incoming/orders/*.json")
# Add ingestion metadata
bronze_data = raw_data.withColumn("_ingested_at", F.current_timestamp()) \
.withColumn("_source_file", F.input_file_name())
bronze_data.write \
.mode("append") \
.format("delta") \
.save("/bronze/orders")
Silver Layer (Cleansed)
- Data is cleaned, validated, and standardized
- Joins happen at this layer
- Schema enforcement applied
- Still at roughly the same granularity as Bronze
- Deduplication happens here
# Read from Bronze
bronze_df = spark.read.format("delta").load("/bronze/orders")
# Clean and validate
silver_df = bronze_df \
.filter(F.col("order_id").isNotNull()) \
.filter(F.col("amount") > 0) \
.dropDuplicates(["order_id"]) \
.withColumn("order_date", F.to_date("order_date", "yyyy-MM-dd")) \
.withColumn("amount", F.col("amount").cast("decimal(10,2)")) \
.withColumn("status", F.upper(F.col("status"))) \
.withColumn("_processed_at", F.current_timestamp())
# Upsert into Silver (handle late-arriving data)
from delta.tables import DeltaTable
if DeltaTable.isDeltaTable(spark, "/silver/orders"):
silver_table = DeltaTable.forPath(spark, "/silver/orders")
silver_table.alias("target") \
.merge(silver_df.alias("source"), "target.order_id = source.order_id") \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
else:
silver_df.write.format("delta").save("/silver/orders")
Gold Layer (Business Aggregated)
- Business-level aggregations
- Optimized for specific use cases (reporting, ML)
- Pre-aggregated data for performance
- Multiple Gold tables may be derived from same Silver data
# Create Gold table: Daily Sales Summary
gold_daily_sales = spark.read.format("delta").load("/silver/orders") \
.filter("status = 'COMPLETED'") \
.groupBy(
F.to_date("order_date").alias("date"),
"product_category",
"region"
) \
.agg(
F.count("*").alias("order_count"),
F.sum("amount").alias("total_revenue"),
F.avg("amount").alias("avg_order_value"),
F.countDistinct("customer_id").alias("unique_customers")
)
gold_daily_sales.write \
.mode("overwrite") \
.format("delta") \
.partitionBy("date") \
.save("/gold/daily_sales_summary")
4.3 Common Data Transformations
String Operations
# String functions
df.select(
F.upper("name"),
F.lower("name"),
F.trim("name"),
F.ltrim("name"),
F.rtrim("name"),
F.length("name"),
F.substring("name", 1, 3),
F.concat("first_name", F.lit(" "), "last_name"),
F.concat_ws(" ", "first_name", "last_name"),
F.split("full_name", " "),
F.regexp_replace("phone", "[^0-9]", ""),
F.regexp_extract("email", "(@.*)", 1),
F.col("name").like("A%"),
F.lpad("id", 10, "0"),
F.rpad("name", 20, " ")
)
Date and Time Operations
# Date/Time functions
df.select(
F.current_date(),
F.current_timestamp(),
F.to_date("date_str", "yyyy-MM-dd"),
F.to_timestamp("ts_str", "yyyy-MM-dd HH:mm:ss"),
F.date_format("date_col", "MM/dd/yyyy"),
F.year("date_col"),
F.month("date_col"),
F.dayofmonth("date_col"),
F.dayofweek("date_col"),
F.dayofyear("date_col"),
F.hour("timestamp_col"),
F.minute("timestamp_col"),
F.second("timestamp_col"),
F.date_add("date_col", 7),
F.date_sub("date_col", 7),
F.datediff("end_date", "start_date"),
F.months_between("end_date", "start_date"),
F.add_months("date_col", 3),
F.last_day("date_col"),
F.next_day("date_col", "Monday"),
F.trunc("date_col", "month"),
F.date_trunc("month", "timestamp_col"),
F.unix_timestamp("timestamp_col"),
F.from_unixtime("unix_ts"),
F.from_utc_timestamp("timestamp_col", "America/New_York")
)
Null Handling
# Null handling
df.select(
F.isnull("column"),
F.isnan("column"),
F.coalesce("col1", "col2", F.lit("default")),
F.nvl("col1", "col2"), # Returns col1 if not null, else col2
F.nvl2("col1", "col2", "col3"), # If col1 not null return col2, else col3
F.nullif("col1", "col2"), # Returns null if col1 == col2
F.ifnull("col1", "col2"), # Same as NVL
F.nanvl("col1", "col2") # Returns col1 if not NaN, else col2
)
# Fill null values
df.fillna(0, subset=["salary"])
df.fillna({"salary": 0, "name": "Unknown"})
df.na.fill(0)
df.na.drop() # Drop rows with any null
df.na.drop(how="all") # Drop rows where all values are null
df.na.drop(subset=["critical_column"])
Conditional Logic
# CASE WHEN using when/otherwise
df.withColumn("salary_tier",
F.when(F.col("salary") < 50000, "Junior")
.when(F.col("salary") < 100000, "Mid")
.when(F.col("salary") < 150000, "Senior")
.otherwise("Executive")
)
# Using expr for complex SQL expressions
df.withColumn("status",
F.expr("CASE WHEN age < 18 THEN 'minor' WHEN age < 65 THEN 'adult' ELSE 'senior' END")
)
4.4 Reading Data from Various Sources
# CSV
df = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.option("delimiter", ",") \
.option("quote", '"') \
.option("escape", "\\") \
.option("nullValue", "NULL") \
.option("dateFormat", "yyyy-MM-dd") \
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
.option("multiLine", "true") \
.option("encoding", "UTF-8") \
.load("/path/to/*.csv")
# JSON
df = spark.read \
.format("json") \
.option("multiLine", "true") \
.option("inferSchema", "true") \
.option("dateFormat", "yyyy-MM-dd") \
.load("/path/to/*.json")
# Parquet
df = spark.read.parquet("/path/to/parquet/")
# Avro
df = spark.read.format("avro").load("/path/to/avro/")
# ORC
df = spark.read.orc("/path/to/orc/")
# Text files
df = spark.read.text("/path/to/text/")
df = spark.read.format("text").load("/path/to/text/*.txt")
4.5 Writing Data to Various Formats
# Write modes
# "append" - Append to existing data
# "overwrite" - Overwrite all existing data
# "error"/"errorifexists" - Throw error if data exists (default)
# "ignore" - Silently skip write if data exists
# Write as Parquet
df.write \
.mode("overwrite") \
.partitionBy("year", "month") \
.parquet("/path/to/output/")
# Write as CSV
df.write \
.mode("overwrite") \
.option("header", "true") \
.csv("/path/to/output.csv")
# Write as Delta
df.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.partitionBy("date") \
.save("/path/to/delta/table")
# Coalesce before writing (reduces the number of output files; the path is still written as a directory containing part files)
df.coalesce(1).write.mode("overwrite").csv("/path/to/single/file.csv")
# Repartition for balanced output
df.repartition(10).write.format("delta").save("/path/to/delta/")
Chapter 5: Incremental Data Processing
5.1 What is Incremental Processing?
Incremental processing is the practice of processing only new or changed data, rather than reprocessing the entire dataset each time. This is critical for production data pipelines because:
- Reduces compute costs
- Minimizes processing time
- Enables near-real-time data freshness
- Scales to large datasets efficiently
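As a small sketch of the difference (paths are placeholders): a plain batch read reprocesses the whole table on every run, while a streaming read with an availableNow trigger picks up only data added since the last checkpoint and then stops.
# Full reprocessing: every run reads the entire Bronze table
full_df = spark.read.format("delta").load("/bronze/events")
# Incremental: only data added since the last checkpointed position is processed, then the query stops
(spark.readStream
    .format("delta")
    .load("/bronze/events")
    .writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/events_incremental/")
    .trigger(availableNow=True)
    .start("/silver/events"))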
5.2 Structured Streaming
Structured Streaming is Spark's streaming engine built on top of the DataFrame/Dataset API. It treats a streaming data source as an unbounded table that grows continuously.
Key Concepts
Micro-batch Processing: Processes data in small batches at regular intervals (default 500ms or when data is available).
Continuous Processing: For ultra-low latency (millisecond range), though less common.
Trigger Types:
- processingTime="0 seconds": Process each micro-batch as fast as possible
- processingTime="1 minute": Fixed interval between micro-batches
- once=True: Process all available data in a single micro-batch, then stop (batch mode)
- availableNow=True: Process all available data across multiple micro-batches, then stop
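A short sketch of attaching these triggers to a streaming write (paths are placeholders, and df is assumed to be a streaming DataFrame):
# Fixed-interval micro-batches
q1 = (df.writeStream
        .format("delta")
        .option("checkpointLocation", "/checkpoints/q1/")
        .trigger(processingTime="1 minute")
        .start("/silver/stream_fixed_interval/"))
# Process all currently available data, then stop (incremental batch style)
q2 = (df.writeStream
        .format("delta")
        .option("checkpointLocation", "/checkpoints/q2/")
        .trigger(availableNow=True)
        .start("/silver/stream_available_now/"))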
Input Sources
# Kafka source
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "topic1,topic2") \
.option("startingOffsets", "earliest") \
.option("maxOffsetsPerTrigger", 10000) \
.load()
# Parse Kafka value
from pyspark.sql.types import *
schema = StructType([
StructField("id", IntegerType()),
StructField("value", StringType())
])
parsed_df = kafka_df.select(
F.from_json(F.col("value").cast("string"), schema).alias("data")
).select("data.*")
# Auto Loader (cloud file source)
autoloader_df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.inferColumnTypes", "true") \
.option("cloudFiles.schemaLocation", "/path/to/schema/") \
.load("/path/to/landing/zone/")
# Delta table as stream source
delta_stream = spark.readStream \
.format("delta") \
.option("maxFilesPerTrigger", 100) \
.table("bronze.orders")
# Rate source (for testing)
test_stream = spark.readStream \
.format("rate") \
.option("rowsPerSecond", 100) \
.load()
Output Sinks and Output Modes
Output Modes:
- append: Only new rows added since the last trigger are written (the default output mode)
- complete: The entire result table is rewritten on every trigger (used for aggregations; append is not allowed for aggregations without a watermark)
- update: Only rows that were updated since the last trigger are written
# Write to Delta (most common)
query = df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint/") \
.trigger(processingTime="1 minute") \
.start("/path/to/output/delta/")
# Write to Delta table by name
query = df.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint/") \
.toTable("silver.orders")
# Write to Kafka
query = df.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("topic", "output-topic") \
.option("checkpointLocation", "/path/to/checkpoint/") \
.start()
# Write to memory (for debugging/testing)
query = df.writeStream \
.format("memory") \
.queryName("temp_table") \
.outputMode("complete") \
.start()
spark.sql("SELECT * FROM temp_table").show()
# Console (for debugging)
query = df.writeStream \
.format("console") \
.outputMode("append") \
.start()
# Manage streaming query
query.status # Current status
query.isActive # True if running
query.stop() # Stop the query
query.awaitTermination() # Block until complete
query.awaitTermination(timeout=60) # Block with timeout
Checkpointing
Checkpointing is critical for fault tolerance in streaming. It stores:
- The current offset/position in the source
- The state of aggregations
- Metadata about the query
# Always specify a unique checkpoint location per query
query = df.writeStream \
.option("checkpointLocation", "/checkpoint/orders_to_silver/") \
.format("delta") \
.start("/silver/orders/")
5.3 Auto Loader
Auto Loader is Databricks' optimized solution for incrementally loading files from cloud storage. It's the recommended approach for Bronze layer ingestion.
# Basic Auto Loader setup
df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.option("cloudFiles.schemaLocation", "/schema/orders/") \
.load("/landing/orders/")
# Write to Bronze Delta table
query = df.writeStream \
.format("delta") \
.option("checkpointLocation", "/checkpoint/orders_landing/") \
.option("mergeSchema", "true") \
.trigger(availableNow=True) \
.toTable("bronze.orders")
query.awaitTermination()
Auto Loader Benefits:
- Automatic schema detection and evolution: Infers schema and evolves it as new files arrive
- File discovery: Directory listing mode by default, or file notification mode for high file volumes
- Exactly-once processing: Uses checkpointing to ensure no file is processed twice
- Handles large number of files: More efficient than manual file tracking
Configuration Options:
df = spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "csv") \
.option("cloudFiles.schemaLocation", "/schema/events/") \
.option("cloudFiles.inferColumnTypes", "true") \
.option("cloudFiles.backfillInterval", "1 day") \
.option("cloudFiles.maxBytesPerTrigger", "10g") \
.option("cloudFiles.maxFilesPerTrigger", 1000) \
.option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
.option("header", "true") \
.load("/landing/events/")
5.4 Watermarking in Streaming
Watermarks handle late-arriving data in streaming aggregations.
# Define watermark (tolerate up to 10 minutes of late data)
windowed_counts = df \
.withWatermark("event_time", "10 minutes") \
.groupBy(
F.window("event_time", "5 minutes"),
"user_id"
) \
.count()
# Sliding window
sliding_window = df \
.withWatermark("event_time", "1 hour") \
.groupBy(
F.window("event_time", "1 hour", "15 minutes"), # 1-hour window, 15-min slide
"product_id"
) \
.agg(F.sum("amount").alias("total"))
5.5 Incremental Processing with COPY INTO
COPY INTO is a SQL command that incrementally loads files into a Delta table. Files are only loaded once (it tracks what's already been loaded).
-- Basic COPY INTO
COPY INTO sales
FROM '/path/to/source/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
-- COPY INTO with JSON
COPY INTO events
FROM '/landing/events/'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')
-- COPY INTO with explicit casting during load
COPY INTO sales
FROM (
  SELECT
    cast(id as BIGINT) as id,
    cast(amount as DECIMAL(10,2)) as amount,
    to_date(sale_date, 'yyyy-MM-dd') as sale_date
  FROM '/landing/sales/'
)
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true')
COPY INTO vs Auto Loader:
| Feature | COPY INTO | Auto Loader |
|---------|-----------|-------------|
| Interface | SQL | Python/Scala |
| Scalability | Thousands of files | Millions of files or more |
| File tracking | Within Delta log | Checkpoint files |
| Schema evolution | Manual | Automatic |
| Streaming support | No | Yes |
| Use case | Small-medium datasets | Large-scale ingestion |
Chapter 6: Production Pipelines with Delta Live Tables (DLT)
6.1 Introduction to Delta Live Tables
Delta Live Tables (DLT) is a declarative framework for building reliable, maintainable, and testable data pipelines on Databricks. Instead of writing imperative code to manage pipeline execution, you declare the data transformations and DLT handles:
- Pipeline orchestration and execution ordering
- Automatic error handling and retries
- Data quality enforcement
- Monitoring and observability
- Infrastructure management
6.2 DLT Pipeline Components
Datasets
DLT has three types of datasets:
- Streaming Tables: For append-only streaming sources
- Materialized Views: For aggregations and transformations that need to be persisted
- Views: For temporary transformations (not stored)
Pipeline Types
- Triggered: Run once and stop (like a scheduled batch job)
- Continuous: Run continuously to minimize latency
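Views are the only dataset type that is not persisted to storage; a minimal sketch (the upstream dataset name is an assumption):
import dlt

@dlt.view(comment="Intermediate result; recomputed on demand, not stored")
def orders_filtered():
    # "raw_orders" is assumed to be another dataset defined in the same pipeline
    return dlt.read("raw_orders").where("amount > 0")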
6.3 Defining DLT Pipelines
import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import *
# === BRONZE LAYER ===
# Create a streaming table from Auto Loader
@dlt.table(
name="raw_orders",
comment="Raw orders data ingested from landing zone",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "true"
}
)
def raw_orders():
return (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.schemaLocation",
"/pipelines/schemas/raw_orders/")
.load("/landing/orders/")
)
# === SILVER LAYER ===
# Create a streaming table with expectations (data quality)
@dlt.table(
name="clean_orders",
comment="Cleaned and validated orders",
table_properties={"quality": "silver"}
)
@dlt.expect("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("positive_amount", "amount > 0")
@dlt.expect_or_fail("valid_status", "status IN ('PENDING', 'PROCESSING', 'COMPLETED', 'CANCELLED')")
def clean_orders():
return (
dlt.read_stream("raw_orders")
.filter(F.col("order_id").isNotNull())
.withColumn("order_date", F.to_date("order_date"))
.withColumn("amount", F.col("amount").cast("decimal(10,2)"))
.withColumn("status", F.upper("status"))
.withColumn("_processed_at", F.current_timestamp())
)
# === GOLD LAYER ===
# Create a Materialized View for aggregations
@dlt.table(
name="daily_sales_summary",
comment="Daily sales aggregated by product category",
table_properties={"quality": "gold"}
)
def daily_sales_summary():
return (
dlt.read("clean_orders")
.filter("status = 'COMPLETED'")
.groupBy(
F.to_date("order_date").alias("date"),
"product_category"
)
.agg(
F.count("*").alias("order_count"),
F.sum("amount").alias("total_revenue"),
F.avg("amount").alias("avg_order_value")
)
)
6.4 DLT Expectations (Data Quality)
Expectations define data quality rules in DLT pipelines.
# @dlt.expect: Tracks violations but doesn't drop or fail
@dlt.expect("valid_email", "email LIKE '%@%.%'")
def my_table():
...
# @dlt.expect_or_drop: Drops rows that violate the constraint
@dlt.expect_or_drop("non_null_id", "id IS NOT NULL")
def my_table():
...
# @dlt.expect_or_fail: Fails the pipeline if any row violates
@dlt.expect_or_fail("critical_check", "amount >= 0")
def my_table():
...
# Multiple expectations
@dlt.expect("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("positive_amount", "amount > 0")
@dlt.expect_or_fail("valid_customer", "customer_id IS NOT NULL")
def my_table():
...
# expect_all: Apply multiple constraints at once
@dlt.expect_all({
"valid_id": "id IS NOT NULL",
"valid_amount": "amount > 0",
"valid_date": "order_date >= '2020-01-01'"
})
def my_table():
...
# expect_all_or_drop
@dlt.expect_all_or_drop({
"valid_id": "id IS NOT NULL",
"valid_amount": "amount > 0"
})
def my_table():
...
# expect_all_or_fail
@dlt.expect_all_or_fail({
"critical_id": "id IS NOT NULL",
"critical_amount": "amount > 0"
})
def my_table():
...
6.5 DLT with SQL
DLT also supports SQL syntax:
-- Bronze: Streaming table from Auto Loader
CREATE OR REFRESH STREAMING TABLE raw_customers
COMMENT "Raw customer data from source systems"
AS SELECT * FROM cloud_files("/landing/customers/", "json",
map("cloudFiles.inferColumnTypes", "true"))
-- Silver: Streaming table with expectations
CREATE OR REFRESH STREAMING TABLE clean_customers (
CONSTRAINT valid_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE,
CONSTRAINT valid_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW,
CONSTRAINT valid_age EXPECT (age BETWEEN 18 AND 120) -- no ON VIOLATION clause: violations are recorded but rows are kept
)
COMMENT "Cleaned customer data"
AS
SELECT
customer_id,
upper(trim(first_name)) as first_name,
upper(trim(last_name)) as last_name,
lower(email) as email,
cast(age as INT) as age,
to_date(signup_date, 'yyyy-MM-dd') as signup_date,
current_timestamp() as _processed_at
FROM STREAM(LIVE.raw_customers)
WHERE customer_id IS NOT NULL
-- Gold: Materialized view
CREATE OR REFRESH MATERIALIZED VIEW customer_summary
COMMENT "Customer segment summary"
AS
SELECT
CASE
WHEN age < 25 THEN 'Gen Z'
WHEN age < 40 THEN 'Millennial'
WHEN age < 55 THEN 'Gen X'
ELSE 'Boomer'
END as age_segment,
count(*) as customer_count,
avg(age) as avg_age
FROM LIVE.clean_customers
GROUP BY 1
6.6 DLT Pipeline Configuration
{
"name": "Orders Pipeline",
"storage": "/pipelines/orders/",
"target": "orders_db",
"libraries": [
{"notebook": {"path": "/pipelines/bronze/raw_orders"}},
{"notebook": {"path": "/pipelines/silver/clean_orders"}},
{"notebook": {"path": "/pipelines/gold/order_summaries"}}
],
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5
}
}
],
"development": false,
"photon": true,
"channel": "CURRENT",
"edition": "ADVANCED",
"continuous": false
}
6.7 DLT APPLY CHANGES (CDC)
DLT supports Change Data Capture (CDC) through the APPLY CHANGES INTO syntax.
import dlt
from pyspark.sql import functions as F
# Source CDC stream
@dlt.view
def customers_cdc():
return spark.readStream \
.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.load("/cdc/customers/")
# Apply changes to target table
dlt.create_streaming_table("customers")
dlt.apply_changes(
target = "customers",
source = "customers_cdc",
keys = ["customer_id"],
sequence_by = F.col("updated_at"), # Sequence field to handle ordering
apply_as_deletes = F.expr("operation = 'DELETE'"),
apply_as_truncates = F.expr("operation = 'TRUNCATE'"),
except_column_list = ["operation", "updated_at"], # Columns to exclude
stored_as_scd_type = 1 # SCD Type 1 (overwrite) or 2 (history)
)
SCD Type 2 with APPLY CHANGES:
dlt.apply_changes(
target = "customers_history",
source = "customers_cdc",
keys = ["customer_id"],
sequence_by = F.col("updated_at"),
stored_as_scd_type = 2, # Maintain full history
track_history_column_list = ["email", "address", "phone"] # Only track specific columns
)
Chapter 7: Databricks Workflows and Job Orchestration
7.1 Introduction to Databricks Workflows
Databricks Workflows (formerly Jobs) is the built-in orchestration service for scheduling and running data pipelines. It's used to:
- Schedule notebooks, Python scripts, JARs, and DLT pipelines
- Create complex multi-task dependencies (DAGs)
- Monitor execution and receive alerts
- Implement retry logic and error handling
7.2 Creating and Configuring Jobs
Single Task Jobs
# Using the Databricks SDK
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs
client = WorkspaceClient()
# Create a simple notebook job
job = client.jobs.create(
name="My Data Pipeline",
tasks=[
jobs.Task(
task_key="ingest_data",
notebook_task=jobs.NotebookTask(
notebook_path="/pipelines/ingestion/load_bronze",
base_parameters={"env": "prod", "date": "{{start_date}}"}
),
new_cluster=jobs.ClusterSpec(
spark_version="13.3.x-scala2.12",
node_type_id="i3.xlarge",
num_workers=2
),
timeout_seconds=3600,
max_retries=2,
min_retry_interval_millis=60000 # 1 minute between retries
)
],
schedule=jobs.CronSchedule(
quartz_cron_expression="0 0 8 * * ?", # Daily at 8 AM
timezone_id="America/New_York",
pause_status=jobs.PauseStatus.UNPAUSED
),
email_notifications=jobs.JobEmailNotifications(
on_failure=["data-team@company.com"],
on_success=["data-team@company.com"]
)
)
Multi-Task Jobs (DAGs)
{
"name": "ETL Pipeline",
"tasks": [
{
"task_key": "ingest_bronze",
"notebook_task": {
"notebook_path": "/pipelines/bronze/ingest"
},
"new_cluster": {
"spark_version": "13.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 2
}
},
{
"task_key": "transform_silver",
"depends_on": [{"task_key": "ingest_bronze"}],
"notebook_task": {
"notebook_path": "/pipelines/silver/transform"
},
"existing_cluster_id": "{{job_cluster_id}}"
},
{
"task_key": "aggregate_gold_sales",
"depends_on": [{"task_key": "transform_silver"}],
"notebook_task": {
"notebook_path": "/pipelines/gold/sales_agg"
}
},
{
"task_key": "aggregate_gold_customers",
"depends_on": [{"task_key": "transform_silver"}],
"notebook_task": {
"notebook_path": "/pipelines/gold/customer_agg"
}
},
{
"task_key": "update_reporting",
"depends_on": [
{"task_key": "aggregate_gold_sales"},
{"task_key": "aggregate_gold_customers"}
],
"notebook_task": {
"notebook_path": "/pipelines/reporting/refresh"
}
}
]
}
7.3 Task Types
# Notebook task
notebook_task = jobs.NotebookTask(
notebook_path="/path/to/notebook",
base_parameters={"param1": "value1"}
)
# Python Script task
python_task = jobs.SparkPythonTask(
python_file="/path/to/script.py",
parameters=["--env", "prod"]
)
# JAR task
jar_task = jobs.SparkJarTask(
main_class_name="com.company.MainClass",
jar_uri="dbfs:/jars/my_app.jar",
parameters=["arg1", "arg2"]
)
# DLT Pipeline task
dlt_task = jobs.PipelineTask(
pipeline_id="pipeline-id-here"
)
# SQL task
sql_task = jobs.SqlTask(
query=jobs.SqlTaskQuery(query_id="query-id"),
warehouse_id="warehouse-id"
)
# Python Wheel task
wheel_task = jobs.PythonWheelTask(
package_name="my_package",
entry_point="main",
named_parameters={"env": "prod"}
)
7.4 Job Parameters and Dynamic Values
# In notebooks, access job parameters using dbutils
dbutils.widgets.text("env", "dev")
dbutils.widgets.text("run_date", "")
env = dbutils.widgets.get("env")
run_date = dbutils.widgets.get("run_date")
# Dynamic value syntax in job configuration
# {{run_id}} - Current run ID
# {{job_id}} - Job ID
# {{task_key}} - Task key
# {{start_time}} - Start time
# {{parent_run_id}} - Parent run ID
# Example: Pass current date
{
"base_parameters": {
"date": "{{start_date}}",
"run_id": "{{run_id}}"
}
}
7.5 Cluster Policies
Cluster policies allow administrators to control what users can configure when creating clusters.
{
"name": "Standard Data Engineering Policy",
"definition": {
"spark_version": {
"type": "allowlist",
"values": ["13.3.x-scala2.12", "12.2.x-scala2.12"]
},
"node_type_id": {
"type": "allowlist",
"values": ["i3.xlarge", "i3.2xlarge", "i3.4xlarge"]
},
"num_workers": {
"type": "range",
"minValue": 1,
"maxValue": 10,
"defaultValue": 2
},
"autotermination_minutes": {
"type": "fixed",
"value": 60
},
"spark_conf.spark.databricks.delta.preview.enabled": {
"type": "fixed",
"value": "true"
}
}
}
7.6 dbutils — Databricks Utilities
dbutils is a powerful utility library available in Databricks notebooks.
# File System utilities
dbutils.fs.ls("/path/to/directory") # List files
dbutils.fs.mkdirs("/path/to/new/dir") # Create directory
dbutils.fs.cp("/source/file", "/dest/file") # Copy file
dbutils.fs.mv("/source/file", "/dest/file") # Move file
dbutils.fs.rm("/path/to/file") # Remove file
dbutils.fs.rm("/path/to/dir", recurse=True) # Remove directory
dbutils.fs.head("/path/to/file", 1024) # Read first N bytes
dbutils.fs.put("/path/to/file", "content") # Write content to file
# Widgets
dbutils.widgets.text("parameter", "default_value", "Label")
dbutils.widgets.dropdown("env", "dev", ["dev", "test", "prod"])
dbutils.widgets.combobox("region", "us-east", ["us-east", "eu-west"])
dbutils.widgets.multiselect("tables", "orders", ["orders", "customers", "products"])
dbutils.widgets.get("parameter") # Get widget value
dbutils.widgets.remove("parameter") # Remove widget
dbutils.widgets.removeAll() # Remove all widgets
# Secrets
dbutils.secrets.list("scope-name") # List secrets in scope
dbutils.secrets.get("scope-name", "secret-key") # Get secret value
# Notebook utilities
result = dbutils.notebook.run("/path/to/notebook", 60, {"param": "value"})
dbutils.notebook.exit("success") # Exit with message
# Library utilities
# Note: dbutils.library.installPyPI was removed in newer Databricks Runtime versions;
# use "%pip install <package>==<version>" in a notebook cell instead
dbutils.library.restartPython()
Chapter 8: Data Governance with Unity Catalog
8.1 Introduction to Unity Catalog
Unity Catalog is Databricks' unified governance solution that provides centralized access control, auditing, lineage, and data discovery across all workspaces in an account.
Key capabilities:
- Unified data governance: Single control plane for all data assets
- Fine-grained access control: Table, column, and row-level security
- Data lineage: Automatic lineage tracking at the column level
- Audit logging: Comprehensive audit trail of all data access
- Data discovery: Built-in search and tagging
8.2 Unity Catalog Object Model
Account
└── Metastore (one per region)
└── Catalog
└── Schema (Database)
├── Tables
│ ├── Managed Tables
│ └── External Tables
├── Views
├── Functions
├── Volumes
└── Models (MLflow)
Three-Level Namespace
-- Full path syntax: catalog.schema.table
SELECT * FROM my_catalog.my_schema.my_table
-- Set default catalog and schema
USE CATALOG my_catalog;
USE SCHEMA my_schema;
-- After setting defaults, use two-level namespace
SELECT * FROM my_table
8.3 Creating and Managing Objects
-- Create catalog
CREATE CATALOG IF NOT EXISTS production
COMMENT 'Production data catalog';
-- Create schema
CREATE SCHEMA IF NOT EXISTS production.sales
COMMENT 'Sales data schema'
MANAGED LOCATION 'abfss://container@storage.dfs.core.windows.net/production/sales/';
-- Create managed table
CREATE TABLE IF NOT EXISTS production.sales.orders (
order_id BIGINT NOT NULL,
customer_id BIGINT,
amount DECIMAL(10,2),
order_date DATE,
status STRING
)
USING DELTA
COMMENT 'Main orders table';
-- Create external table
CREATE TABLE IF NOT EXISTS production.sales.external_data
USING DELTA
LOCATION 'abfss://container@storage.dfs.core.windows.net/external/data/'
COMMENT 'External data table';
-- Create view
CREATE OR REPLACE VIEW production.sales.active_orders AS
SELECT * FROM production.sales.orders
WHERE status = 'ACTIVE';
-- Create function
CREATE OR REPLACE FUNCTION production.sales.calculate_tax(amount DOUBLE, rate DOUBLE)
RETURNS DOUBLE
RETURN amount * rate;
8.4 Access Control
Unity Catalog uses a hierarchical permission model.
-- Grant permissions on catalog
GRANT USE CATALOG ON CATALOG production TO `data_engineers`;
-- Grant permissions on schema
GRANT USE SCHEMA, CREATE TABLE ON SCHEMA production.sales TO `data_engineers`;
-- Grant permissions on table
GRANT SELECT, MODIFY ON TABLE production.sales.orders TO `analysts_group`;
GRANT SELECT ON TABLE production.sales.orders TO `junior_analysts`;
-- Grant all privileges
GRANT ALL PRIVILEGES ON TABLE production.sales.orders TO `admin_group`;
-- Revoke permissions
REVOKE MODIFY ON TABLE production.sales.orders FROM `junior_analysts`;
-- Show grants
SHOW GRANTS ON TABLE production.sales.orders;
SHOW GRANTS TO `data_engineers`;
-- Row-level security using row filters
CREATE OR REPLACE FUNCTION production.sales.region_filter(region STRING)
RETURNS BOOLEAN
RETURN is_member('admin') OR region = current_user();
ALTER TABLE production.sales.orders
SET ROW FILTER production.sales.region_filter ON (region);
-- Column-level security using column masks
CREATE OR REPLACE FUNCTION production.sales.mask_ssn(ssn STRING)
RETURNS STRING
RETURN CASE
WHEN is_member('hr_team') THEN ssn
ELSE CONCAT('***-**-', RIGHT(ssn, 4))
END;
ALTER TABLE production.sales.customers
ALTER COLUMN ssn SET MASK production.sales.mask_ssn;
8.5 Unity Catalog Privileges Reference
| Privilege | Applies To | Description |
|---|---|---|
| CREATE CATALOG | Metastore | Create new catalogs |
| USE CATALOG | Catalog | Access catalog objects |
| CREATE SCHEMA | Catalog | Create schemas in catalog |
| USE SCHEMA | Schema | Access schema objects |
| CREATE TABLE | Schema | Create tables in schema |
| SELECT | Table/View | Read data |
| MODIFY | Table | Insert, update, delete |
| READ FILES | Volume | Read files from volume |
| WRITE FILES | Volume | Write files to volume |
| EXECUTE | Function | Call functions |
| ALL PRIVILEGES | Any | All applicable privileges |
8.6 Data Lineage
Unity Catalog automatically captures data lineage.
-- Lineage is automatically captured for:
--   table reads and writes, SQL queries, DLT pipelines, notebooks, and jobs
-- View lineage through the Catalog Explorer UI, or query the system tables:
SELECT * FROM system.access.table_lineage
WHERE target_table_full_name = 'production.sales.orders'
LIMIT 100;
8.7 Volumes
Volumes are Unity Catalog objects that manage access to non-tabular data in cloud storage.
-- Create external volume
CREATE EXTERNAL VOLUME production.raw_data.landing_zone
LOCATION 'abfss://container@storage.dfs.core.windows.net/landing/'
COMMENT 'Landing zone for raw data files';
-- Create managed volume
CREATE VOLUME production.raw_data.processed_files
COMMENT 'Processed file storage';
-- Access volume using /Volumes path
-- /Volumes/<catalog>/<schema>/<volume>/<path>
# Read from volume
df = spark.read.format("json").load("/Volumes/production/raw_data/landing_zone/orders/")
# Write to volume
df.write.format("csv").save("/Volumes/production/raw_data/processed_files/output/")
# Using dbutils with volumes
dbutils.fs.ls("/Volumes/production/raw_data/landing_zone/")
8.8 System Tables
Unity Catalog provides system tables for audit logging and access tracking.
-- Audit logs
SELECT * FROM system.access.audit
WHERE event_date >= current_date() - 7
AND action_name = 'SELECT'
LIMIT 100;
-- Table access history
SELECT * FROM system.access.table_access_history
WHERE table_full_name = 'production.sales.orders'
ORDER BY access_date DESC
LIMIT 100;
-- Billing and usage
SELECT * FROM system.billing.usage
WHERE usage_date >= current_date() - 30;
-- Query history
SELECT * FROM system.query.history
WHERE start_time >= current_timestamp() - INTERVAL 1 DAY
ORDER BY start_time DESC;
Chapter 9: Databricks SQL
9.1 Overview of Databricks SQL
Databricks SQL is a serverless data warehouse built on top of the Databricks Lakehouse Platform. It provides:
- SQL Editor: Web-based SQL interface
- SQL Warehouses: Scalable compute for SQL workloads (see the SDK sketch after this list)
- Dashboards: Built-in visualization and reporting
- Alerts: Notifications based on query results
- Query History: Track and optimize queries
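Queries do not have to go through the SQL Editor; they can also be submitted to a SQL warehouse programmatically. The following is a minimal sketch using the Databricks Python SDK's statement execution API, assuming the SDK is installed and authentication is already configured; the warehouse ID is a placeholder.

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Submit a statement to a SQL warehouse and wait up to 30 seconds for the result
resp = w.statement_execution.execute_statement(
    statement="SELECT current_catalog(), current_schema()",
    warehouse_id="<your-warehouse-id>",  # placeholder
    wait_timeout="30s",
)

print(resp.status.state)           # e.g. SUCCEEDED
if resp.result is not None:
    print(resp.result.data_array)  # rows returned by the statement
```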
9.2 SQL Warehouses
SQL Warehouses (formerly SQL Endpoints) are the compute resources for Databricks SQL.
Types:
- Classic: Standard SQL warehouse
- Serverless: Databricks-managed infrastructure (fastest startup, pay-per-use)
- Pro: Adds support for advanced Databricks SQL features on top of Classic
Auto-scaling: SQL warehouses scale out by adding clusters (between a configured minimum and maximum) to handle concurrent queries.
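As a rough illustration of how that scaling range is configured, here is a sketch that creates a warehouse with the Databricks Python SDK; the name and sizing values are illustrative placeholders, not recommendations.

```python
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Create a warehouse that scales between 1 and 4 clusters (illustrative values)
wh = w.warehouses.create(
    name="analytics-warehouse",       # hypothetical name
    cluster_size="Small",             # T-shirt size of each cluster
    min_num_clusters=1,               # auto-scaling lower bound
    max_num_clusters=4,               # auto-scaling upper bound
    auto_stop_mins=15,                # stop when idle to control cost
    enable_serverless_compute=True,   # requires serverless to be enabled for the workspace
).result()                            # wait until the warehouse is running

print(wh.id, wh.state)
```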
9.3 Advanced SQL Features
Common Table Expressions (CTEs)
-- Basic CTE
WITH customer_orders AS (
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_spent
FROM orders
WHERE status = 'COMPLETED'
GROUP BY customer_id
),
customer_segments AS (
SELECT
customer_id,
CASE
WHEN total_spent > 10000 THEN 'Platinum'
WHEN total_spent > 5000 THEN 'Gold'
WHEN total_spent > 1000 THEN 'Silver'
ELSE 'Bronze'
END as segment
FROM customer_orders
)
SELECT
c.customer_id,
c.name,
cs.segment,
co.total_spent
FROM customers c
JOIN customer_orders co ON c.customer_id = co.customer_id
JOIN customer_segments cs ON c.customer_id = cs.customer_id;
-- Recursive CTE (for hierarchical data)
WITH RECURSIVE employee_hierarchy AS (
-- Base case
SELECT employee_id, name, manager_id, 0 AS level
FROM employees
WHERE manager_id IS NULL
UNION ALL
-- Recursive case
SELECT e.employee_id, e.name, e.manager_id, h.level + 1
FROM employees e
JOIN employee_hierarchy h ON e.manager_id = h.employee_id
)
SELECT * FROM employee_hierarchy ORDER BY level, employee_id;
Window Functions
-- Running total
SELECT
date,
amount,
SUM(amount) OVER (ORDER BY date ROWS UNBOUNDED PRECEDING) as running_total
FROM daily_sales;
-- Moving average
SELECT
date,
amount,
AVG(amount) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as avg_7_day
FROM daily_sales;
-- Ranking with ties
SELECT
product_id,
revenue,
RANK() OVER (ORDER BY revenue DESC) as rank,
DENSE_RANK() OVER (ORDER BY revenue DESC) as dense_rank,
ROW_NUMBER() OVER (ORDER BY revenue DESC) as row_num,
NTILE(4) OVER (ORDER BY revenue DESC) as quartile,
PERCENT_RANK() OVER (ORDER BY revenue DESC) as pct_rank,
CUME_DIST() OVER (ORDER BY revenue DESC) as cum_dist
FROM product_sales;
-- First/Last value
SELECT
customer_id,
order_date,
amount,
FIRST_VALUE(amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
) as first_order_amount,
LAST_VALUE(amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
) as last_order_amount
FROM orders;
PIVOT and UNPIVOT
-- PIVOT
SELECT * FROM (
SELECT product_category, month, revenue
FROM monthly_sales
)
PIVOT (
SUM(revenue)
FOR month IN ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun')
);
-- UNPIVOT
SELECT product_category, month, revenue
FROM monthly_sales_wide
UNPIVOT (
revenue FOR month IN (jan_revenue, feb_revenue, mar_revenue)
);
Advanced Aggregations
-- GROUPING SETS
SELECT
COALESCE(category, 'ALL CATEGORIES') as category,
COALESCE(region, 'ALL REGIONS') as region,
SUM(sales) as total_sales
FROM sales_data
GROUP BY GROUPING SETS (
(category, region),
(category),
(region),
()
);
-- ROLLUP
SELECT
year, quarter, month,
SUM(revenue) as total_revenue
FROM sales
GROUP BY ROLLUP (year, quarter, month);
-- CUBE
SELECT
category, region, channel,
SUM(sales) as total_sales
FROM sales_data
GROUP BY CUBE (category, region, channel);
-- FILTER clause
SELECT
category,
SUM(amount) as total,
SUM(amount) FILTER (WHERE status = 'COMPLETED') as completed_total,
SUM(amount) FILTER (WHERE status = 'PENDING') as pending_total,
COUNT(*) FILTER (WHERE amount > 1000) as large_orders
FROM orders
GROUP BY category;
Higher-Order Functions
-- TRANSFORM: Apply function to each element
SELECT TRANSFORM(skills, s -> UPPER(s)) as upper_skills
FROM employees;
-- FILTER: Filter array elements
SELECT FILTER(scores, s -> s > 70) as passing_scores
FROM students;
-- AGGREGATE: Aggregate array elements
SELECT AGGREGATE(amounts, 0, (acc, x) -> acc + x) as total
FROM orders_with_arrays;
-- EXISTS: Check if any element matches
SELECT EXISTS(items, x -> x.price > 100) as has_expensive_items
FROM shopping_carts;
-- FORALL: Check if all elements match
SELECT FORALL(scores, s -> s >= 60) as all_passing
FROM students;
-- REDUCE (alias for AGGREGATE)
SELECT REDUCE(amounts, 0, (acc, x) -> acc + x) as sum
FROM arrays_table;
Chapter 10: Security and Access Control
10.1 Databricks Security Model
Databricks provides multiple layers of security:
- Network Security: VPC/VNET peering, Private Link, IP Access Lists (see the sketch after this list)
- Authentication: SSO, SCIM provisioning, Service Principals, PAT tokens
- Authorization: Unity Catalog, Table ACLs, Workspace ACLs
- Encryption: At-rest and in-transit encryption
- Audit: Comprehensive audit logging
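As one concrete example of the network-security layer, the sketch below creates an IP access list with the Databricks Python SDK. The label and CIDR range are placeholders, and this assumes IP access lists are enabled for the workspace.

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import settings

w = WorkspaceClient()

# Allow workspace access only from a corporate range (placeholder CIDR)
w.ip_access_lists.create(
    label="corp-vpn",                      # hypothetical label
    list_type=settings.ListType.ALLOW,     # ALLOW list; BLOCK lists are also supported
    ip_addresses=["203.0.113.0/24"],       # documentation-range CIDR used as a placeholder
)
```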
10.2 Identity and Access Management
Users and Groups
# Using Databricks SDK
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import iam
client = WorkspaceClient()
# Create service principal
sp = client.service_principals.create(
display_name="my-service-principal",
application_id="app-id"
)
# Add user to a group (SCIM PATCH)
client.groups.patch(
    id="group-id",
    operations=[
        iam.Patch(
            op=iam.PatchOp.ADD,
            path="members",
            value=[{"value": "user-id"}]
        )
    ],
    schemas=[iam.PatchSchema.URN_IETF_PARAMS_SCIM_API_MESSAGES_2_0_PATCH_OP]
)
Service Principals
Service principals are non-human identities used for automated jobs and service-to-service authentication.
# Authenticate the Databricks CLI as the service principal (run in a shell):
#   databricks auth login --host https://workspace.azuredatabricks.net

# Use the service principal in jobs by setting environment variables
import os
os.environ["DATABRICKS_HOST"] = "https://workspace.azuredatabricks.net"
os.environ["DATABRICKS_TOKEN"] = dbutils.secrets.get("scope", "sp-token")
10.3 Secrets Management
# Create secret scope (using CLI)
# databricks secrets create-scope --scope my-scope
# Create secret (using CLI)
# databricks secrets put --scope my-scope --key my-key
# Access secrets in notebooks
password = dbutils.secrets.get(scope="my-scope", key="db-password")
# Use in configuration (value is never shown in plain text)
jdbc_url = "jdbc:postgresql://host:5432/db"
connection_properties = {
"user": dbutils.secrets.get("my-scope", "db-user"),
"password": dbutils.secrets.get("my-scope", "db-password")
}
df = spark.read.jdbc(url=jdbc_url, table="my_table", properties=connection_properties)
# List secrets (names only, not values)
dbutils.secrets.list("my-scope")
10.4 Table ACLs (Legacy - Pre-Unity Catalog)
-- Grant table permissions (Hive metastore)
GRANT SELECT ON TABLE my_table TO `user@company.com`;
GRANT MODIFY ON TABLE my_table TO `data_engineers`;
GRANT ALL PRIVILEGES ON SCHEMA my_schema TO `schema_owner`;
-- Deny permissions
DENY SELECT ON TABLE sensitive_table TO `junior_analysts`;
-- Revoke permissions
REVOKE SELECT ON TABLE my_table FROM `user@company.com`;
-- Show grants
SHOW GRANTS ON TABLE my_table;
Chapter 11: Performance Optimization
11.1 Understanding Spark Performance
Performance optimization in Spark involves understanding and addressing:
- Data Skew: Uneven distribution of data across partitions (see the sketch after this list)
- Shuffle: Expensive data movement across the network
- Serialization: Cost of converting objects for transfer
- Memory Management: Efficient use of executor memory
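A quick way to spot data skew is to count rows per Spark partition; the sketch below assumes an existing DataFrame named df.

```python
from pyspark.sql import functions as F

# Count rows per partition; a few partitions with much larger counts indicate skew
(df.withColumn("partition_id", F.spark_partition_id())
   .groupBy("partition_id")
   .count()
   .orderBy(F.desc("count"))
   .show(10))
```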
11.2 Partitioning Strategies
# Check current number of partitions
df.rdd.getNumPartitions()
# Repartition (full shuffle)
df.repartition(100)
df.repartition(100, "customer_id") # Hash partition by column
# Coalesce (no shuffle, can only reduce partitions)
df.coalesce(10)
# Configure default shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200") # Default: 200
# Adaptive Query Execution (AQE) - automatically optimizes partitions
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
11.3 Broadcast Joins
When one DataFrame is small enough to fit in memory, broadcasting it avoids an expensive shuffle join.
from pyspark.sql.functions import broadcast
# Manual broadcast hint
result = large_df.join(broadcast(small_df), "customer_id")
# Configure auto-broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10mb") # Default: 10MB
# Disable auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
11.4 Caching and Persistence
# Cache with the default storage level (MEMORY_AND_DISK for DataFrames)
df.cache()
# Persist with storage level
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.DISK_ONLY)
df.persist(StorageLevel.OFF_HEAP)
# Unpersist
df.unpersist()
# Cache a table by name in the Spark SQL cache
spark.catalog.cacheTable("my_table")
spark.catalog.uncacheTable("my_table")
spark.catalog.clearCache()
11.5 Query Optimization
Explain Plan
# View logical and physical execution plans
df.explain()
df.explain(True) # Extended plan
df.explain("cost") # Cost-based optimizer plan
df.explain("codegen") # Generated code
df.explain("formatted") # Formatted output
# Or in SQL
spark.sql("EXPLAIN SELECT * FROM orders WHERE status = 'ACTIVE'")
spark.sql("EXPLAIN EXTENDED SELECT * FROM orders WHERE status = 'ACTIVE'")
Predicate Pushdown
# Spark automatically pushes filters down to minimize the data read;
# write queries so filters can be pushed down to storage
from pyspark.sql import functions as F

# Good: the filter is applied at the source and can be pushed down
spark.read.parquet("/path/").filter(F.col("date") == "2024-01-01")

# Bad: the filter comes after an expensive operation (expensive_udf is a placeholder UDF)
spark.read.parquet("/path/").withColumn("derived", expensive_udf("column")).filter(F.col("date") == "2024-01-01")
Column Pruning
# Only read columns you need
df.select("id", "amount", "date") # Better than df.select("*")
11.6 Delta Lake Performance Features
File Statistics and Data Skipping
Delta Lake maintains statistics (min, max, null count) for each file. Query predicates can use these to skip files entirely.
-- Z-ORDER colocates data for efficient skipping
OPTIMIZE orders ZORDER BY (customer_id, order_date)
-- Inspect the table's file layout (number of files, size) after OPTIMIZE
DESCRIBE DETAIL orders;
Liquid Clustering
Liquid Clustering is the next generation of data layout optimization, replacing OPTIMIZE ZORDER BY.
-- Create table with liquid clustering
CREATE TABLE orders (order_id BIGINT, customer_id BIGINT, order_date DATE, amount DECIMAL(10,2))
USING DELTA
CLUSTER BY (customer_id, order_date);
-- Add clustering to existing table
ALTER TABLE orders CLUSTER BY (customer_id, order_date);
-- Run clustering
OPTIMIZE orders;
-- Check clustering information
DESCRIBE TABLE EXTENDED orders;
Auto Optimize
# Enable Auto Optimize for continuous optimization
spark.sql("""
ALTER TABLE orders SET TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
# Globally enable
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
11.7 Photon Engine
Photon is Databricks' next-generation query engine written in C++. It provides:
- Vectorized execution for much faster processing
- Accelerates SQL and DataFrame operations
- Particularly effective for aggregations, joins, and string operations
- Enabled per cluster configuration (see the sketch below)
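As a sketch of that per-cluster configuration, the snippet below requests the Photon engine when creating a cluster through the Databricks Python SDK; the runtime version and node type are placeholders for your cloud.

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import compute

w = WorkspaceClient()

# Create a small cluster with the Photon engine enabled (placeholder values)
cluster = w.clusters.create(
    cluster_name="photon-demo",
    spark_version="<a-photon-capable-dbr-version>",  # placeholder
    node_type_id="<a-node-type-for-your-cloud>",     # placeholder
    num_workers=2,
    autotermination_minutes=30,
    runtime_engine=compute.RuntimeEngine.PHOTON,     # request Photon instead of the standard engine
).result()                                           # wait until the cluster is running

print(cluster.cluster_id, cluster.state)
```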
11.8 Databricks Runtime Optimizations
# Use Databricks Runtime (DBR) for built-in optimizations
# DBR includes:
# - Optimized Spark
# - Delta Lake
# - MLflow
# - Performance improvements
# Check DBR version
import os
print(os.environ.get("DATABRICKS_RUNTIME_VERSION"))
# Use latest LTS version for stability
# or latest version for newest features
Chapter 12: Exam Tips and Practice Questions
12.1 Exam Overview
The Databricks Certified Data Engineer Associate exam tests your knowledge of:
| Domain | Weight |
|---|---|
| Databricks Lakehouse Platform | ~24% |
| ELT with Apache Spark | ~29% |
| Incremental Data Processing | ~22% |
| Production Pipelines | ~16% |
| Data Governance | ~9% |
Exam Format:
- 45 questions (multiple choice)
- 90 minutes
- Passing score: ~70%
- Available on Webassessor
12.2 Key Concepts to Master
Delta Lake Must-Knows
- ACID transactions — Delta Lake provides full ACID compliance
- Transaction log (`_delta_log`) — Every operation is recorded
- Time Travel — Query historical versions using `VERSION AS OF` or `TIMESTAMP AS OF`
- VACUUM — Removes old files; default retention is 7 days
- OPTIMIZE + ZORDER — Compacts small files and colocates data
- Schema enforcement vs Schema evolution — Enforcement is default
- MERGE — Upsert operation combining INSERT and UPDATE
- CDF — Captures row-level changes for downstream processing
Spark Must-Knows
- Lazy evaluation — Transformations are lazy, actions trigger execution
- Partitions — Data is distributed across partitions
- Shuffle — Most expensive operation (GROUP BY, JOIN, DISTINCT)
- Broadcast join — Small table broadcast avoids shuffle
- Caching — Use `.cache()` for repeatedly accessed DataFrames
- AQE — Adaptive Query Execution auto-optimizes plans
Streaming Must-Knows
- Output modes — append, complete, update
- Checkpointing — Required for fault tolerance
- Watermarks — Handle late-arriving data
- Auto Loader — Best practice for file ingestion
- COPY INTO — SQL alternative for incremental file loading
- Triggers — Control when streaming query runs
DLT Must-Knows
- Streaming Tables vs Materialized Views — When to use each
- Expectations — expect, expect_or_drop, expect_or_fail
- APPLY CHANGES INTO — CDC handling in DLT
- Pipeline modes — Triggered vs Continuous
- Development mode — Reuses clusters and disables retries for faster iteration
- `dlt.read()` vs `dlt.read_stream()` — Batch vs streaming reads of other datasets in the pipeline
12.3 Practice Questions
Question 1: A data engineer needs to process a large JSON dataset stored in cloud object storage incrementally. New files arrive daily. Which approach is BEST suited for this use case?
A) spark.read.json() with a daily cron schedule
B) Auto Loader with cloudFiles format and a streaming checkpoint
C) COPY INTO command in a daily job
D) spark.read.format("delta").option("readChangeFeed", "true")
Answer: B — Auto Loader with cloudFiles format is specifically designed for incremental file ingestion with exactly-once processing guarantees.
Question 2: What is stored in the _delta_log directory of a Delta table?
A) The raw Parquet data files
B) JSON and Parquet files tracking all changes to the table
C) Schema definition files only
D) A backup of deleted records
Answer: B — The _delta_log directory contains JSON transaction log files and periodic Parquet checkpoint files that record all changes.
Question 3: A Delta table has accumulated many small files over time. Which command BEST addresses this issue?
A) VACUUM table_name
B) OPTIMIZE table_name
C) ANALYZE table_name
D) REFRESH table_name
Answer: B — OPTIMIZE compacts small files into larger ones. VACUUM removes old unreferenced files but doesn't compact.
Question 4: You run VACUUM sales RETAIN 0 HOURS. What is the consequence?
A) All data is permanently deleted from the table
B) You can no longer query historical versions of the data using time travel
C) All metadata is removed but data remains accessible
D) Future MERGE operations will fail
Answer: B — VACUUM removes the files needed for time travel. You can no longer access historical versions that have been vacuumed.
Question 5: In a DLT pipeline, which expectation type will FAIL the entire pipeline if violated?
A) @dlt.expect()
B) @dlt.expect_or_drop()
C) @dlt.expect_or_fail()
D) @dlt.expect_all()
Answer: C — @dlt.expect_or_fail() fails the pipeline if any row violates the constraint.
Question 6: Which statement about Spark's lazy evaluation is TRUE?
A) Both transformations and actions execute immediately
B) Transformations execute immediately but actions are lazy
C) Transformations are lazy and execute only when an action is called
D) All operations are eagerly evaluated in Databricks
Answer: C — Transformations (like filter, select, groupBy) are lazy and only execute when an action (like collect, show, count) is called.
Question 7: A streaming query reads from a Kafka topic and aggregates events by 5-minute windows. Data can arrive up to 10 minutes late. Which feature handles the late data?
A) Checkpoint
B) Trigger interval
C) Watermark
D) Output mode
Answer: C — Watermarks define how long the engine waits for late-arriving data before finalizing windowed aggregates; events arriving later than the watermark may be dropped.
Question 8: What is the difference between MERGE and INSERT OVERWRITE in Delta Lake?
A) MERGE is only for streaming; INSERT OVERWRITE is for batch
B) MERGE can update, insert, and delete; INSERT OVERWRITE replaces matching partition data
C) INSERT OVERWRITE is ACID compliant; MERGE is not
D) There is no difference; they are aliases
Answer: B — MERGE is a sophisticated upsert that can handle updates, inserts, and deletes based on conditions. INSERT OVERWRITE replaces data in matching partitions.
Question 9: In the Medallion Architecture, which layer contains business-level aggregations optimized for reporting?
A) Bronze
B) Silver
C) Gold
D) Platinum
Answer: C — The Gold layer contains business-level aggregations ready for consumption by BI tools and analysts.
Question 10: Which command shows the transaction history of a Delta table?
A) SHOW HISTORY table_name
B) DESCRIBE HISTORY table_name
C) SELECT * FROM table_name VERSION AS OF 0
D) SHOW TRANSACTIONS table_name
Answer: B — DESCRIBE HISTORY table_name shows the full transaction history including operation type, timestamp, user, and version.
Question 11: A streaming query uses .trigger(once=True). What does this do?
A) Processes data once per second
B) Processes all available data in one micro-batch, then stops
C) Runs continuously with a 1-second interval
D) Processes only the first record from the source
Answer: B — trigger(once=True) processes all available data in a single batch and terminates, combining the benefits of batch and streaming.
Question 12: Which Unity Catalog privilege allows a user to read files from a Volume?
A) SELECT
B) READ FILES
C) USE VOLUME
D) USAGE
Answer: B — READ FILES is the specific privilege for reading files from Unity Catalog Volumes.
Question 13: What is the default behavior when writing a DataFrame to a Delta table without specifying a write mode?
A) Append to existing data
B) Overwrite existing data
C) Throw an error if data already exists
D) Merge with existing data
Answer: C — The default write mode is error (or errorifexists), which throws an error if the target already contains data.
Question 14: A data engineer wants to capture row-level changes to a Delta table for downstream CDC processing. Which feature should they enable?
A) Delta Time Travel
B) Change Data Feed (CDF)
C) Auto Optimize
D) Incremental VACUUM
Answer: B — Change Data Feed (CDF) captures row-level changes (insert, update_preimage, update_postimage, delete) for downstream consumption.
Question 15: In Structured Streaming, which output mode should be used for writing aggregated results (no watermark)?
A) append
B) update
C) complete
D) overwrite
Answer: C — For aggregations without watermarks, you must use complete mode, which writes the entire result table each trigger.
12.4 Common Exam Pitfalls
VACUUM and Time Travel: Remember, VACUUM with a short retention period disables time travel to those removed versions. The default is 7 days.
Schema Evolution vs Enforcement: By default, Delta enforces the schema; `mergeSchema=true` is needed for schema evolution.
Streaming Output Modes: Aggregations without watermarks need `complete` mode; simple append-only streams use `append` mode.
OPTIMIZE vs VACUUM: OPTIMIZE compacts files; VACUUM removes unreferenced files. Different purposes!
DLT Expectations: Remember the three types and their behaviors (warn, drop, fail).
Auto Loader vs COPY INTO: Auto Loader handles billions of files with streaming; COPY INTO is simpler SQL for smaller datasets.
Managed vs External Tables: Dropping a managed table deletes both metadata AND data. Dropping an external table only removes metadata.
Checkpoint Location: Each streaming query needs its own unique checkpoint location.
broadcast() hint: Use for tables smaller than 10MB to avoid shuffle joins.
Unity Catalog three-level namespace: `catalog.schema.table` — don't confuse with the two-level `database.table`.
12.5 Final Study Checklist
Before taking the exam, ensure you can:
Delta Lake:
- [ ] Explain how the transaction log works
- [ ] Perform MERGE, UPDATE, DELETE operations
- [ ] Use Time Travel with VERSION AS OF and TIMESTAMP AS OF
- [ ] Run OPTIMIZE with ZORDER
- [ ] Configure and run VACUUM
- [ ] Enable and use Change Data Feed
- [ ] Apply schema evolution with mergeSchema
Spark:
- [ ] Create and manipulate DataFrames
- [ ] Write complex SQL with CTEs, window functions
- [ ] Handle complex types (arrays, maps, structs)
- [ ] Optimize joins using broadcast
- [ ] Configure partitioning for performance
- [ ] Read from and write to various file formats
Streaming:
- [ ] Set up Auto Loader for file ingestion
- [ ] Configure output modes correctly
- [ ] Implement checkpointing
- [ ] Apply watermarks for late data
- [ ] Use COPY INTO for incremental loading
DLT:
- [ ] Create streaming tables and materialized views
- [ ] Apply expectations for data quality
- [ ] Implement CDC with APPLY CHANGES INTO
- [ ] Configure pipeline settings
Workflows:
- [ ] Create multi-task jobs with dependencies
- [ ] Configure job clusters vs all-purpose clusters
- [ ] Use dbutils for file operations and secrets
Unity Catalog:
- [ ] Navigate the three-level namespace
- [ ] Grant and revoke privileges at different levels
- [ ] Create and use Volumes
- [ ] Understand data lineage
Conclusion
The Databricks Data Engineer Associate exam is comprehensive, covering everything from low-level Spark mechanics to high-level platform features. The key to success is:
- Hands-on practice: Build real pipelines using the Databricks Community Edition or a trial account
- Understand the WHY: Don't just memorize commands — understand when and why to use each feature
- Focus on Delta Lake: A large portion of the exam centers on Delta Lake features
- Practice with DLT: Delta Live Tables is increasingly important
- Review the official documentation: Databricks documentation is excellent and aligns closely with exam content
The Lakehouse architecture represents the future of data engineering, combining the best of data lakes and data warehouses. Mastering these concepts not only prepares you for the exam but equips you with skills that are highly valuable in modern data engineering roles.
Good luck with your exam! 🚀
This guide was written based on the Databricks Certified Data Engineer Associate exam guide and covers all major topics. Always refer to the official Databricks documentation for the most up-to-date information, as the platform evolves rapidly.