Data Tech Bridge

The Ultimate Guide to Databricks Data Engineer Associate Exam: Everything You Need to Know

Table of Contents

  1. Introduction to Databricks and the Lakehouse Platform
  2. Apache Spark Fundamentals
  3. Delta Lake Deep Dive
  4. ELT with Apache Spark and Delta Lake
  5. Incremental Data Processing
  6. Production Pipelines with Delta Live Tables (DLT)
  7. Databricks Workflows and Job Orchestration
  8. Data Governance with Unity Catalog
  9. Databricks SQL
  10. Security and Access Control
  11. Performance Optimization
  12. Exam Tips and Practice Questions

Chapter 1: Introduction to Databricks and the Lakehouse Platform

1.1 What is Databricks?

Databricks is a unified analytics platform built on top of Apache Spark that combines data engineering, data science, machine learning, and business intelligence into one cohesive ecosystem. Founded in 2013 by the original creators of Apache Spark, Databricks has grown into an industry-leading platform that runs on all major cloud providers — AWS, Azure, and Google Cloud Platform.

At its core, Databricks provides:

  • Collaborative notebooks for data exploration and development
  • Managed Apache Spark clusters for distributed computing
  • Delta Lake as the underlying storage layer
  • MLflow for machine learning lifecycle management
  • Databricks SQL for business intelligence and analytics
  • Delta Live Tables for declarative ETL pipelines
  • Unity Catalog for unified data governance

1.2 The Data Lakehouse Architecture

Before Databricks introduced the Lakehouse concept, organizations were forced to choose between two suboptimal architectures:

Traditional Data Warehouse

  • Structured data only
  • High cost for storage
  • Limited scalability
  • Excellent ACID compliance and performance
  • Works great for SQL workloads but struggles with unstructured data

Traditional Data Lake

  • Supports all data types (structured, semi-structured, unstructured)
  • Low-cost storage (typically object storage like S3, ADLS, GCS)
  • Highly scalable
  • Poor ACID compliance
  • No support for transactions
  • Data quality issues
  • Performance challenges

The Lakehouse

The Lakehouse paradigm brings the best of both worlds by combining:

  • Low-cost cloud storage from data lakes
  • ACID transactions from data warehouses
  • Schema enforcement and schema evolution
  • Support for BI tools directly on the lake
  • Streaming and batch processing in one platform
  • Openness — data stored in open formats such as Parquet (with the Delta transaction log kept as JSON)

The Lakehouse is implemented through Delta Lake, which sits on top of cloud object storage and provides the reliability and performance features typically found only in data warehouses.

1.3 Databricks Architecture Components

Control Plane

The control plane is managed by Databricks and includes:

  • Databricks web application (UI)
  • Cluster manager — manages the lifecycle of compute clusters
  • Job scheduler — orchestrates workflow execution
  • Notebook server — manages collaborative development
  • Metadata store — stores table definitions, ACLs, and configuration

Data Plane

The data plane runs within the customer's cloud account and includes:

  • Cluster compute resources (EC2 on AWS, VMs on Azure/GCP)
  • Cloud object storage (S3, ADLS Gen2, GCS)
  • Network resources (VPC, subnets, security groups)

This separation ensures that your data never leaves your cloud account, providing security and compliance guarantees.

1.4 Databricks Workspace

The Databricks Workspace is the collaborative environment where users interact with the platform. Key components include:

Notebooks
Notebooks are the primary development interface in Databricks. They support multiple languages:

  • Python (%python)
  • Scala (%scala)
  • SQL (%sql)
  • R (%r)
  • Markdown (%md)
  • Shell commands (%sh)
  • File system operations (%fs)

You can switch languages within a single notebook using magic commands, making it incredibly flexible.

Repos
Databricks Repos provides Git integration, allowing teams to:

  • Connect to GitHub, GitLab, Azure DevOps, or Bitbucket
  • Version control notebooks and code
  • Implement CI/CD pipelines
  • Collaborate across teams

Clusters
Clusters are the compute resources that execute your code. There are two primary types:

  1. All-Purpose Clusters: Used for interactive development and collaboration. They can be shared across multiple users and notebooks. These are more expensive but provide maximum flexibility.

  2. Job Clusters: Ephemeral clusters created specifically for a job and terminated when the job completes. These are more cost-effective for production workloads.

Cluster Modes:

  • Standard Mode: Single-user or shared cluster with full Spark capabilities
  • High Concurrency Mode: Optimized for concurrent usage with fine-grained resource sharing
  • Single Node Mode: Driver-only cluster for lightweight workloads and local Spark

Cluster Configuration Options (a creation sketch using the Databricks SDK follows this list):

  • Databricks Runtime (DBR) version
  • Node types and sizes
  • Autoscaling configuration
  • Auto-termination settings
  • Spot/Preemptible instance usage
  • Custom libraries and init scripts
  • Environment variables
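
As a rough illustration of these options, here is a minimal sketch of creating an all-purpose cluster with the Databricks Python SDK; the cluster name, node type, and autoscaling bounds are placeholder values, and the runtime version should be whatever your workspace currently offers.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import compute

w = WorkspaceClient()  # resolves credentials from the environment or ~/.databrickscfg

# Placeholder values for illustration only
created = w.clusters.create(
    cluster_name="de-associate-sandbox",
    spark_version="13.3.x-scala2.12",                            # DBR version
    node_type_id="i3.xlarge",                                    # node type and size
    autoscale=compute.AutoScale(min_workers=1, max_workers=4),   # autoscaling bounds
    autotermination_minutes=30,                                  # auto-termination
)
# clusters.create returns a long-running operation handle;
# created.result() blocks until the cluster reaches a RUNNING state.

In practice, production compute is usually defined through job clusters and cluster policies rather than ad hoc SDK calls like this.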

Chapter 2: Apache Spark Fundamentals

2.1 Understanding Apache Spark

Apache Spark is the distributed computing engine that powers Databricks. Understanding Spark's core concepts is fundamental to the Data Engineer Associate exam.

Spark Architecture

Driver Node
The driver is the brain of a Spark application. It:

  • Runs the main() function
  • Creates the SparkContext/SparkSession
  • Converts user code into execution tasks
  • Schedules jobs and stages
  • Communicates with the cluster manager
  • Keeps track of metadata about distributed data

Executor Nodes
Executors are the worker processes that run on cluster nodes. They:

  • Execute the tasks assigned by the driver
  • Store data in memory or disk (RDD/DataFrame partitions)
  • Return results back to the driver
  • Run one task per core at a time (each executor has multiple cores, so it runs several tasks in parallel)

Cluster Manager
In Databricks, the cluster manager is handled automatically. Spark supports:

  • Standalone
  • YARN (Hadoop)
  • Mesos
  • Kubernetes
  • Databricks (proprietary)

Spark Execution Model

When you submit a Spark job, it goes through the following stages:

  1. Job: Triggered by an action (collect(), show(), write(), etc.)
  2. Stage: A set of tasks that can be executed in parallel without data shuffling. Stage boundaries occur at shuffle operations.
  3. Task: The smallest unit of work, operating on a single partition of data (the sketch below the diagram shows how a shuffle creates a stage boundary)

Application
└── Job (triggered by action)
    └── Stage (divided by shuffles)
        └── Task (one per partition)
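
To make this concrete, here is a small sketch (column names are arbitrary) showing lazy transformations, an action triggering a job, and the shuffle introduced by groupBy that splits the job into two stages:

from pyspark.sql import functions as F

# Transformations only build up a logical plan; nothing runs yet
df = spark.range(1_000_000).withColumn("bucket", F.col("id") % 10)
grouped = df.groupBy("bucket").count()   # groupBy requires a shuffle

# The action triggers a job: one task per partition, and the shuffle
# creates a stage boundary between the map side and the reduce side
grouped.show()

# Inspect the physical plan; the Exchange operator marks the shuffle
grouped.explain()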

2.2 SparkSession

The SparkSession is the entry point for all Spark functionality in modern Spark (2.0+). In Databricks, it's automatically available as spark.

# In Databricks, SparkSession is pre-created as 'spark'
# But you can access it like this:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApplication") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

# Check Spark version
print(spark.version)

# Access SparkContext
sc = spark.sparkContext
print(sc.appName)

2.3 DataFrames and the DataFrame API

DataFrames are the primary data abstraction in modern Spark. A DataFrame is a distributed collection of data organized into named columns, conceptually similar to a table in a relational database.

Creating DataFrames

# From a Python list
data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
columns = ["name", "age"]
df = spark.createDataFrame(data, columns)

# From a CSV file
df_csv = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/path/to/file.csv")

# From a JSON file
df_json = spark.read.json("/path/to/file.json")

# From a Parquet file
df_parquet = spark.read.parquet("/path/to/file.parquet")

# From a Delta table
df_delta = spark.read.format("delta").load("/path/to/delta/table")

# Or using SQL
df_sql = spark.read.table("database.tablename")

# From a JDBC source
df_jdbc = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host:5432/database") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()

DataFrame Transformations

Transformations are lazy — they define a computation but don't execute it until an action is called.

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Select specific columns
df.select("name", "age")
df.select(F.col("name"), F.col("age"))

# Filter/Where
df.filter(F.col("age") > 25)
df.where("age > 25")

# Add new columns
df.withColumn("birth_year", 2024 - F.col("age"))

# Rename columns
df.withColumnRenamed("name", "full_name")

# Drop columns
df.drop("unnecessary_column")

# Sort/OrderBy
df.sort("age")
df.orderBy(F.col("age").desc())

# Distinct values
df.distinct()
df.dropDuplicates(["name"])

# Limit rows
df.limit(100)

# Group By and Aggregations
df.groupBy("department") \
  .agg(
      F.count("*").alias("employee_count"),
      F.avg("salary").alias("avg_salary"),
      F.max("salary").alias("max_salary"),
      F.min("salary").alias("min_salary"),
      F.sum("salary").alias("total_salary")
  )

# Joins
df1.join(df2, df1["id"] == df2["id"], "inner")
df1.join(df2, df1["id"] == df2["id"], "left")
df1.join(df2, df1["id"] == df2["id"], "right")
df1.join(df2, df1["id"] == df2["id"], "full")
df1.join(df2, df1["id"] == df2["id"], "left_semi")
df1.join(df2, df1["id"] == df2["id"], "left_anti")
df1.join(df2, df1["id"] == df2["id"], "cross")

# Union
df1.union(df2)           # Combines by position
df1.unionByName(df2)     # Combines by column name

DataFrame Actions

Actions trigger computation and return results.

# Collect all rows to driver (careful with large datasets!)
rows = df.collect()

# Show first N rows
df.show(20)
df.show(20, truncate=False)

# Count rows
count = df.count()

# Get first row
first_row = df.first()
first_5 = df.take(5)

# Describe statistics
df.describe().show()
df.summary().show()

# Write to storage
df.write.parquet("/path/to/output")
df.write.format("delta").save("/path/to/delta/table")
df.write.saveAsTable("database.tablename")

2.4 Spark SQL

Spark SQL allows you to run SQL queries against DataFrames and Hive tables.

# Register DataFrame as temporary view
df.createOrReplaceTempView("employees")

# Run SQL query
result = spark.sql("""
    SELECT department, 
           COUNT(*) as emp_count,
           AVG(salary) as avg_salary
    FROM employees
    WHERE salary > 50000
    GROUP BY department
    ORDER BY avg_salary DESC
""")

# Create global temporary view (accessible across sessions)
df.createOrReplaceGlobalTempView("global_employees")
spark.sql("SELECT * FROM global_temp.global_employees")

2.5 Data Types and Schema

Understanding Spark data types is critical for data engineering.

from pyspark.sql.types import *

# Define explicit schema
schema = StructType([
    StructField("id", IntegerType(), nullable=False),
    StructField("name", StringType(), nullable=True),
    StructField("salary", DoubleType(), nullable=True),
    StructField("hire_date", DateType(), nullable=True),
    StructField("is_active", BooleanType(), nullable=True),
    StructField("address", StructType([
        StructField("street", StringType(), nullable=True),
        StructField("city", StringType(), nullable=True),
        StructField("zip", StringType(), nullable=True)
    ]), nullable=True),
    StructField("skills", ArrayType(StringType()), nullable=True),
    StructField("metadata", MapType(StringType(), StringType()), nullable=True)
])

# Create DataFrame with explicit schema
df = spark.createDataFrame(data, schema)

# Check schema
df.printSchema()
df.schema

# Cast column types
df.withColumn("salary", F.col("salary").cast(LongType()))
df.withColumn("hire_date", F.col("hire_date").cast("date"))

2.6 Working with Complex Types

# Working with Arrays
df = spark.createDataFrame([
    (1, ["Python", "SQL", "Spark"]),
    (2, ["Java", "Scala"])
], ["id", "skills"])

# Explode array into rows
df.select("id", F.explode("skills").alias("skill"))

# Array functions
df.select(
    "id",
    F.size("skills").alias("skill_count"),
    F.array_contains("skills", "Python").alias("knows_python"),
    F.array_distinct("skills"),
    F.sort_array("skills")
)

# Working with Structs (use Row so the struct fields get names)
from pyspark.sql import Row

df = spark.createDataFrame([
    (1, Row(first_name="John", last_name="Doe", age=30)),
], ["id", "person"])

df.select("id", "person.first_name", "person.last_name")

# Working with Maps
df = spark.createDataFrame([
    (1, {"key1": "value1", "key2": "value2"})
], ["id", "metadata"])

df.select(
    "id",
    F.map_keys("metadata").alias("keys"),
    F.map_values("metadata").alias("values"),
    df["metadata"]["key1"].alias("key1_value")
)

2.7 Window Functions

Window functions are essential for analytical computations.

from pyspark.sql.window import Window

# Define window specification
window_spec = Window.partitionBy("department").orderBy("salary")

# Ranking functions
df.withColumn("rank", F.rank().over(window_spec))
df.withColumn("dense_rank", F.dense_rank().over(window_spec))
df.withColumn("row_number", F.row_number().over(window_spec))
df.withColumn("percent_rank", F.percent_rank().over(window_spec))
df.withColumn("ntile", F.ntile(4).over(window_spec))

# Analytic functions
df.withColumn("lag_salary", F.lag("salary", 1).over(window_spec))
df.withColumn("lead_salary", F.lead("salary", 1).over(window_spec))

# Aggregate functions with window
window_agg = Window.partitionBy("department")
df.withColumn("dept_avg_salary", F.avg("salary").over(window_agg))
df.withColumn("dept_total_salary", F.sum("salary").over(window_agg))

# Running totals
window_running = Window.partitionBy("department") \
    .orderBy("hire_date") \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn("running_salary_total", F.sum("salary").over(window_running))

2.8 User-Defined Functions (UDFs)

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType

# Python UDF (not optimized by Catalyst)
def categorize_salary(salary):
    if salary < 50000:
        return "Low"
    elif salary < 100000:
        return "Medium"
    else:
        return "High"

# Register UDF
categorize_udf = udf(categorize_salary, StringType())

# Use UDF
df.withColumn("salary_category", categorize_udf(F.col("salary")))

# Lambda UDF
double_salary = udf(lambda x: x * 2, IntegerType())

# Pandas UDF (vectorized - much faster!)
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf(StringType())
def categorize_salary_pandas(salary: pd.Series) -> pd.Series:
    return salary.apply(lambda x: "Low" if x < 50000 else "Medium" if x < 100000 else "High")

df.withColumn("salary_category", categorize_salary_pandas(F.col("salary")))

Chapter 3: Delta Lake Deep Dive

3.1 What is Delta Lake?

Delta Lake is an open-source storage layer that brings ACID transactions, scalable metadata handling, and unified streaming/batch data processing to data lakes. It was created by Databricks and donated to the Linux Foundation.

Delta Lake stores data as Parquet files along with a transaction log (Delta Log) that tracks all changes to the table.

3.2 Delta Lake Architecture

Delta Log (Transaction Log)

The Delta Log is a directory called _delta_log located at the root of the Delta table. It contains:

  • JSON log files: Each transaction creates a new JSON file (00000000000000000000.json, 00000000000000000001.json, etc.)
  • Checkpoint files: Every 10 transactions, Databricks creates a Parquet checkpoint file that consolidates the log for faster reads

Each log entry can contain the following actions (the sketch after this list shows how to inspect them):

  • Add actions: New files added to the table
  • Remove actions: Files removed from the table
  • Metadata actions: Schema changes, table properties
  • Protocol actions: Delta protocol version upgrades
  • CommitInfo actions: Information about the operation
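
You can see these pieces directly on storage. A minimal sketch, assuming a Delta table at the hypothetical path /path/to/delta/table:

# List the Parquet data files and the transaction log directory
display(dbutils.fs.ls("/path/to/delta/table"))
display(dbutils.fs.ls("/path/to/delta/table/_delta_log"))

# Each commit is a JSON file; reading one shows its add/remove/metaData/commitInfo actions
commit_0 = spark.read.json("/path/to/delta/table/_delta_log/00000000000000000000.json")
commit_0.printSchema()
commit_0.show(truncate=False)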

How Delta Lake Achieves ACID

Atomicity: Either all the changes in a transaction are committed or none are. This is achieved through the atomic JSON write to the transaction log.

Consistency: Schema enforcement ensures that writes conform to the table schema. Constraint checking ensures data integrity rules.

Isolation: Optimistic concurrency control with snapshot isolation. Each transaction reads from a consistent snapshot and conflicts are detected at commit time.

Durability: Once a transaction is committed to the log, it's permanent. The underlying cloud storage provides durability guarantees.

3.3 Creating Delta Tables

# Method 1: Write DataFrame as Delta
df.write.format("delta").save("/path/to/delta/table")

# Method 2: Write with partitioning
df.write \
  .format("delta") \
  .partitionBy("year", "month") \
  .save("/path/to/delta/table")

# Method 3: Save as table (creates metadata in metastore)
df.write.format("delta").saveAsTable("database.table_name")

# Method 4: Create table using SQL
spark.sql("""
    CREATE TABLE IF NOT EXISTS sales (
        id BIGINT,
        customer_id BIGINT,
        product_id BIGINT,
        amount DOUBLE,
        sale_date DATE
    )
    USING DELTA
    PARTITIONED BY (sale_date)
    LOCATION '/path/to/delta/sales'
    COMMENT 'Sales transactions table'
    TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")

# Method 5: CREATE TABLE AS SELECT (CTAS)
spark.sql("""
    CREATE TABLE new_sales
    USING DELTA
    AS SELECT * FROM old_sales WHERE year = 2024
""")

3.4 Reading Delta Tables

# Read Delta table from path
df = spark.read.format("delta").load("/path/to/delta/table")

# Read Delta table from catalog
df = spark.read.table("database.table_name")

# Read a specific version (Time Travel)
df_v2 = spark.read.format("delta") \
    .option("versionAsOf", 2) \
    .load("/path/to/delta/table")

# Read at a specific timestamp
df_ts = spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-01") \
    .load("/path/to/delta/table")

# Using SQL for time travel
spark.sql("""
    SELECT * FROM sales VERSION AS OF 3
""")

spark.sql("""
    SELECT * FROM sales TIMESTAMP AS OF '2024-01-01 00:00:00'
""")

3.5 Updating Delta Tables

INSERT Operations

# Append data (default mode)
new_data.write.mode("append").format("delta").save("/path/to/delta/table")

# Overwrite entire table
new_data.write.mode("overwrite").format("delta").save("/path/to/delta/table")

# Insert using SQL
spark.sql("""
    INSERT INTO sales VALUES (1, 100, 200, 99.99, '2024-01-15')
""")

spark.sql("""
    INSERT INTO sales SELECT * FROM staging_sales
""")

# Insert Overwrite (replaces data matching partition)
spark.sql("""
    INSERT OVERWRITE sales
    SELECT * FROM staging_sales WHERE sale_date = '2024-01-15'
""")

UPDATE Operations

from delta.tables import DeltaTable

# Load the Delta table
delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")

# Update with condition
delta_table.update(
    condition = F.col("customer_id") == 100,
    set = {"amount": F.col("amount") * 1.1}
)

# Update using SQL
spark.sql("""
    UPDATE sales
    SET amount = amount * 1.1
    WHERE customer_id = 100
""")

DELETE Operations

# Delete with condition
delta_table.delete(condition = F.col("sale_date") < "2020-01-01")

# Delete using SQL
spark.sql("""
    DELETE FROM sales
    WHERE sale_date < '2020-01-01'
""")

3.6 MERGE (UPSERT) Operations

MERGE is one of the most powerful features of Delta Lake, enabling complex upsert operations.

from delta.tables import DeltaTable

# Target table
target = DeltaTable.forPath(spark, "/path/to/delta/table")

# Source DataFrame
source = spark.read.table("staging_updates")

# Perform MERGE
target.alias("target") \
    .merge(
        source.alias("source"),
        "target.id = source.id"
    ) \
    .whenMatchedUpdate(set={
        "amount": F.col("source.amount"),
        "updated_at": F.current_timestamp()
    }) \
    .whenNotMatchedInsert(values={
        "id": F.col("source.id"),
        "customer_id": F.col("source.customer_id"),
        "amount": F.col("source.amount"),
        "sale_date": F.col("source.sale_date"),
        "created_at": F.current_timestamp()
    }) \
    .whenNotMatchedBySourceDelete() \
    .execute()
-- MERGE using SQL
MERGE INTO sales AS target
USING staging_updates AS source
ON target.id = source.id
WHEN MATCHED AND source.action = 'UPDATE' THEN
    UPDATE SET target.amount = source.amount,
               target.updated_at = current_timestamp()
WHEN MATCHED AND source.action = 'DELETE' THEN
    DELETE
WHEN NOT MATCHED THEN
    INSERT (id, customer_id, amount, sale_date)
    VALUES (source.id, source.customer_id, source.amount, source.sale_date)

3.7 Delta Lake Schema Management

Schema Enforcement

Delta Lake enforces schema by default — if you try to write data with an incompatible schema, it will throw an error.

# This will FAIL if 'new_column' doesn't exist in the schema
new_data_with_extra_column.write \
    .mode("append") \
    .format("delta") \
    .save("/path/to/delta/table")

# Error: A schema mismatch detected when writing to the Delta table

Schema Evolution

You can enable schema evolution to automatically add new columns.

# Enable schema evolution (mergeSchema)
new_data_with_extra_column.write \
    .mode("append") \
    .option("mergeSchema", "true") \
    .format("delta") \
    .save("/path/to/delta/table")

# Overwrite and update schema
new_data.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .format("delta") \
    .save("/path/to/delta/table")

ALTER TABLE Operations

# Add column
spark.sql("ALTER TABLE sales ADD COLUMN discount DOUBLE")

# Rename column (requires column mapping)
spark.sql("ALTER TABLE sales RENAME COLUMN amount TO total_amount")

# Drop column (requires column mapping)
spark.sql("ALTER TABLE sales DROP COLUMN old_column")

# Change column type
spark.sql("ALTER TABLE sales ALTER COLUMN amount TYPE DECIMAL(10,2)")

# Add table comment
spark.sql("COMMENT ON TABLE sales IS 'Main sales transactions'")

# Add column comment
spark.sql("ALTER TABLE sales ALTER COLUMN amount COMMENT 'Transaction amount in USD'")

3.8 Delta Lake Time Travel

Time Travel is one of Delta Lake's most valuable features, allowing you to query historical versions of your data.

# Query version 0 (initial version)
spark.read.format("delta") \
    .option("versionAsOf", 0) \
    .load("/path/to/delta/table")

# Query at timestamp
spark.read.format("delta") \
    .option("timestampAsOf", "2024-01-01T00:00:00.000Z") \
    .load("/path/to/delta/table")

# View table history
delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")
delta_table.history().show()
delta_table.history(10).show()  # Last 10 operations

# SQL equivalent
spark.sql("DESCRIBE HISTORY sales").show()
spark.sql("DESCRIBE HISTORY sales LIMIT 5").show()

Use Cases for Time Travel:

  • Auditing: Track changes to data over time
  • Rollback: Restore data to a previous state (see the RESTORE sketch after this list)
  • Debugging: Reproduce issues by querying historical data
  • Regulatory compliance: Access point-in-time data for compliance
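
For the rollback use case, Delta Lake provides RESTORE, which rewrites the table back to an earlier state (the version number, timestamp, and path below are placeholders):

from delta.tables import DeltaTable

# Roll the table back to a previous version using SQL
spark.sql("RESTORE TABLE sales TO VERSION AS OF 2")
spark.sql("RESTORE TABLE sales TO TIMESTAMP AS OF '2024-01-01 00:00:00'")

# Python API equivalent
delta_table = DeltaTable.forPath(spark, "/path/to/delta/table")
delta_table.restoreToVersion(2)

# RESTORE is itself recorded as a new commit, so it appears in DESCRIBE HISTORY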

3.9 OPTIMIZE and VACUUM

OPTIMIZE

Over time, Delta tables accumulate many small files due to frequent small writes. OPTIMIZE compacts these small files into larger, more efficient files.

# Optimize entire table
spark.sql("OPTIMIZE sales")

# Optimize with Z-ORDER (co-locate related data)
spark.sql("OPTIMIZE sales ZORDER BY (customer_id, sale_date)")

# Optimize specific partition
spark.sql("OPTIMIZE sales WHERE sale_date = '2024-01-15'")

Z-Ordering is a technique that co-locates related information in the same set of files. This improves query performance by reducing the amount of data that needs to be read. It's particularly effective for high-cardinality columns that are frequently used in filters.

VACUUM

VACUUM removes files that are no longer referenced by the Delta log and are older than the retention period.

# Vacuum with default retention (7 days)
spark.sql("VACUUM sales")

# Vacuum with custom retention (in hours)
spark.sql("VACUUM sales RETAIN 168 HOURS")  # 7 days

# DRY RUN - shows files to be deleted without actually deleting
spark.sql("VACUUM sales DRY RUN")

# WARNING: This disables time travel beyond the retention period
# To vacuum with less than 7 days (use carefully!):
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
spark.sql("VACUUM sales RETAIN 0 HOURS")
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "true")

Important: You cannot time travel to versions that have been vacuumed. Default retention is 7 days (168 hours).

3.10 Delta Table Properties and Features

# View table properties
spark.sql("DESCRIBE EXTENDED sales")
spark.sql("SHOW TBLPROPERTIES sales")

# Set table properties
spark.sql("""
    ALTER TABLE sales 
    SET TBLPROPERTIES (
        'delta.logRetentionDuration' = 'interval 30 days',
        'delta.deletedFileRetentionDuration' = 'interval 7 days',
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")

# Enable column mapping (required for renaming/dropping columns)
spark.sql("""
    ALTER TABLE sales 
    SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')
""")

# Enable Change Data Feed
spark.sql("""
    ALTER TABLE sales 
    SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")

3.11 Change Data Feed (CDF)

Change Data Feed captures row-level changes (inserts, updates, deletes) made to a Delta table.

# Enable CDF on table creation
spark.sql("""
    CREATE TABLE sales (
        id BIGINT,
        amount DOUBLE,
        updated_at TIMESTAMP
    )
    USING DELTA
    TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")

# Read changes from a specific version
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingVersion", 1) \
    .table("sales")

# Read changes from a specific timestamp
changes = spark.read.format("delta") \
    .option("readChangeFeed", "true") \
    .option("startingTimestamp", "2024-01-01") \
    .table("sales")

# CDF adds these special columns:
# _change_type: 'insert', 'update_preimage', 'update_postimage', 'delete'
# _commit_version: The version of the commit
# _commit_timestamp: The timestamp of the commit

changes.filter("_change_type = 'insert'").show()
changes.filter("_change_type IN ('update_preimage', 'update_postimage')").show()

3.12 Constraints

Delta Lake supports column-level and table-level constraints.

# NOT NULL constraint
spark.sql("""
    ALTER TABLE sales 
    ALTER COLUMN id SET NOT NULL
""")

# CHECK constraint
spark.sql("""
    ALTER TABLE sales 
    ADD CONSTRAINT valid_amount CHECK (amount > 0)
""")

# View constraints
spark.sql("DESCRIBE EXTENDED sales")

# Drop constraint
spark.sql("ALTER TABLE sales DROP CONSTRAINT valid_amount")

Chapter 4: ELT with Apache Spark and Delta Lake

4.1 ELT vs ETL

Traditional ETL (Extract, Transform, Load):

  • Data is extracted from source
  • Transformed outside the target system
  • Loaded into the destination

Modern ELT (Extract, Load, Transform):

  • Data is extracted from source
  • Loaded raw into the data lake
  • Transformed within the data platform

In the Databricks Lakehouse, ELT is preferred because:

  • Raw data is preserved for reprocessing
  • Transformations leverage distributed compute power
  • Single platform reduces complexity
  • Cost optimization through separation of storage and compute

4.2 Medallion Architecture

The Medallion Architecture (Bronze, Silver, Gold) is the recommended approach for organizing data in the Lakehouse.

Source Systems → [Bronze] → [Silver] → [Gold] → Analytics/ML

Bronze Layer (Raw)

  • Raw data ingested as-is from source systems
  • No transformations (at most basic type casting)
  • Schema may not be enforced
  • Keeps historical record of all raw data
  • Data is append-only
# Ingesting raw JSON data into Bronze
raw_data = spark.read \
    .format("json") \
    .option("multiLine", "true") \
    .load("/raw/incoming/orders/*.json")

# Add ingestion metadata
bronze_data = raw_data.withColumn("_ingested_at", F.current_timestamp()) \
                      .withColumn("_source_file", F.input_file_name())

bronze_data.write \
    .mode("append") \
    .format("delta") \
    .save("/bronze/orders")

Silver Layer (Cleansed)

  • Data is cleaned, validated, and standardized
  • Joins happen at this layer
  • Schema enforcement applied
  • Still at roughly the same granularity as Bronze
  • Deduplication happens here
# Read from Bronze
bronze_df = spark.read.format("delta").load("/bronze/orders")

# Clean and validate
silver_df = bronze_df \
    .filter(F.col("order_id").isNotNull()) \
    .filter(F.col("amount") > 0) \
    .dropDuplicates(["order_id"]) \
    .withColumn("order_date", F.to_date("order_date", "yyyy-MM-dd")) \
    .withColumn("amount", F.col("amount").cast("decimal(10,2)")) \
    .withColumn("status", F.upper(F.col("status"))) \
    .withColumn("_processed_at", F.current_timestamp())

# Upsert into Silver (handle late-arriving data)
from delta.tables import DeltaTable

if DeltaTable.isDeltaTable(spark, "/silver/orders"):
    silver_table = DeltaTable.forPath(spark, "/silver/orders")
    silver_table.alias("target") \
        .merge(silver_df.alias("source"), "target.order_id = source.order_id") \
        .whenMatchedUpdateAll() \
        .whenNotMatchedInsertAll() \
        .execute()
else:
    silver_df.write.format("delta").save("/silver/orders")

Gold Layer (Business Aggregated)

  • Business-level aggregations
  • Optimized for specific use cases (reporting, ML)
  • Pre-aggregated data for performance
  • Multiple Gold tables may be derived from same Silver data
# Create Gold table: Daily Sales Summary
gold_daily_sales = spark.read.format("delta").load("/silver/orders") \
    .filter("status = 'COMPLETED'") \
    .groupBy(
        F.to_date("order_date").alias("date"),
        "product_category",
        "region"
    ) \
    .agg(
        F.count("*").alias("order_count"),
        F.sum("amount").alias("total_revenue"),
        F.avg("amount").alias("avg_order_value"),
        F.countDistinct("customer_id").alias("unique_customers")
    )

gold_daily_sales.write \
    .mode("overwrite") \
    .format("delta") \
    .partitionBy("date") \
    .save("/gold/daily_sales_summary")

4.3 Common Data Transformations

String Operations

# String functions
df.select(
    F.upper("name"),
    F.lower("name"),
    F.trim("name"),
    F.ltrim("name"),
    F.rtrim("name"),
    F.length("name"),
    F.substring("name", 1, 3),
    F.concat("first_name", F.lit(" "), "last_name"),
    F.concat_ws(" ", "first_name", "last_name"),
    F.split("full_name", " "),
    F.regexp_replace("phone", "[^0-9]", ""),
    F.regexp_extract("email", "(@.*)", 1),
    F.col("name").like("A%"),
    F.lpad("id", 10, "0"),
    F.rpad("name", 20, " ")
)

Date and Time Operations

# Date/Time functions
df.select(
    F.current_date(),
    F.current_timestamp(),
    F.to_date("date_str", "yyyy-MM-dd"),
    F.to_timestamp("ts_str", "yyyy-MM-dd HH:mm:ss"),
    F.date_format("date_col", "MM/dd/yyyy"),
    F.year("date_col"),
    F.month("date_col"),
    F.dayofmonth("date_col"),
    F.dayofweek("date_col"),
    F.dayofyear("date_col"),
    F.hour("timestamp_col"),
    F.minute("timestamp_col"),
    F.second("timestamp_col"),
    F.date_add("date_col", 7),
    F.date_sub("date_col", 7),
    F.datediff("end_date", "start_date"),
    F.months_between("end_date", "start_date"),
    F.add_months("date_col", 3),
    F.last_day("date_col"),
    F.next_day("date_col", "Monday"),
    F.trunc("date_col", "month"),
    F.date_trunc("month", "timestamp_col"),
    F.unix_timestamp("timestamp_col"),
    F.from_unixtime("unix_ts"),
    F.from_utc_timestamp("timestamp_col", "America/New_York")
)

Null Handling

# Null handling
df.select(
    F.isnull("column"),
    F.isnan("column"),
    F.coalesce("col1", "col2", F.lit("default")),
    F.nvl("col1", "col2"),          # Returns col1 if not null, else col2
    F.nvl2("col1", "col2", "col3"), # If col1 not null return col2, else col3
    F.nullif("col1", "col2"),       # Returns null if col1 == col2, else col1
    F.ifnull("col1", "col2"),       # Same as NVL
    F.nanvl("col1", "col2")         # Returns col1 if not NaN, else col2
)
# Note: F.nvl, F.nvl2, F.nullif, and F.ifnull are available as Python functions in Spark 3.5+;
# on older runtimes use F.expr("nvl(col1, col2)") and friends.

# Fill null values
df.fillna(0, subset=["salary"])
df.fillna({"salary": 0, "name": "Unknown"})
df.na.fill(0)
df.na.drop()  # Drop rows with any null
df.na.drop(how="all")  # Drop rows where all values are null
df.na.drop(subset=["critical_column"])

Conditional Logic

# CASE WHEN using when/otherwise
df.withColumn("salary_tier",
    F.when(F.col("salary") < 50000, "Junior")
     .when(F.col("salary") < 100000, "Mid")
     .when(F.col("salary") < 150000, "Senior")
     .otherwise("Executive")
)

# Using expr for complex SQL expressions
df.withColumn("status",
    F.expr("CASE WHEN age < 18 THEN 'minor' WHEN age < 65 THEN 'adult' ELSE 'senior' END")
)

4.4 Reading Data from Various Sources

# CSV
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("delimiter", ",") \
    .option("quote", '"') \
    .option("escape", "\\") \
    .option("nullValue", "NULL") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss") \
    .option("multiLine", "true") \
    .option("encoding", "UTF-8") \
    .load("/path/to/*.csv")

# JSON
df = spark.read \
    .format("json") \
    .option("multiLine", "true") \
    .option("inferSchema", "true") \
    .option("dateFormat", "yyyy-MM-dd") \
    .load("/path/to/*.json")

# Parquet
df = spark.read.parquet("/path/to/parquet/")

# Avro
df = spark.read.format("avro").load("/path/to/avro/")

# ORC
df = spark.read.orc("/path/to/orc/")

# Text files
df = spark.read.text("/path/to/text/")
df = spark.read.format("text").load("/path/to/text/*.txt")

4.5 Writing Data to Various Formats

# Write modes
# "append" - Append to existing data
# "overwrite" - Overwrite all existing data
# "error"/"errorifexists" - Throw error if data exists (default)
# "ignore" - Silently skip write if data exists

# Write as Parquet
df.write \
    .mode("overwrite") \
    .partitionBy("year", "month") \
    .parquet("/path/to/output/")

# Write as CSV
df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/path/to/output.csv")

# Write as Delta
df.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .partitionBy("date") \
    .save("/path/to/delta/table")

# Coalesce before writing (reduce output files)
df.coalesce(1).write.mode("overwrite").csv("/path/to/single/file.csv")

# Repartition for balanced output
df.repartition(10).write.format("delta").save("/path/to/delta/")

Chapter 5: Incremental Data Processing

5.1 What is Incremental Processing?

Incremental processing is the practice of processing only new or changed data, rather than reprocessing the entire dataset each time. This is critical for production data pipelines because:

  • Reduces compute costs
  • Minimizes processing time
  • Enables near-real-time data freshness
  • Scales to large datasets efficiently

5.2 Structured Streaming

Structured Streaming is Spark's streaming engine built on top of the DataFrame/Dataset API. It treats a streaming data source as an unbounded table that grows continuously.

Key Concepts

Micro-batch Processing: Processes data in a series of small batches; with the default trigger, the next micro-batch starts as soon as the previous one finishes and new data is available.

Continuous Processing: For ultra-low latency (millisecond range), though less common.

Trigger Types (see the configuration sketch after this list):

  • processingTime="0 seconds" - Process as fast as possible
  • processingTime="1 minute" - Fixed interval
  • once=True - Process all available data in a single micro-batch, then stop (deprecated in favor of availableNow=True)
  • availableNow=True - Process all available data across micro-batches, then stop
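
A minimal sketch of setting these triggers on a streaming write (paths are placeholders):

# Default: the next micro-batch starts as soon as the previous one finishes
q1 = df.writeStream.format("delta") \
    .option("checkpointLocation", "/checkpoint/q1/") \
    .start("/output/q1/")

# Fixed-interval micro-batches
q2 = df.writeStream.format("delta") \
    .option("checkpointLocation", "/checkpoint/q2/") \
    .trigger(processingTime="1 minute") \
    .start("/output/q2/")

# Process everything currently available, then stop (incremental batch)
q3 = df.writeStream.format("delta") \
    .option("checkpointLocation", "/checkpoint/q3/") \
    .trigger(availableNow=True) \
    .start("/output/q3/")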

Input Sources

# Kafka source
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "topic1,topic2") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

# Parse Kafka value
from pyspark.sql.types import *
schema = StructType([
    StructField("id", IntegerType()),
    StructField("value", StringType())
])

parsed_df = kafka_df.select(
    F.from_json(F.col("value").cast("string"), schema).alias("data")
).select("data.*")

# Auto Loader (cloud file source)
autoloader_df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.schemaLocation", "/path/to/schema/") \
    .load("/path/to/landing/zone/")

# Delta table as stream source
delta_stream = spark.readStream \
    .format("delta") \
    .option("maxFilesPerTrigger", 100) \
    .table("bronze.orders")

# Rate source (for testing)
test_stream = spark.readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .load()

Output Sinks and Output Modes

Output Modes:

  • append: Only new rows added since the last trigger are written (the default output mode)
  • complete: The entire result table is written (required for aggregations without watermark)
  • update: Only rows that were updated since last trigger are written
# Write to Delta (most common)
query = df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/path/to/checkpoint/") \
    .trigger(processingTime="1 minute") \
    .start("/path/to/output/delta/")

# Write to Delta table by name
query = df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "/path/to/checkpoint/") \
    .toTable("silver.orders")

# Write to Kafka
query = df.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("topic", "output-topic") \
    .option("checkpointLocation", "/path/to/checkpoint/") \
    .start()

# Write to memory (for debugging/testing)
query = df.writeStream \
    .format("memory") \
    .queryName("temp_table") \
    .outputMode("complete") \
    .start()

spark.sql("SELECT * FROM temp_table").show()

# Console (for debugging)
query = df.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

# Manage streaming query
query.status     # Current status
query.isActive   # True if running
query.stop()     # Stop the query
query.awaitTermination()  # Block until complete
query.awaitTermination(timeout=60)  # Block with timeout

Checkpointing

Checkpointing is critical for fault tolerance in streaming. It stores:

  • The current offset/position in the source
  • The state of aggregations
  • Metadata about the query
# Always specify a unique checkpoint location per query
query = df.writeStream \
    .option("checkpointLocation", "/checkpoint/orders_to_silver/") \
    .format("delta") \
    .start("/silver/orders/")

5.3 Auto Loader

Auto Loader is Databricks' optimized solution for incrementally loading files from cloud storage. It's the recommended approach for Bronze layer ingestion.

# Basic Auto Loader setup
df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "json") \
    .option("cloudFiles.schemaLocation", "/schema/orders/") \
    .load("/landing/orders/")

# Write to Bronze Delta table
query = df.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/checkpoint/orders_landing/") \
    .option("mergeSchema", "true") \
    .trigger(availableNow=True) \
    .toTable("bronze.orders")

query.awaitTermination()

Auto Loader Benefits:

  • Automatic schema detection and evolution: Infers schema and evolves it as new files arrive
  • File discovery: Uses cloud notification (preferred) or directory listing
  • Exactly-once processing: Uses checkpointing to ensure no file is processed twice
  • Handles large number of files: More efficient than manual file tracking

Configuration Options:

df = spark.readStream \
    .format("cloudFiles") \
    .option("cloudFiles.format", "csv") \
    .option("cloudFiles.schemaLocation", "/schema/events/") \
    .option("cloudFiles.inferColumnTypes", "true") \
    .option("cloudFiles.backfillInterval", "1 day") \
    .option("cloudFiles.maxBytesPerTrigger", "10g") \
    .option("cloudFiles.maxFilesPerTrigger", 1000) \
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns") \
    .option("header", "true") \
    .load("/landing/events/")

5.4 Watermarking in Streaming

Watermarks handle late-arriving data in streaming aggregations.

# Define watermark (tolerate up to 10 minutes of late data)
windowed_counts = df \
    .withWatermark("event_time", "10 minutes") \
    .groupBy(
        F.window("event_time", "5 minutes"),
        "user_id"
    ) \
    .count()

# Sliding window
sliding_window = df \
    .withWatermark("event_time", "1 hour") \
    .groupBy(
        F.window("event_time", "1 hour", "15 minutes"),  # 1-hour window, 15-min slide
        "product_id"
    ) \
    .agg(F.sum("amount").alias("total"))

5.5 Incremental Processing with COPY INTO

COPY INTO is a SQL command that incrementally loads files into a Delta table. Files are only loaded once (it tracks what's already been loaded).

-- Basic COPY INTO
COPY INTO sales
FROM '/path/to/source/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

-- COPY INTO with JSON
COPY INTO events
FROM '/landing/events/'
FILEFORMAT = JSON
FORMAT_OPTIONS ('multiLine' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true')

-- COPY INTO with explicit schema
COPY INTO sales
FROM (
    SELECT 
        cast(id as BIGINT) as id,
        cast(amount as DECIMAL(10,2)) as amount,
        to_date(sale_date, 'yyyy-MM-dd') as sale_date
    FROM read_files('/landing/sales/', format => 'csv', header => true)
)

COPY INTO vs Auto Loader:

| Feature | COPY INTO | Auto Loader |
|---------|-----------|-------------|
| Interface | SQL | Python/Scala |
| Scalability | Thousands of files | Millions of files or more |
| File tracking | Within Delta log | Checkpoint files |
| Schema evolution | Manual | Automatic |
| Streaming support | No | Yes |
| Use case | Small-medium datasets | Large-scale ingestion |


Chapter 6: Production Pipelines with Delta Live Tables (DLT)

6.1 Introduction to Delta Live Tables

Delta Live Tables (DLT) is a declarative framework for building reliable, maintainable, and testable data pipelines on Databricks. Instead of writing imperative code to manage pipeline execution, you declare the data transformations and DLT handles:

  • Pipeline orchestration and execution ordering
  • Automatic error handling and retries
  • Data quality enforcement
  • Monitoring and observability
  • Infrastructure management

6.2 DLT Pipeline Components

Datasets

DLT has three types of datasets:

  1. Streaming Tables: For append-only streaming sources
  2. Materialized Views: For aggregations and transformations that need to be persisted
  3. Views: For temporary transformations (not stored)

Pipeline Types

  • Triggered: Run once and stop (like a scheduled batch job)
  • Continuous: Run continuously to minimize latency

6.3 Defining DLT Pipelines

import dlt
from pyspark.sql import functions as F
from pyspark.sql.types import *

# === BRONZE LAYER ===

# Create a streaming table from Auto Loader
@dlt.table(
    name="raw_orders",
    comment="Raw orders data ingested from landing zone",
    table_properties={
        "quality": "bronze",
        "pipelines.reset.allowed": "true"
    }
)
def raw_orders():
    return (
        spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "json")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("cloudFiles.schemaLocation", 
                    "/pipelines/schemas/raw_orders/")
            .load("/landing/orders/")
    )

# === SILVER LAYER ===

# Create a streaming table with expectations (data quality)
@dlt.table(
    name="clean_orders",
    comment="Cleaned and validated orders",
    table_properties={"quality": "silver"}
)
@dlt.expect("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("positive_amount", "amount > 0")
@dlt.expect_or_fail("valid_status", "status IN ('PENDING', 'PROCESSING', 'COMPLETED', 'CANCELLED')")
def clean_orders():
    return (
        dlt.read_stream("raw_orders")
            .filter(F.col("order_id").isNotNull())
            .withColumn("order_date", F.to_date("order_date"))
            .withColumn("amount", F.col("amount").cast("decimal(10,2)"))
            .withColumn("status", F.upper("status"))
            .withColumn("_processed_at", F.current_timestamp())
    )

# === GOLD LAYER ===

# Create a Materialized View for aggregations
@dlt.table(
    name="daily_sales_summary",
    comment="Daily sales aggregated by product category",
    table_properties={"quality": "gold"}
)
def daily_sales_summary():
    return (
        dlt.read("clean_orders")
            .filter("status = 'COMPLETED'")
            .groupBy(
                F.to_date("order_date").alias("date"),
                "product_category"
            )
            .agg(
                F.count("*").alias("order_count"),
                F.sum("amount").alias("total_revenue"),
                F.avg("amount").alias("avg_order_value")
            )
    )

6.4 DLT Expectations (Data Quality)

Expectations define data quality rules in DLT pipelines.

# @dlt.expect: Tracks violations but doesn't drop or fail
@dlt.expect("valid_email", "email LIKE '%@%.%'")
def my_table():
    ...

# @dlt.expect_or_drop: Drops rows that violate the constraint
@dlt.expect_or_drop("non_null_id", "id IS NOT NULL")
def my_table():
    ...

# @dlt.expect_or_fail: Fails the pipeline if any row violates
@dlt.expect_or_fail("critical_check", "amount >= 0")
def my_table():
    ...

# Multiple expectations
@dlt.expect("valid_order_id", "order_id IS NOT NULL")
@dlt.expect_or_drop("positive_amount", "amount > 0")
@dlt.expect_or_fail("valid_customer", "customer_id IS NOT NULL")
def my_table():
    ...

# expect_all: Apply multiple constraints at once
@dlt.expect_all({
    "valid_id": "id IS NOT NULL",
    "valid_amount": "amount > 0",
    "valid_date": "order_date >= '2020-01-01'"
})
def my_table():
    ...

# expect_all_or_drop
@dlt.expect_all_or_drop({
    "valid_id": "id IS NOT NULL",
    "valid_amount": "amount > 0"
})
def my_table():
    ...

# expect_all_or_fail
@dlt.expect_all_or_fail({
    "critical_id": "id IS NOT NULL",
    "critical_amount": "amount > 0"
})
def my_table():
    ...

6.5 DLT with SQL

DLT also supports SQL syntax:

-- Bronze: Streaming table from Auto Loader
CREATE OR REFRESH STREAMING TABLE raw_customers
COMMENT "Raw customer data from source systems"
AS SELECT * FROM cloud_files("/landing/customers/", "json",
    map("cloudFiles.inferColumnTypes", "true"))

-- Silver: Streaming table with expectations
CREATE OR REFRESH STREAMING TABLE clean_customers (
    CONSTRAINT valid_customer_id EXPECT (customer_id IS NOT NULL) ON VIOLATION FAIL UPDATE,
    CONSTRAINT valid_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW,
    CONSTRAINT valid_age EXPECT (age BETWEEN 18 AND 120) -- no ON VIOLATION clause: rows are kept and violations are recorded in metrics
)
COMMENT "Cleaned customer data"
AS
SELECT 
    customer_id,
    upper(trim(first_name)) as first_name,
    upper(trim(last_name)) as last_name,
    lower(email) as email,
    cast(age as INT) as age,
    to_date(signup_date, 'yyyy-MM-dd') as signup_date,
    current_timestamp() as _processed_at
FROM STREAM(LIVE.raw_customers)
WHERE customer_id IS NOT NULL

-- Gold: Materialized view
CREATE OR REFRESH MATERIALIZED VIEW customer_summary
COMMENT "Customer segment summary"
AS
SELECT 
    CASE 
        WHEN age < 25 THEN 'Gen Z'
        WHEN age < 40 THEN 'Millennial'
        WHEN age < 55 THEN 'Gen X'
        ELSE 'Boomer'
    END as age_segment,
    count(*) as customer_count,
    avg(age) as avg_age
FROM LIVE.clean_customers
GROUP BY 1

6.6 DLT Pipeline Configuration

{
  "name": "Orders Pipeline",
  "storage": "/pipelines/orders/",
  "target": "orders_db",
  "libraries": [
    {"notebook": {"path": "/pipelines/bronze/raw_orders"}},
    {"notebook": {"path": "/pipelines/silver/clean_orders"}},
    {"notebook": {"path": "/pipelines/gold/order_summaries"}}
  ],
  "clusters": [
    {
      "label": "default",
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5
      }
    }
  ],
  "development": false,
  "photon": true,
  "channel": "CURRENT",
  "edition": "ADVANCED",
  "continuous": false
}

6.7 DLT APPLY CHANGES (CDC)

DLT supports Change Data Capture (CDC) through the APPLY CHANGES INTO syntax.

import dlt
from pyspark.sql import functions as F

# Source CDC stream
@dlt.view
def customers_cdc():
    return spark.readStream \
        .format("cloudFiles") \
        .option("cloudFiles.format", "json") \
        .load("/cdc/customers/")

# Apply changes to target table
dlt.create_streaming_table("customers")

dlt.apply_changes(
    target = "customers",
    source = "customers_cdc",
    keys = ["customer_id"],
    sequence_by = F.col("updated_at"),  # Sequence field to handle ordering
    apply_as_deletes = F.expr("operation = 'DELETE'"),
    apply_as_truncates = F.expr("operation = 'TRUNCATE'"),
    except_column_list = ["operation", "updated_at"],  # Columns to exclude
    stored_as_scd_type = 1  # SCD Type 1 (overwrite) or 2 (history)
)

SCD Type 2 with APPLY CHANGES:

dlt.apply_changes(
    target = "customers_history",
    source = "customers_cdc",
    keys = ["customer_id"],
    sequence_by = F.col("updated_at"),
    stored_as_scd_type = 2,  # Maintain full history
    track_history_column_list = ["email", "address", "phone"]  # Only track specific columns
)

Chapter 7: Databricks Workflows and Job Orchestration

7.1 Introduction to Databricks Workflows

Databricks Workflows (formerly Jobs) is the built-in orchestration service for scheduling and running data pipelines. It's used to:

  • Schedule notebooks, Python scripts, JARs, and DLT pipelines
  • Create complex multi-task dependencies (DAGs)
  • Monitor execution and receive alerts
  • Implement retry logic and error handling

7.2 Creating and Configuring Jobs

Single Task Jobs

# Using the Databricks SDK
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

client = WorkspaceClient()

# Create a simple notebook job
job = client.jobs.create(
    name="My Data Pipeline",
    tasks=[
        jobs.Task(
            task_key="ingest_data",
            notebook_task=jobs.NotebookTask(
                notebook_path="/pipelines/ingestion/load_bronze",
                base_parameters={"env": "prod", "date": "{{start_date}}"}
            ),
            new_cluster=jobs.ClusterSpec(
                spark_version="13.3.x-scala2.12",
                node_type_id="i3.xlarge",
                num_workers=2
            ),
            timeout_seconds=3600,
            max_retries=2,
            min_retry_interval_millis=60000  # 1 minute between retries
        )
    ],
    schedule=jobs.CronSchedule(
        quartz_cron_expression="0 0 8 * * ?",  # Daily at 8 AM
        timezone_id="America/New_York",
        pause_status=jobs.PauseStatus.UNPAUSED
    ),
    email_notifications=jobs.JobEmailNotifications(
        on_failure=["data-team@company.com"],
        on_success=["data-team@company.com"]
    )
)

Multi-Task Jobs (DAGs)

{
  "name": "ETL Pipeline",
  "tasks": [
    {
      "task_key": "ingest_bronze",
      "notebook_task": {
        "notebook_path": "/pipelines/bronze/ingest"
      },
      "new_cluster": {
        "spark_version": "13.3.x-scala2.12",
        "node_type_id": "i3.xlarge",
        "num_workers": 2
      }
    },
    {
      "task_key": "transform_silver",
      "depends_on": [{"task_key": "ingest_bronze"}],
      "notebook_task": {
        "notebook_path": "/pipelines/silver/transform"
      },
      "existing_cluster_id": "{{job_cluster_id}}"
    },
    {
      "task_key": "aggregate_gold_sales",
      "depends_on": [{"task_key": "transform_silver"}],
      "notebook_task": {
        "notebook_path": "/pipelines/gold/sales_agg"
      }
    },
    {
      "task_key": "aggregate_gold_customers",
      "depends_on": [{"task_key": "transform_silver"}],
      "notebook_task": {
        "notebook_path": "/pipelines/gold/customer_agg"
      }
    },
    {
      "task_key": "update_reporting",
      "depends_on": [
        {"task_key": "aggregate_gold_sales"},
        {"task_key": "aggregate_gold_customers"}
      ],
      "notebook_task": {
        "notebook_path": "/pipelines/reporting/refresh"
      }
    }
  ]
}

7.3 Task Types

# Notebook task
notebook_task = jobs.NotebookTask(
    notebook_path="/path/to/notebook",
    base_parameters={"param1": "value1"}
)

# Python Script task
python_task = jobs.SparkPythonTask(
    python_file="/path/to/script.py",
    parameters=["--env", "prod"]
)

# JAR task
jar_task = jobs.SparkJarTask(
    main_class_name="com.company.MainClass",
    jar_uri="dbfs:/jars/my_app.jar",
    parameters=["arg1", "arg2"]
)

# DLT Pipeline task
dlt_task = jobs.PipelineTask(
    pipeline_id="pipeline-id-here"
)

# SQL task
sql_task = jobs.SqlTask(
    query=jobs.SqlTaskQuery(query_id="query-id"),
    warehouse_id="warehouse-id"
)

# Python Wheel task
wheel_task = jobs.PythonWheelTask(
    package_name="my_package",
    entry_point="main",
    named_parameters={"env": "prod"}
)

7.4 Job Parameters and Dynamic Values

# In notebooks, access job parameters using dbutils
dbutils.widgets.text("env", "dev")
dbutils.widgets.text("run_date", "")

env = dbutils.widgets.get("env")
run_date = dbutils.widgets.get("run_date")

# Dynamic value syntax in job configuration
# {{run_id}} - Current run ID
# {{job_id}} - Job ID
# {{task_key}} - Task key
# {{start_time}} - Start time
# {{parent_run_id}} - Parent run ID

# Example: Pass current date
{
    "base_parameters": {
        "date": "{{start_date}}", 
        "run_id": "{{run_id}}"
    }
}

7.5 Cluster Policies

Cluster policies allow administrators to control what users can configure when creating clusters.

{
    "name": "Standard Data Engineering Policy",
    "definition": {
        "spark_version": {
            "type": "allowlist",
            "values": ["13.3.x-scala2.12", "12.2.x-scala2.12"]
        },
        "node_type_id": {
            "type": "allowlist",
            "values": ["i3.xlarge", "i3.2xlarge", "i3.4xlarge"]
        },
        "num_workers": {
            "type": "range",
            "minValue": 1,
            "maxValue": 10,
            "defaultValue": 2
        },
        "autotermination_minutes": {
            "type": "fixed",
            "value": 60
        },
        "spark_conf.spark.databricks.delta.preview.enabled": {
            "type": "fixed",
            "value": "true"
        }
    }
}

7.6 dbutils — Databricks Utilities

dbutils is a powerful utility library available in Databricks notebooks.

# File System utilities
dbutils.fs.ls("/path/to/directory")         # List files
dbutils.fs.mkdirs("/path/to/new/dir")       # Create directory
dbutils.fs.cp("/source/file", "/dest/file") # Copy file
dbutils.fs.mv("/source/file", "/dest/file") # Move file
dbutils.fs.rm("/path/to/file")              # Remove file
dbutils.fs.rm("/path/to/dir", recurse=True) # Remove directory
dbutils.fs.head("/path/to/file", 1024)      # Read first N bytes
dbutils.fs.put("/path/to/file", "content")  # Write content to file

# Widgets
dbutils.widgets.text("parameter", "default_value", "Label")
dbutils.widgets.dropdown("env", "dev", ["dev", "test", "prod"])
dbutils.widgets.combobox("region", "us-east", ["us-east", "eu-west"])
dbutils.widgets.multiselect("tables", "orders", ["orders", "customers", "products"])
dbutils.widgets.get("parameter")   # Get widget value
dbutils.widgets.remove("parameter")  # Remove widget
dbutils.widgets.removeAll()  # Remove all widgets

# Secrets
dbutils.secrets.list("scope-name")  # List secrets in scope
dbutils.secrets.get("scope-name", "secret-key")  # Get secret value

# Notebook utilities
result = dbutils.notebook.run("/path/to/notebook", 60, {"param": "value"})
dbutils.notebook.exit("success")  # Exit with message

# Library utilities
# Note: dbutils.library.installPyPI() was removed in newer Databricks Runtime versions;
# prefer %pip install <package>==<version> in a notebook cell instead.
dbutils.library.restartPython()  # restart the Python process so newly installed libraries are picked up

Chapter 8: Data Governance with Unity Catalog

8.1 Introduction to Unity Catalog

Unity Catalog is Databricks' unified governance solution that provides centralized access control, auditing, lineage, and data discovery across all workspaces in an account.

Key capabilities:

  • Unified data governance: Single control plane for all data assets
  • Fine-grained access control: Table, column, and row-level security
  • Data lineage: Automatic lineage tracking at the column level
  • Audit logging: Comprehensive audit trail of all data access
  • Data discovery: Built-in search and tagging

8.2 Unity Catalog Object Model

Account
└── Metastore (one per region)
    └── Catalog
        └── Schema (Database)
            ├── Tables
            │   ├── Managed Tables
            │   └── External Tables
            ├── Views
            ├── Functions
            ├── Volumes
            └── Models (MLflow)

Three-Level Namespace

-- Full path syntax: catalog.schema.table
SELECT * FROM my_catalog.my_schema.my_table

-- Set default catalog and schema
USE CATALOG my_catalog;
USE SCHEMA my_schema;

-- After setting defaults, use two-level namespace
SELECT * FROM my_table

8.3 Creating and Managing Objects

-- Create catalog
CREATE CATALOG IF NOT EXISTS production
COMMENT 'Production data catalog';

-- Create schema
CREATE SCHEMA IF NOT EXISTS production.sales
COMMENT 'Sales data schema'
MANAGED LOCATION 'abfss://container@storage.dfs.core.windows.net/production/sales/';

-- Create managed table
CREATE TABLE IF NOT EXISTS production.sales.orders (
    order_id BIGINT NOT NULL,
    customer_id BIGINT,
    amount DECIMAL(10,2),
    order_date DATE,
    status STRING
)
USING DELTA
COMMENT 'Main orders table';

-- Create external table
CREATE TABLE IF NOT EXISTS production.sales.external_data
USING DELTA
LOCATION 'abfss://container@storage.dfs.core.windows.net/external/data/'
COMMENT 'External data table';

-- Create view
CREATE OR REPLACE VIEW production.sales.active_orders AS
SELECT * FROM production.sales.orders
WHERE status = 'ACTIVE';

-- Create function
CREATE OR REPLACE FUNCTION production.sales.calculate_tax(amount DOUBLE, rate DOUBLE)
RETURNS DOUBLE
RETURN amount * rate;

8.4 Access Control

Unity Catalog uses a hierarchical permission model.

-- Grant permissions on catalog
GRANT USE CATALOG ON CATALOG production TO `data_engineers`;

-- Grant permissions on schema
GRANT USE SCHEMA, CREATE TABLE ON SCHEMA production.sales TO `data_engineers`;

-- Grant permissions on table
GRANT SELECT, MODIFY ON TABLE production.sales.orders TO `analysts_group`;
GRANT SELECT ON TABLE production.sales.orders TO `junior_analysts`;

-- Grant all privileges
GRANT ALL PRIVILEGES ON TABLE production.sales.orders TO `admin_group`;

-- Revoke permissions
REVOKE MODIFY ON TABLE production.sales.orders FROM `junior_analysts`;

-- Show grants
SHOW GRANTS ON TABLE production.sales.orders;
SHOW GRANTS TO `data_engineers`;

-- Row-level security using row filters
-- Admins see every row; other users only see rows for region groups they belong to
CREATE OR REPLACE FUNCTION production.sales.region_filter(region STRING)
RETURNS BOOLEAN
RETURN is_member('admin') OR is_member(CONCAT('region_', region));

ALTER TABLE production.sales.orders 
SET ROW FILTER production.sales.region_filter ON (region);

-- Column-level security using column masks
CREATE OR REPLACE FUNCTION production.sales.mask_ssn(ssn STRING)
RETURNS STRING
RETURN CASE 
    WHEN is_member('hr_team') THEN ssn
    ELSE CONCAT('***-**-', RIGHT(ssn, 4))
END;

ALTER TABLE production.sales.customers
ALTER COLUMN ssn SET MASK production.sales.mask_ssn;

8.5 Unity Catalog Privileges Reference

| Privilege | Applies To | Description |
| --- | --- | --- |
| CREATE CATALOG | Metastore | Create new catalogs |
| USE CATALOG | Catalog | Access objects in the catalog |
| CREATE SCHEMA | Catalog | Create schemas in the catalog |
| USE SCHEMA | Schema | Access objects in the schema |
| CREATE TABLE | Schema | Create tables in the schema |
| SELECT | Table/View | Read data |
| MODIFY | Table | Insert, update, delete |
| READ VOLUME | Volume | Read files from a volume |
| WRITE VOLUME | Volume | Write files to a volume |
| READ FILES / WRITE FILES | External location | Read or write files directly in an external location |
| EXECUTE | Function | Call functions |
| ALL PRIVILEGES | Any securable | All applicable privileges |
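
The volume privileges in this table don't appear in the earlier GRANT examples, so here is a minimal sketch of granting them from a notebook via spark.sql() (catalog, schema, volume, and group names are illustrative):

# Grant read/write access on a Unity Catalog volume (illustrative names)
spark.sql("GRANT READ VOLUME ON VOLUME production.raw_data.landing_zone TO `data_engineers`")
spark.sql("GRANT WRITE VOLUME ON VOLUME production.raw_data.landing_zone TO `ingest_jobs`")

# Verify the grants
spark.sql("SHOW GRANTS ON VOLUME production.raw_data.landing_zone").show(truncate=False)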

8.6 Data Lineage

Unity Catalog automatically captures data lineage.

# Lineage is automatically captured for:
# - Table reads and writes
# - SQL queries
# - DLT pipelines
# - Notebooks
# - Jobs

# View lineage through the Catalog Explorer UI
# Or query system tables
SELECT * FROM system.access.table_lineage
WHERE target_table_full_name = 'production.sales.orders'
LIMIT 100;

8.7 Volumes

Volumes are Unity Catalog objects that manage access to non-tabular data in cloud storage.

-- Create external volume
CREATE EXTERNAL VOLUME production.raw_data.landing_zone
LOCATION 'abfss://container@storage.dfs.core.windows.net/landing/'
COMMENT 'Landing zone for raw data files';

-- Create managed volume
CREATE VOLUME production.raw_data.processed_files
COMMENT 'Processed file storage';

-- Access volumes using the /Volumes path:
-- /Volumes/<catalog>/<schema>/<volume>/<path>

# Read from volume
df = spark.read.format("json").load("/Volumes/production/raw_data/landing_zone/orders/")

# Write to volume
df.write.format("csv").save("/Volumes/production/raw_data/processed_files/output/")

# Using dbutils with volumes
dbutils.fs.ls("/Volumes/production/raw_data/landing_zone/")

8.8 System Tables

Unity Catalog provides system tables for audit logging and access tracking.

-- Audit logs
SELECT * FROM system.access.audit
WHERE event_date >= current_date() - 7
AND action_name = 'SELECT'
LIMIT 100;

-- Table access history
SELECT * FROM system.access.table_access_history
WHERE table_full_name = 'production.sales.orders'
ORDER BY access_date DESC
LIMIT 100;

-- Billing and usage
SELECT * FROM system.billing.usage
WHERE usage_date >= current_date() - 30;

-- Query history
SELECT * FROM system.query.history
WHERE start_time >= current_timestamp() - INTERVAL 1 DAY
ORDER BY start_time DESC;

Chapter 9: Databricks SQL

9.1 Overview of Databricks SQL

Databricks SQL is a serverless data warehouse built on top of the Databricks Lakehouse Platform. It provides:

  • SQL Editor: Web-based SQL interface
  • SQL Warehouses: Scalable compute for SQL workloads
  • Dashboards: Built-in visualization and reporting
  • Alerts: Notifications based on query results
  • Query History: Track and optimize queries

9.2 SQL Warehouses

SQL Warehouses (formerly SQL Endpoints) are the compute resources for Databricks SQL.

Types:

  • Classic: Entry-level SQL warehouse running on compute in your cloud account
  • Pro: Runs in your cloud account and supports the full set of Databricks SQL features
  • Serverless: Databricks-managed compute with near-instant startup and pay-per-use billing

Auto-scaling: a SQL warehouse scales out by adding clusters (between its configured minimum and maximum) when concurrent queries queue up, and scales back in as load drops.
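
As a rough sketch of provisioning a warehouse programmatically, assuming the Databricks Python SDK (parameter names can differ slightly between SDK versions, and the sizing values are illustrative):

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import sql

w = WorkspaceClient()

# Small warehouse that auto-stops after 15 idle minutes and scales out to 3 clusters
wh = w.warehouses.create(
    name="reporting-wh",
    cluster_size="Small",
    min_num_clusters=1,
    max_num_clusters=3,          # upper bound used for auto-scaling under concurrency
    auto_stop_mins=15,
    enable_serverless_compute=True,
    warehouse_type=sql.CreateWarehouseRequestWarehouseType.PRO,
)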

9.3 Advanced SQL Features

Common Table Expressions (CTEs)

-- Basic CTE
WITH customer_orders AS (
    SELECT 
        customer_id,
        COUNT(*) as order_count,
        SUM(amount) as total_spent
    FROM orders
    WHERE status = 'COMPLETED'
    GROUP BY customer_id
),
customer_segments AS (
    SELECT
        customer_id,
        CASE 
            WHEN total_spent > 10000 THEN 'Platinum'
            WHEN total_spent > 5000 THEN 'Gold'
            WHEN total_spent > 1000 THEN 'Silver'
            ELSE 'Bronze'
        END as segment
    FROM customer_orders
)
SELECT 
    c.customer_id,
    c.name,
    cs.segment,
    co.total_spent
FROM customers c
JOIN customer_orders co ON c.customer_id = co.customer_id
JOIN customer_segments cs ON c.customer_id = cs.customer_id;

-- Recursive CTE for hierarchical data (requires a runtime/DBSQL version with recursive CTE support)
WITH RECURSIVE employee_hierarchy AS (
    -- Base case
    SELECT employee_id, name, manager_id, 0 AS level
    FROM employees
    WHERE manager_id IS NULL

    UNION ALL

    -- Recursive case
    SELECT e.employee_id, e.name, e.manager_id, h.level + 1
    FROM employees e
    JOIN employee_hierarchy h ON e.manager_id = h.employee_id
)
SELECT * FROM employee_hierarchy ORDER BY level, employee_id;

Window Functions

-- Running total
SELECT 
    date,
    amount,
    SUM(amount) OVER (ORDER BY date ROWS UNBOUNDED PRECEDING) as running_total
FROM daily_sales;

-- Moving average
SELECT 
    date,
    amount,
    AVG(amount) OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as avg_7_day
FROM daily_sales;

-- Ranking with ties
SELECT 
    product_id,
    revenue,
    RANK() OVER (ORDER BY revenue DESC) as rank,
    DENSE_RANK() OVER (ORDER BY revenue DESC) as dense_rank,
    ROW_NUMBER() OVER (ORDER BY revenue DESC) as row_num,
    NTILE(4) OVER (ORDER BY revenue DESC) as quartile,
    PERCENT_RANK() OVER (ORDER BY revenue DESC) as pct_rank,
    CUME_DIST() OVER (ORDER BY revenue DESC) as cum_dist
FROM product_sales;

-- First/Last value
SELECT 
    customer_id,
    order_date,
    amount,
    FIRST_VALUE(amount) OVER (
        PARTITION BY customer_id 
        ORDER BY order_date
    ) as first_order_amount,
    LAST_VALUE(amount) OVER (
        PARTITION BY customer_id 
        ORDER BY order_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) as last_order_amount
FROM orders;

PIVOT and UNPIVOT

-- PIVOT
SELECT * FROM (
    SELECT product_category, month, revenue
    FROM monthly_sales
)
PIVOT (
    SUM(revenue)
    FOR month IN ('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun')
);

-- UNPIVOT
SELECT product_category, month, revenue
FROM monthly_sales_wide
UNPIVOT (
    revenue FOR month IN (jan_revenue, feb_revenue, mar_revenue)
);

Advanced Aggregations

-- GROUPING SETS
SELECT 
    COALESCE(category, 'ALL CATEGORIES') as category,
    COALESCE(region, 'ALL REGIONS') as region,
    SUM(sales) as total_sales
FROM sales_data
GROUP BY GROUPING SETS (
    (category, region),
    (category),
    (region),
    ()
);

-- ROLLUP
SELECT 
    year, quarter, month,
    SUM(revenue) as total_revenue
FROM sales
GROUP BY ROLLUP (year, quarter, month);

-- CUBE
SELECT 
    category, region, channel,
    SUM(sales) as total_sales
FROM sales_data
GROUP BY CUBE (category, region, channel);

-- FILTER clause
SELECT 
    category,
    SUM(amount) as total,
    SUM(amount) FILTER (WHERE status = 'COMPLETED') as completed_total,
    SUM(amount) FILTER (WHERE status = 'PENDING') as pending_total,
    COUNT(*) FILTER (WHERE amount > 1000) as large_orders
FROM orders
GROUP BY category;

Higher-Order Functions

-- TRANSFORM: Apply function to each element
SELECT TRANSFORM(skills, s -> UPPER(s)) as upper_skills
FROM employees;

-- FILTER: Filter array elements
SELECT FILTER(scores, s -> s > 70) as passing_scores
FROM students;

-- AGGREGATE: Aggregate array elements
SELECT AGGREGATE(amounts, 0, (acc, x) -> acc + x) as total
FROM orders_with_arrays;

-- EXISTS: Check if any element matches
SELECT EXISTS(items, x -> x.price > 100) as has_expensive_items
FROM shopping_carts;

-- FORALL: Check if all elements match
SELECT FORALL(scores, s -> s >= 60) as all_passing
FROM students;

-- REDUCE (alias for AGGREGATE)
SELECT REDUCE(amounts, 0, (acc, x) -> acc + x) as sum
FROM arrays_table;

Chapter 10: Security and Access Control

10.1 Databricks Security Model

Databricks provides multiple layers of security:

  1. Network Security: VPC/VNET peering, Private Link, IP Access Lists
  2. Authentication: SSO, SCIM provisioning, Service Principals, PAT tokens (see the sketch after this list)
  3. Authorization: Unity Catalog, Table ACLs, Workspace ACLs
  4. Encryption: At-rest and in-transit encryption
  5. Audit: Comprehensive audit logging
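
As a concrete example of the authentication layer, here is a minimal sketch of connecting the Databricks Python SDK as a service principal using OAuth machine-to-machine credentials (the workspace URL and the secret scope/key are hypothetical):

from databricks.sdk import WorkspaceClient

# The SDK exchanges the client ID/secret for a short-lived OAuth token
w = WorkspaceClient(
    host="https://adb-1234567890123456.7.azuredatabricks.net",         # hypothetical URL
    client_id="<service-principal-application-id>",
    client_secret=dbutils.secrets.get("my-scope", "sp-oauth-secret"),   # hypothetical scope/key
)

print(w.current_user.me().user_name)   # shows which identity the SDK resolved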

10.2 Identity and Access Management

Users and Groups

# Using Databricks SDK
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import iam

client = WorkspaceClient()

# Create service principal
sp = client.service_principals.create(
    display_name="my-service-principal",
    application_id="app-id"
)

# Add user to group
client.groups.patch(
    id="group-id",
    operations=[
        iam.Patch(
            op=iam.PatchOp.ADD,
            path="members",
            value=[{"value": "user-id"}]
        )
    ]
)

Service Principals

Service principals are non-human identities used for automated jobs and service-to-service authentication.

# Generate token for service principal
# Configure in the Databricks CLI
databricks auth login --host https://workspace.azuredatabricks.net

# Use service principal in jobs
# Set environment variables
import os
os.environ["DATABRICKS_HOST"] = "https://workspace.azuredatabricks.net"
os.environ["DATABRICKS_TOKEN"] = dbutils.secrets.get("scope", "sp-token")

10.3 Secrets Management

# Create secret scope and secrets using the CLI (legacy v0 syntax shown;
# the newer CLI uses positional arguments, e.g. `databricks secrets create-scope my-scope`)
# databricks secrets create-scope --scope my-scope
# databricks secrets put --scope my-scope --key my-key

# Access secrets in notebooks
password = dbutils.secrets.get(scope="my-scope", key="db-password")

# Use secrets in configuration (secret values are redacted if printed in a notebook)
jdbc_url = "jdbc:postgresql://host:5432/db"
connection_properties = {
    "user": dbutils.secrets.get("my-scope", "db-user"),
    "password": dbutils.secrets.get("my-scope", "db-password")
}

df = spark.read.jdbc(url=jdbc_url, table="my_table", properties=connection_properties)

# List secrets (names only, not values)
dbutils.secrets.list("my-scope")

10.4 Table ACLs (Legacy - Pre-Unity Catalog)

-- Grant table permissions (Hive metastore)
GRANT SELECT ON TABLE my_table TO `user@company.com`;
GRANT MODIFY ON TABLE my_table TO `data_engineers`;
GRANT ALL PRIVILEGES ON SCHEMA my_schema TO `schema_owner`;

-- Deny permissions
DENY SELECT ON TABLE sensitive_table TO `junior_analysts`;

-- Revoke permissions
REVOKE SELECT ON TABLE my_table FROM `user@company.com`;

-- Show grants
SHOW GRANT ON TABLE my_table;

Chapter 11: Performance Optimization

11.1 Understanding Spark Performance

Performance optimization in Spark involves understanding and addressing:

  • Data Skew: Uneven distribution of data across partitions (see the sketch after this list)
  • Shuffle: Expensive data movement across the network
  • Serialization: Cost of converting objects for transfer
  • Memory Management: Efficient use of executor memory
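
A quick way to check whether skew is actually present is to count rows per partition; a minimal sketch with illustrative table and column names:

from pyspark.sql import functions as F

df = spark.read.table("production.sales.orders")

# A few huge partitions next to many tiny ones is a classic sign of skew
(df.withColumn("pid", F.spark_partition_id())
   .groupBy("pid").count()
   .orderBy(F.col("count").desc())
   .show(10))

# Common mitigation: salt a hot key so its rows spread across more partitions
salted = df.withColumn(
    "customer_id_salted",
    F.concat_ws("_", F.col("customer_id").cast("string"), (F.rand() * 8).cast("int"))
)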

11.2 Partitioning Strategies

# Check current number of partitions
df.rdd.getNumPartitions()

# Repartition (full shuffle)
df.repartition(100)
df.repartition(100, "customer_id")  # Hash partition by column

# Coalesce (no shuffle, can only reduce partitions)
df.coalesce(10)

# Configure default shuffle partitions
spark.conf.set("spark.sql.shuffle.partitions", "200")  # Default: 200

# Adaptive Query Execution (AQE) - automatically optimizes partitions
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

11.3 Broadcast Joins

When one DataFrame is small enough to fit in memory, broadcasting it avoids an expensive shuffle join.

from pyspark.sql.functions import broadcast

# Manual broadcast hint
result = large_df.join(broadcast(small_df), "customer_id")

# Configure auto-broadcast threshold
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10mb")  # Default: 10MB

# Disable auto-broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

11.4 Caching and Persistence

# Cache with the default storage level (MEMORY_AND_DISK for DataFrames)
df.cache()

# Persist with storage level
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
df.persist(StorageLevel.MEMORY_ONLY)
df.persist(StorageLevel.DISK_ONLY)
df.persist(StorageLevel.OFF_HEAP)

# Unpersist
df.unpersist()

# Cache a Delta table (result in Spark memory)
spark.catalog.cacheTable("my_table")
spark.catalog.uncacheTable("my_table")
spark.catalog.clearCache()

11.5 Query Optimization

Explain Plan

# View logical and physical execution plans
df.explain()
df.explain(True)    # Extended plan
df.explain("cost")  # Cost-based optimizer plan
df.explain("codegen")  # Generated code
df.explain("formatted")  # Formatted output

# Or in SQL
spark.sql("EXPLAIN SELECT * FROM orders WHERE status = 'ACTIVE'")
spark.sql("EXPLAIN EXTENDED SELECT * FROM orders WHERE status = 'ACTIVE'")

Predicate Pushdown

# Spark automatically pushes filters down to minimize the data read
# Write queries so filters can be pushed down to the storage/file scan
from pyspark.sql import functions as F

# Good: filter directly after the read so it can be pushed down to the scan
good = spark.read.parquet("/path/").filter(F.col("date") == "2024-01-01")

# Bad: applying an expensive transformation first prevents the filter from being pushed down
bad = (spark.read.parquet("/path/")
       .withColumn("derived", expensive_udf(F.col("column")))  # expensive_udf is illustrative
       .filter(F.col("date") == "2024-01-01"))

Column Pruning

# Only read columns you need
df.select("id", "amount", "date")  # Better than df.select("*")

11.6 Delta Lake Performance Features

File Statistics and Data Skipping

Delta Lake maintains statistics (min, max, null count) for each file. Query predicates can use these to skip files entirely.

-- Z-ORDER colocates data for efficient skipping
OPTIMIZE orders ZORDER BY (customer_id, order_date)

-- Inspect the table's file layout (file count, total size, clustering info)
DESCRIBE DETAIL orders;
-- Data skipping effectiveness (files pruned vs. files read) is visible in the query profile UI

Liquid Clustering

Liquid Clustering is Delta Lake's newer data layout technique. It replaces both Hive-style partitioning and ZORDER, and clustering keys can be changed later without rewriting the table.

-- Create table with liquid clustering
CREATE TABLE orders (
    order_id BIGINT,
    customer_id BIGINT,
    order_date DATE,
    amount DECIMAL(10,2)
)
USING DELTA
CLUSTER BY (customer_id, order_date);

-- Add clustering to existing table
ALTER TABLE orders CLUSTER BY (customer_id, order_date);

-- Run clustering
OPTIMIZE orders;

-- Check clustering information
DESCRIBE TABLE EXTENDED orders;

Auto Optimize

# Enable Auto Optimize for continuous optimization
spark.sql("""
    ALTER TABLE orders SET TBLPROPERTIES (
        'delta.autoOptimize.optimizeWrite' = 'true',
        'delta.autoOptimize.autoCompact' = 'true'
    )
""")

# Globally enable
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")

11.7 Photon Engine

Photon is Databricks' next-generation query engine written in C++. It provides:

  • Vectorized execution for much faster processing
  • Accelerates SQL and DataFrame operations
  • Particularly effective for aggregations, joins, and string operations
  • Enabled per cluster configuration (see the sketch after this list)
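
A rough sketch of turning Photon on when creating a cluster with the Python SDK (names and sizes are illustrative; the runtime_engine field is the relevant setting, and some workspaces instead select a "-photon-" runtime version string):

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import compute

w = WorkspaceClient()

cluster = w.clusters.create(
    cluster_name="photon-etl",
    spark_version="13.3.x-scala2.12",
    node_type_id="i3.xlarge",
    num_workers=2,
    autotermination_minutes=60,
    runtime_engine=compute.RuntimeEngine.PHOTON,   # enable the Photon engine
)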

11.8 Databricks Runtime Optimizations

# Use Databricks Runtime (DBR) for built-in optimizations
# DBR includes:
# - An optimized Spark distribution
# - Delta Lake
# - MLflow
# - Ongoing performance improvements

# Check the DBR version (environment variable set on Databricks clusters)
import os
print(os.environ.get("DATABRICKS_RUNTIME_VERSION"))

# Prefer the latest LTS version for stability,
# or the latest version for the newest features

Chapter 12: Exam Tips and Practice Questions

12.1 Exam Overview

The Databricks Certified Data Engineer Associate exam tests your knowledge of:

| Domain | Weight |
| --- | --- |
| Databricks Lakehouse Platform | ~24% |
| ELT with Apache Spark | ~29% |
| Incremental Data Processing | ~22% |
| Production Pipelines | ~16% |
| Data Governance | ~9% |

Exam Format:

  • 45 questions (multiple choice)
  • 90 minutes
  • Passing score: ~70%
  • Available on Webassessor

12.2 Key Concepts to Master

Delta Lake Must-Knows

  1. ACID transactions — Delta Lake provides full ACID compliance
  2. Transaction log (_delta_log) — Every operation is recorded
  3. Time Travel — Query historical versions using VERSION AS OF or TIMESTAMP AS OF
  4. VACUUM — Removes old files, default retention is 7 days
  5. OPTIMIZE + ZORDER — Compacts small files and colocates data
  6. Schema enforcement vs Schema evolution — Enforcement is default
  7. MERGE — Upsert operation combining INSERT and UPDATE
  8. CDF — Captures row-level changes for downstream processing (see the recap below)
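
A compact syntax recap of the points above (table names are illustrative):

spark.sql("DESCRIBE HISTORY sales.orders")                            # inspect the transaction log
spark.sql("SELECT * FROM sales.orders VERSION AS OF 12")              # time travel by version
spark.sql("SELECT * FROM sales.orders TIMESTAMP AS OF '2024-01-01'")  # time travel by timestamp
spark.sql("OPTIMIZE sales.orders ZORDER BY (customer_id)")            # compact files + colocate data
spark.sql("VACUUM sales.orders RETAIN 168 HOURS")                     # 168 hours = the 7-day default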

Spark Must-Knows

  1. Lazy evaluation — Transformations are lazy, actions trigger execution
  2. Partitions — Data is distributed across partitions
  3. Shuffle — Most expensive operation (GROUP BY, JOIN, DISTINCT)
  4. Broadcast join — Small table broadcast avoids shuffle
  5. Caching — Use .cache() for repeatedly accessed DataFrames
  6. AQE — Adaptive Query Execution optimizes shuffle partitions and join strategies at runtime (see the snippet below)
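
Lazy evaluation in two lines (a minimal illustration):

evens = spark.range(1_000_000).filter("id % 2 = 0")   # transformation only: no job runs yet
print(evens.count())                                   # action: triggers the actual execution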

Streaming Must-Knows

  1. Output modes — append, complete, update
  2. Checkpointing — Required for fault tolerance
  3. Watermarks — Handle late-arriving data
  4. Auto Loader — Best practice for file ingestion
  5. COPY INTO — SQL alternative for incremental file loading
  6. Triggers — Control how often the streaming query processes data (see the recap below)
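
A minimal Auto Loader recap that ties several of these together (paths and table names are illustrative):

(spark.readStream.format("cloudFiles")                                   # Auto Loader source
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/Volumes/main/bronze/_schemas/orders")
    .load("/Volumes/main/landing/orders/")
 .writeStream
    .option("checkpointLocation", "/Volumes/main/bronze/_checkpoints/orders")  # fault tolerance
    .trigger(availableNow=True)                                          # process the backlog, then stop
    .toTable("main.bronze.orders"))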

DLT Must-Knows

  1. Streaming Tables vs Materialized Views — When to use each
  2. Expectations — expect, expect_or_drop, expect_or_fail
  3. APPLY CHANGES INTO — CDC handling in DLT
  4. Pipeline modes — Triggered vs Continuous
  5. Development mode — Reuses the pipeline cluster and disables retries for faster iteration (expectations are still enforced)
  6. dlt.read() vs dlt.read_stream() — Batch vs streaming read of another dataset in the pipeline (see the sketch below)
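
And a tiny DLT sketch combining a streaming table with an expectation (source path is illustrative; this code runs inside a DLT pipeline, not a plain notebook):

import dlt

@dlt.table(comment="Bronze orders ingested with Auto Loader")
@dlt.expect_or_drop("valid_order_id", "order_id IS NOT NULL")   # drop rows that violate the rule
def bronze_orders():
    return (spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "json")
            .load("/Volumes/main/landing/orders/"))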

12.3 Practice Questions


Question 1: A data engineer needs to process a large JSON dataset stored in cloud object storage incrementally. New files arrive daily. Which approach is BEST suited for this use case?

A) spark.read.json() with a daily cron schedule
B) Auto Loader with cloudFiles format and a streaming checkpoint
C) COPY INTO command in a daily job
D) spark.read.format("delta").option("readChangeFeed", "true")

Answer: B — Auto Loader with cloudFiles format is specifically designed for incremental file ingestion with exactly-once processing guarantees.


Question 2: What is stored in the _delta_log directory of a Delta table?

A) The raw Parquet data files
B) JSON and Parquet files tracking all changes to the table
C) Schema definition files only
D) A backup of deleted records

Answer: B — The _delta_log directory contains JSON transaction log files and periodic Parquet checkpoint files that record all changes.


Question 3: A Delta table has accumulated many small files over time. Which command BEST addresses this issue?

A) VACUUM table_name
B) OPTIMIZE table_name
C) ANALYZE table_name
D) REFRESH table_name

Answer: B — OPTIMIZE compacts small files into larger ones. VACUUM removes old unreferenced files but doesn't compact.


Question 4: You run VACUUM sales RETAIN 0 HOURS. What is the consequence?

A) All data is permanently deleted from the table
B) You can no longer query historical versions of the data using time travel
C) All metadata is removed but data remains accessible
D) Future MERGE operations will fail

Answer: B — VACUUM removes the files needed for time travel. You can no longer access historical versions that have been vacuumed.


Question 5: In a DLT pipeline, which expectation type will FAIL the entire pipeline if violated?

A) @dlt.expect()
B) @dlt.expect_or_drop()
C) @dlt.expect_or_fail()
D) @dlt.expect_all()

Answer: C — @dlt.expect_or_fail() fails the pipeline if any row violates the constraint.


Question 6: Which statement about Spark's lazy evaluation is TRUE?

A) Both transformations and actions execute immediately
B) Transformations execute immediately but actions are lazy
C) Transformations are lazy and execute only when an action is called
D) All operations are eagerly evaluated in Databricks

Answer: C — Transformations (like filter, select, groupBy) are lazy and only execute when an action (like collect, show, count) is called.


Question 7: A streaming query reads from a Kafka topic and aggregates events by 5-minute windows. Data can arrive up to 10 minutes late. Which feature handles the late data?

A) Checkpoint
B) Trigger interval
C) Watermark
D) Output mode

Answer: C — A watermark tells the engine how long to wait for late events; data arriving later than the watermark threshold can be dropped and the associated state cleaned up.


Question 8: What is the difference between MERGE and INSERT OVERWRITE in Delta Lake?

A) MERGE is only for streaming; INSERT OVERWRITE is for batch
B) MERGE can update, insert, and delete; INSERT OVERWRITE replaces matching partition data
C) INSERT OVERWRITE is ACID compliant; MERGE is not
D) There is no difference; they are aliases

Answer: B — MERGE is a sophisticated upsert that can handle updates, inserts, and deletes based on conditions. INSERT OVERWRITE replaces data in matching partitions.


Question 9: In the Medallion Architecture, which layer contains business-level aggregations optimized for reporting?

A) Bronze
B) Silver
C) Gold
D) Platinum

Answer: C — The Gold layer contains business-level aggregations ready for consumption by BI tools and analysts.


Question 10: Which command shows the transaction history of a Delta table?

A) SHOW HISTORY table_name
B) DESCRIBE HISTORY table_name
C) SELECT * FROM table_name VERSION AS OF 0
D) SHOW TRANSACTIONS table_name

Answer: B — DESCRIBE HISTORY table_name shows the full transaction history including operation type, timestamp, user, and version.


Question 11: A streaming query uses .trigger(once=True). What does this do?

A) Processes data once per second
B) Processes all available data in one micro-batch, then stops
C) Runs continuously with a 1-second interval
D) Processes only the first record from the source

Answer: B — trigger(once=True) processes all available data in a single batch and terminates, combining the benefits of batch and streaming.


Question 12: Which Unity Catalog privilege allows a user to read files from a Volume?

A) SELECT
B) READ VOLUME
C) USE VOLUME
D) USAGE

Answer: B — READ VOLUME is the privilege that grants read access to files in a Unity Catalog volume (READ FILES applies to external locations).


Question 13: What is the default behavior when writing a DataFrame to a Delta table without specifying a write mode?

A) Append to existing data
B) Overwrite existing data
C) Throw an error if data already exists
D) Merge with existing data

Answer: C — The default write mode is error (or errorifexists), which throws an error if the target already contains data.


Question 14: A data engineer wants to capture row-level changes to a Delta table for downstream CDC processing. Which feature should they enable?

A) Delta Time Travel
B) Change Data Feed (CDF)
C) Auto Optimize
D) Incremental VACUUM

Answer: B — Change Data Feed (CDF) captures row-level changes (insert, update_preimage, update_postimage, delete) for downstream consumption.


Question 15: In Structured Streaming, which output mode should be used for writing aggregated results (no watermark)?

A) append
B) update
C) complete
D) overwrite

Answer: C — Aggregations without a watermark cannot use append mode; complete mode writes the entire result table on each trigger.


12.4 Common Exam Pitfalls

  1. VACUUM and Time Travel: Remember, VACUUM with a short retention period disables time travel to those removed versions. The default is 7 days.

  2. Schema Evolution vs Enforcement: By default, Delta enforces the existing schema. Set mergeSchema=true to allow schema evolution (see the snippet after this list).

  3. Streaming Output Modes: Aggregations without watermarks need complete mode; simple append streams use append mode.

  4. OPTIMIZE vs VACUUM: OPTIMIZE compacts files; VACUUM removes unreferenced files. Different purposes!

  5. DLT Expectations: Remember the three types and their behaviors (warn, drop, fail).

  6. Auto Loader vs COPY INTO: Auto Loader handles billions of files with streaming; COPY INTO is simpler SQL for smaller datasets.

  7. Managed vs External Tables: Dropping a managed table deletes both metadata AND data. Dropping an external table only removes metadata.

  8. Checkpoint Location: Each streaming query needs its own unique checkpoint location.

  9. broadcast() hint: Use for small lookup tables to avoid a shuffle join; Spark auto-broadcasts tables below spark.sql.autoBroadcastJoinThreshold (10MB by default), and the hint can force broadcasting somewhat larger tables that still fit in memory.

  10. Unity Catalog three-level namespace: catalog.schema.table — don't confuse with two-level database.table.
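
For pitfall 2 in particular, the difference comes down to a single write option (table name is illustrative):

# Schema enforcement (default): the append fails if df has columns the table doesn't
df.write.format("delta").mode("append").saveAsTable("silver.orders")

# Schema evolution: opt in explicitly so new columns are added to the table schema
(df.write.format("delta")
   .mode("append")
   .option("mergeSchema", "true")
   .saveAsTable("silver.orders"))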


12.5 Final Study Checklist

Before taking the exam, ensure you can:

Delta Lake:

  • [ ] Explain how the transaction log works
  • [ ] Perform MERGE, UPDATE, DELETE operations
  • [ ] Use Time Travel with VERSION AS OF and TIMESTAMP AS OF
  • [ ] Run OPTIMIZE with ZORDER
  • [ ] Configure and run VACUUM
  • [ ] Enable and use Change Data Feed
  • [ ] Apply schema evolution with mergeSchema

Spark:

  • [ ] Create and manipulate DataFrames
  • [ ] Write complex SQL with CTEs, window functions
  • [ ] Handle complex types (arrays, maps, structs)
  • [ ] Optimize joins using broadcast
  • [ ] Configure partitioning for performance
  • [ ] Read from and write to various file formats

Streaming:

  • [ ] Set up Auto Loader for file ingestion
  • [ ] Configure output modes correctly
  • [ ] Implement checkpointing
  • [ ] Apply watermarks for late data
  • [ ] Use COPY INTO for incremental loading

DLT:

  • [ ] Create streaming tables and materialized views
  • [ ] Apply expectations for data quality
  • [ ] Implement CDC with APPLY CHANGES INTO
  • [ ] Configure pipeline settings

Workflows:

  • [ ] Create multi-task jobs with dependencies
  • [ ] Configure job clusters vs all-purpose clusters
  • [ ] Use dbutils for file operations and secrets

Unity Catalog:

  • [ ] Navigate the three-level namespace
  • [ ] Grant and revoke privileges at different levels
  • [ ] Create and use Volumes
  • [ ] Understand data lineage

Conclusion

The Databricks Data Engineer Associate exam is comprehensive, covering everything from low-level Spark mechanics to high-level platform features. The key to success is:

  1. Hands-on practice: Build real pipelines using the Databricks Community Edition or a trial account
  2. Understand the WHY: Don't just memorize commands — understand when and why to use each feature
  3. Focus on Delta Lake: A large portion of the exam centers on Delta Lake features
  4. Practice with DLT: Delta Live Tables is increasingly important
  5. Review the official documentation: Databricks documentation is excellent and aligns closely with exam content

The Lakehouse architecture represents the future of data engineering, combining the best of data lakes and data warehouses. Mastering these concepts not only prepares you for the exam but equips you with skills that are highly valuable in modern data engineering roles.

Good luck with your exam! 🚀


This guide was written based on the Databricks Certified Data Engineer Associate exam guide and covers all major topics. Always refer to the official Databricks documentation for the most up-to-date information, as the platform evolves rapidly.
