Databricks Data Engineering — Master Study Notes
Covering: Deploy Workloads with Lakeflow Jobs | DevOps Essentials for Data Engineering | Data Ingestion with Lakeflow Connect
5-Sentence Overview: These three courses form a complete data engineering curriculum on Databricks. Lakeflow Connect teaches how to efficiently ingest data from cloud storage, SaaS applications, and databases into Delta Lake using methods like CTAS, COPY INTO, and Auto Loader. Deploy Workloads with Lakeflow Jobs covers how to orchestrate those pipelines using DAG-based multi-task jobs, triggers, and advanced control flow (If/Else, For Each, conditional dependencies). DevOps Essentials bridges software engineering best practices—modularity, testing, version control, and CI/CD—into a Databricks data engineering context using pytest, Git Folders, and Databricks Asset Bundles (DABs). Together they teach you to build, test, automate, and reliably deploy production-grade data pipelines on the Databricks Data Intelligence Platform.
═══════════════════════
COURSE 1: DATA INGESTION WITH LAKEFLOW CONNECT
═══════════════════════
PART 1 — CHEAT SHEET
Core Concepts
- Lakeflow Connect — Databricks-native ingestion layer; replaces patchwork of 3rd-party tools with unified, managed connectors into the lakehouse
- Standard Connectors — Cloud object storage ingestion (CSV, JSON, Parquet) using CTAS, COPY INTO, Auto Loader
- Managed Connectors — Fully managed UI/API-based ingestion from SaaS apps (Salesforce, etc.) and databases (Postgres, etc.)
- Batch Ingestion — Processes ALL records every run; no memory of prior runs (`CREATE TABLE AS`, `spark.read.load()`)
- Incremental Batch — Only ingests NEW files/records, skipping previously loaded ones (`COPY INTO`, Auto Loader with timed trigger, SDP Streaming Table)
- Streaming — Continuously ingests micro-batches in near real-time (Auto Loader with continuous trigger, SDP continuous mode)
- Delta Lake — Open-source storage protocol that stores data as Parquet files plus a `_delta_log/` transaction log
- Medallion Architecture — Bronze (raw) → Silver (cleaned) → Gold (aggregated/business-level) layered data-quality pattern
- Rescued Data Column — Auto-added `_rescued_data` column that captures malformed records that don't match the schema during ingestion
- VARIANT data type — New semi-structured type (Public Preview, 2025 Q2); more flexible and performant than STRING for JSON
- Delta Sharing — Securely share data across platforms, clouds, and regions without moving it
- Lakehouse Federation — Query external data sources in-place without ingestion; good for ad hoc/POC work
- Zerobus — Upcoming Lakeflow Connect API for high-throughput event ingestion (100 MB/s, <5s latency); ideal for IoT, clickstreams, telemetry
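The batch vs. incremental distinction above can be sketched in plain Python. This is a minimal illustration, not Databricks code: the hypothetical `IncrementalIngester` remembers loaded file names in memory, whereas `COPY INTO` and Auto Loader keep equivalent bookkeeping in the target table's metadata and checkpoint state.

```python
def full_batch_ingest(files):
    """CTAS-style full batch: every run reads ALL files again."""
    return list(files)

class IncrementalIngester:
    """COPY INTO-style incremental batch: skip files seen on
    earlier runs, which is what makes re-running idempotent."""
    def __init__(self):
        self.loaded = set()

    def ingest(self, files):
        new = [f for f in files if f not in self.loaded]
        self.loaded.update(new)
        return new

# Hypothetical file drops on two consecutive days
day1 = ["sales_01.csv", "sales_02.csv"]
day2 = ["sales_01.csv", "sales_02.csv", "sales_03.csv"]

inc = IncrementalIngester()
print(full_batch_ingest(day2))  # all 3 files reprocessed every run
print(inc.ingest(day1))         # both new files loaded
print(inc.ingest(day2))         # only sales_03.csv is new
```

The growing reprocessing cost of the full-batch path is exactly why CTAS becomes slower and more expensive as files accumulate.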
Ingestion Methods Comparison Table
| Method | Type | SQL Syntax | Best For | Idempotent? |
|---|---|---|---|---|
| CTAS | Batch (full) | CREATE TABLE AS SELECT * FROM read_files(...) | One-time loads, initial setup | No |
| COPY INTO | Incremental Batch | COPY INTO table FROM 'path' FILEFORMAT=... | Scheduled incremental loads | Yes |
| Auto Loader (timed) | Incremental Batch | spark.readStream + trigger | Billions of files, efficient | Yes |
| Auto Loader (continuous) | Streaming | spark.readStream + continuous trigger | Near real-time streaming | Yes |
| SDP Streaming Table | Incremental/Streaming | CREATE OR REFRESH STREAMING TABLE | Declarative pipeline approach | Yes |
JSON Semi-Structured Data Key Functions
| Function | Purpose | Example |
|---|---|---|
| `schema_of_json('json-string')` | Derives schema from a JSON sample | Returns `STRUCT<name: STRING, age: INT, ...>` |
| `from_json(col, schema)` | Converts a JSON STRING column to STRUCT | `from_json(json_col, 'json-schema')` |
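As a rough analogy for what `schema_of_json` does, a plain-Python sketch can walk one JSON sample and emit a STRUCT-style schema string. This is a simplified stand-in (only strings, booleans, integers, and nested objects); the real SQL function covers many more types.

```python
import json

def schema_of_json_sketch(sample: str) -> str:
    """Derive a STRUCT<...> style schema string from one JSON sample.
    Simplified type mapping for illustration only."""
    def type_of(value):
        if isinstance(value, dict):
            fields = ", ".join(f"{k}: {type_of(v)}" for k, v in value.items())
            return f"STRUCT<{fields}>"
        if isinstance(value, bool):
            return "BOOLEAN"
        if isinstance(value, int):
            return "INT"
        return "STRING"
    return type_of(json.loads(sample))

print(schema_of_json_sketch('{"name":"John","age":35,"address":{"city":"SF"}}'))
# STRUCT<name: STRING, age: INT, address: STRUCT<city: STRING>>
```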
JSON Column Storage Options
| Type | Flexibility | Performance | Notes |
|---|---|---|---|
| STRING | High | Low | No schema enforcement |
| STRUCT | Medium | High | Enforces schema, best for querying |
| VARIANT | Very High | High | Public Preview; best of both worlds |
Metadata Columns on Ingest
- Append file metadata (source path, size, modification time) automatically during ingestion
- Useful for Bronze tables to track data lineage
Golden Rules — Lakeflow Connect
- Never use batch (CTAS) for recurring pipelines — it reprocesses everything; use COPY INTO or Auto Loader instead
- Auto Loader > COPY INTO for large-scale file ingestion (handles billions of files; built on Structured Streaming)
- Store raw JSON as STRUCT (not STRING) for better query performance and schema enforcement
- Use Rescued Data column to catch malformed records instead of failing the pipeline
- Managed Connectors are the right choice for SaaS/DB sources — no custom code needed
Common Pitfalls
- Using CTAS for daily batch loads (re-ingests everything every run = expensive)
- Ignoring the `_rescued_data` column (silently losing malformed records)
- Storing JSON as plain STRING when STRUCT is available (slow queries)
- Forgetting that COPY INTO is considered legacy — prefer Auto Loader + Declarative Pipelines for new pipelines
Acronyms & Mnemonics
- CIA = CTAS → Incremental (COPY INTO) → Auto Loader (progression from basic to advanced ingestion)
- CTAS = Create Table As Select
- SDP = Spark Declarative Pipelines (formerly DLT = Delta Live Tables)
- ACID = Atomicity, Consistency, Isolation, Durability (Delta Lake guarantees)
- BSG = Bronze, Silver, Gold (Medallion layers)
PART 2 — END-TO-END PROCESS: Cloud Storage Ingestion Pipeline
Data Source (CSV/JSON/Parquet in S3/ADLS/GCS)
↓
[STEP 1] Choose Ingestion Method
├── One-time load? → CTAS
├── Scheduled incremental? → COPY INTO or Auto Loader (timed)
└── Real-time streaming? → Auto Loader (continuous) or SDP
↓
[STEP 2] Ingest to Bronze Table (raw layer)
↓
[STEP 3] Handle Schema/Malformed Data
↓
[STEP 4] Transform to Silver (clean/validated)
↓
[STEP 5] Aggregate to Gold (business logic)
Step 1 — Choose Ingestion Method
- What to do: Evaluate frequency, volume, and latency needs
- Why it matters: Wrong choice leads to cost overruns (full re-scans) or missed records
- Decision: If SLA is minutes → Auto Loader; if hourly/daily batch → COPY INTO; if once → CTAS
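The decision rule above can be written as a small helper. This is a hypothetical sketch (the function name, inputs, and string labels are illustrative, not a Databricks API):

```python
def choose_ingestion_method(frequency: str, latency_sla: str) -> str:
    """Map load frequency and latency SLA to an ingestion method,
    mirroring the rule of thumb in Step 1."""
    if frequency == "once":
        return "CTAS"                              # one-time full load
    if latency_sla == "minutes":
        return "Auto Loader (continuous)"          # near real-time SLA
    return "COPY INTO or Auto Loader (timed)"      # hourly/daily batch

print(choose_ingestion_method("once", "days"))          # CTAS
print(choose_ingestion_method("recurring", "minutes"))  # Auto Loader (continuous)
print(choose_ingestion_method("recurring", "daily"))
```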
Step 2 — Ingest to Bronze
- What to do: Write raw data as-is to a Delta table; append metadata columns (_source_file_path, _modification_time)
- Why it matters: Raw data preservation enables reprocessing; metadata supports lineage tracking
- What could go wrong: Schema mismatch → records land in `_rescued_data`; configure `mergeSchema` for schema evolution
Step 3 — Handle Schema/Malformed Records
- What to do: Check the `_rescued_data` column; decide whether to quarantine, fix, or drop malformed rows
- Why it matters: Silent data loss is worse than a pipeline failure
- If JSON: use `schema_of_json()` to derive the schema, then `from_json()` to cast to STRUCT
Step 4 — Transform to Silver
- What to do: Deduplicate, clean nulls, cast types, validate business rules using SDP or notebooks
- Why it matters: Silver is the "trusted" layer consumed by analysts and ML engineers
Step 5 — Aggregate to Gold
- What to do: Create business-level aggregations, joins, KPIs
- Why it matters: Serves BI tools, dashboards, and ML feature stores
Enterprise/SaaS Data (Managed Connectors):
SaaS App / Database
↓
[Credentials stored in Unity Catalog]
↓
[Managed Connector UI or API → Serverless Declarative Pipeline]
↓
[Streaming Delta Table in Lakehouse]
PART 3 — PRACTICAL KNOWLEDGE
Worked Example 1: Full Batch Ingest (CTAS)
-- One-time load from cloud storage CSV files
CREATE TABLE bronze.health_raw
AS SELECT *
FROM read_files(
's3://my-bucket/health-data/',
format => 'csv',
header => true,
inferSchema => true
);
-- ⚠️ This reprocesses ALL files every run → not suitable for daily incremental loads
Worked Example 2: Incremental Batch (COPY INTO)
-- Create target table first
CREATE TABLE IF NOT EXISTS bronze.health_raw (
patient_id STRING,
reading INT,
timestamp TIMESTAMP
);
-- Incremental load: only copies NEW files
COPY INTO bronze.health_raw
FROM 's3://my-bucket/health-data/'
FILEFORMAT = CSV
FORMAT_OPTIONS ('header' = 'true', 'inferSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
-- ✅ Idempotent: safe to re-run; already-loaded files are skipped
Worked Example 3: JSON Flattening
-- Bronze table has a raw json_col STRING column
-- Step 1: Derive the schema
SELECT schema_of_json('{"name":"John","age":35,"address":{"city":"SF","state":"CA"}}');
-- Returns: STRUCT<name: STRING, age: INT, address: STRUCT<city: STRING, state: STRING>>
-- Step 2: Cast JSON string to STRUCT
SELECT from_json(json_col,
'STRUCT<name: STRING, age: INT, address: STRUCT<city: STRING, state: STRING>>'
) AS parsed_data
FROM bronze.raw_events;
-- Step 3: Flatten to columns
SELECT
parsed_data.name,
parsed_data.age,
parsed_data.address.city,
parsed_data.address.state
FROM (
SELECT from_json(json_col, 'STRUCT<...>') AS parsed_data FROM bronze.raw_events
);
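The same flatten step can be traced in plain Python with the standard `json` module, to show what the STRUCT access paths resolve to (hypothetical single record; in SQL the parsing happens per row):

```python
import json

raw = '{"name":"John","age":35,"address":{"city":"SF","state":"CA"}}'
parsed = json.loads(raw)  # analogous to from_json -> STRUCT

# Dot paths in SQL (parsed_data.address.city) become key lookups here
flat = {
    "name":  parsed["name"],
    "age":   parsed["age"],
    "city":  parsed["address"]["city"],
    "state": parsed["address"]["state"],
}
print(flat)  # {'name': 'John', 'age': 35, 'city': 'SF', 'state': 'CA'}
```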
Practice Questions
- (Easy) What are the three ingestion patterns available for cloud object storage in Lakeflow Connect?
- (Easy) What is the difference between batch and incremental batch ingestion? Give a use case for each.
- (Medium) A daily job uses CTAS to load 1 TB of CSV files. After 30 days, it takes 10x longer. Why? How would you fix it?
- (Medium) Your pipeline ingests JSON files, but some records have extra fields not in your schema. What happens, and how do you handle it?
- (Hard) You need to ingest from Salesforce (SaaS), PostgreSQL (on-prem database), and S3 (cloud storage) simultaneously into Bronze tables. For each source, identify the correct Lakeflow Connect tool/method and explain the architecture.
Real-World Application
Scenario 1 — E-commerce clickstream pipeline: An e-commerce site generates millions of click events per minute. Use Auto Loader with continuous trigger + Declarative Pipelines Streaming Table to ingest JSON events from S3 into Bronze in near real-time (<5 second latency). Use _rescued_data to quarantine malformed events without dropping them.
Scenario 2 — CRM data sync: Sales team needs daily Salesforce data in the lakehouse. Use Lakeflow Connect Managed Connectors with the Salesforce connector — point-and-click configuration, no custom API code, Databricks manages the pipeline. Data lands as Streaming Delta Tables in the Bronze layer automatically.
What Professors/Interviewers Test Most
- Difference between CTAS, COPY INTO, and Auto Loader (when to use each)
- Why COPY INTO is idempotent and what that means
- How `_rescued_data` works and when it appears
- Medallion Architecture (Bronze/Silver/Gold) and what belongs in each layer
- JSON handling: the `schema_of_json()` + `from_json()` workflow
════════════════════════
COURSE 2: DEPLOY WORKLOADS WITH LAKEFLOW JOBS
════════════════════════
PART 1 — CHEAT SHEET
Core Concepts
- Lakeflow Jobs — Unified orchestration platform for data, analytics, and AI workloads on Databricks (formerly Workflows)
- Job — The primary resource for scheduling, coordinating, and running one or more tasks
- Task — A single unit of work within a job (notebook, Python script, SQL query, SDP pipeline, dbt, JAR, AI/BI dashboard, etc.)
- DAG (Directed Acyclic Graph) — Directed=one-way edges, Acyclic=no loops, Graph=vertices+edges; defines task execution order
- Trigger — A rule that automatically starts a job based on a schedule or event
- Task Values — Dynamic key-value pairs that tasks create and share during execution (`dbutils.jobs.taskValues.set/get`)
- Dynamic Value References — `{{ }}` notation to reference runtime context (e.g., `{{job.start_time.day}}`, `{{tasks.taskA.values.my_val}}`)
- Run If — Conditional execution based on upstream task status (All succeeded, At least one succeeded, None failed, etc.)
- If/Else Task — Branching logic that routes workflow based on a condition
- For Each Task — Iterates over a list, running a nested task for each item
- Repair Run — Re-runs only failed tasks in a job run (not the whole job); saves time and money
- system.lakeflow — Read-only system tables logging all job activity across workspaces; key tables: `jobs`, `job_tasks`, `job_run_timeline`, `job_task_run_timeline`
- Serverless Compute — Fully managed compute with auto-scaling, Photon enabled by default, faster startup, lower TCO
- Modular Orchestration — Breaking large jobs into parent + child jobs using "Run Job" task type
Trigger Types
| Trigger | When to Use | Notes |
|---|---|---|
| Manual | Ad hoc runs, debugging, backfills | Start via UI, API, CLI, SDK, or DABS |
| Scheduled (Cron) | Recurring time-based runs | Hourly, daily, or custom cron expression |
| File Arrival | Unpredictable/irregular file drops | Supports S3, ADLS, GCS, Databricks Volumes |
| Table Update | Event-driven on data change | Up to 10 tables; Any/All updated condition; min time between triggers |
| Continuous | Streaming workloads | Auto-restarts as soon as previous run finishes/fails |
Compute Options
| Type | Cost | Use Case | Notes |
|---|---|---|---|
| Interactive (All-Purpose) | High | Dev/exploration only | Never use in production |
| Job Clusters | ~50% cheaper | Production workloads | Subject to cloud start-up time |
| Serverless | Lowest TCO | Production (recommended) | Photon on, fast start, auto-scale |
| SQL Warehouse | Varies by size | SQL/BI workloads (dashboards, SQL tasks) | Serverless by default |
DAG Patterns
| Pattern | Shape | Common Use |
|---|---|---|
| Sequence | A → B → C | ETL: Bronze → Silver → Gold |
| Funnel | A,B,C → D | Multiple sources merging into one |
| Fan-out | A → B,C,D | Single source distributing to multiple targets |
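Because jobs must form a DAG, a scheduler has to reject any dependency set it cannot topologically sort. A sketch of that check using Python's standard-library `graphlib` (task names are hypothetical):

```python
from graphlib import TopologicalSorter, CycleError

# Fan-out then funnel: A -> B, A -> C, then B,C -> D (a valid DAG)
dag = {"B": {"A"}, "C": {"A"}, "D": {"B", "C"}}
print(list(TopologicalSorter(dag).static_order()))  # e.g. ['A', 'B', 'C', 'D']

# Adding the edge D -> A creates a cycle, which must be rejected
cyclic = {"B": {"A"}, "C": {"A"}, "D": {"B", "C"}, "A": {"D"}}
try:
    list(TopologicalSorter(cyclic).static_order())
except CycleError:
    print("cycle rejected: not a valid DAG")
```

If a workflow genuinely seems circular, that is the signal to split it into parent/child jobs, as the Golden Rules below note.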
Golden Rules — Lakeflow Jobs
- Never use interactive clusters in production — use Job or Serverless clusters
- Always use Repair Run instead of re-running the whole job when a task fails
- DAGs cannot have cycles — if your workflow seems circular, you need modular jobs
- Max 1000 tasks per job — break complex pipelines into parent/child modular design
- Use service principals (not personal accounts) for job ownership in production
Common Pitfalls
- Using interactive clusters for production jobs (costly, not scalable)
- Re-running entire jobs on failure instead of using Repair Run
- Hardcoding parameters instead of using task parameters/dynamic references
- Not setting up alerts for job failures, delays, or timeouts
- Forgetting that Table Update trigger has an "Any" vs "All" tables condition distinction
Acronyms & Mnemonics
- DAG = Directed Acyclic Graph
- DABS = Databricks Asset Bundles
- DBU = Databricks Unit (billing unit)
- TCO = Total Cost of Ownership
- SDP = Spark Declarative Pipelines
- "My Schedule Fires Tables Continuously" = Manual, Scheduled, File arrival, Table update, Continuous (the five trigger types)
PART 2 — END-TO-END PROCESS: Building and Running a Production Job
[STEP 1] Design the DAG
↓
[STEP 2] Create the Job in UI/API/DABS
↓
[STEP 3] Add Tasks & Configure Compute
↓
[STEP 4] Set Parameters, Task Values, Dynamic References
↓
[STEP 5] Set Up Trigger
↓
[STEP 6] Configure Alerts & Retry Policy
↓
[STEP 7] Run & Monitor
↓
[STEP 8] Handle Failures with Repair Run
Step 1 — Design the DAG
- What to do: Map out task dependencies; identify which tasks can run in parallel vs. sequentially
- Why it matters: Parallel tasks reduce total runtime; wrong dependencies break the pipeline
- Pitfall: Creating circular dependencies (DAGs must be acyclic)
Step 2 — Create the Job
- What to do: In the Lakeflow Jobs UI, click "Create Job"; name it; add first task
- Alternative: Use the API or DABs (Databricks Asset Bundles) for programmatic/CI-CD job creation
Step 3 — Add Tasks & Configure Compute
- What to do: For each task, choose type (Notebook, SQL, SDP, Python script, etc.) and assign compute
- Why it matters: Each task can use different compute types; SQL tasks require SQL Warehouse
- Decision: Is this time-sensitive? → Serverless with Performance Optimized ON; Otherwise → standard serverless or job cluster
Step 4 — Set Parameters, Task Values, Dynamic References
- What to do: Add job-level parameters; use `dbutils.jobs.taskValues.set()` to pass data between tasks; use `{{ }}` references for runtime values
- Example: Task A computes a row count and stores it as a task value; Task B reads it to decide the next step
- Pitfall: Not parameterizing tasks makes them brittle and non-reusable
Step 5 — Set Up Trigger
- If time-based → use Scheduled trigger with cron expression
- If file-based → use File Arrival trigger with storage path
- If data-change-based → use Table Update trigger (up to 10 tables; choose Any/All condition)
- If streaming → use Continuous trigger
Step 6 — Configure Alerts & Retry Policy
- What to do: Add notification destinations (Email, Slack, PagerDuty, Teams, Webhook); set retry policy per task
- Why it matters: Silent failures are dangerous in production; retries handle transient errors
Step 7 — Run & Monitor
- What to do: Use Job UI timeline to see task start/end/overlap; use Spark UI for query-level details
- Insights: High planning time → improve pruning/partitioning; High execution time → fix joins/aggregations (broadcast, skew)
Step 8 — Handle Failures with Repair Run
- What to do: In the failed job run, click "Repair Run"; select failed tasks; optionally override parameters; re-run only those tasks
- Why it matters: Saves time and cost vs. re-running the entire job
Mermaid Diagram — Conditional Workflow Pattern
flowchart TD
A[Ingest Data] --> B[Check Duplicates - If/Else]
B -->|True: has duplicates| C[Drop Duplicates]
B -->|False: clean data| D[Apply Transformations]
C --> E[Join & Aggregate]
D --> E
E --> F[For Each: iterate by region]
F --> G[Write Regional Tables]
G --> H[Refresh Dashboard]
PART 3 — PRACTICAL KNOWLEDGE
Worked Example 1: Task Values Between Tasks
# Task A: Ingest and count records
df = spark.read.table("bronze.sales")
row_count = df.count()
# Store the count for downstream tasks to read
dbutils.jobs.taskValues.set(key="row_count", value=row_count)
# Task B: Downstream task reads the value
row_count = dbutils.jobs.taskValues.get(taskKey="ingest_task", key="row_count")
print(f"Processing {row_count} records")
Worked Example 2: Dynamic Value Reference
In the Job UI, you can reference upstream task values in parameters using {{ }}:
Parameter: source_table
Value: {{tasks.ingest_task.values.catalog_name}}.bronze.sales
This gets resolved at runtime — the actual catalog name is injected dynamically.
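Conceptually, the `{{ }}` substitution behaves like simple template resolution against a runtime context. A hypothetical sketch with Python's `re` (the context keys and values are invented for illustration; this is not how Databricks implements it):

```python
import re

# Hypothetical runtime context a job run might expose
context = {
    "tasks.ingest_task.values.catalog_name": "prod_catalog",
    "job.start_time.day": "14",
}

def resolve(template: str) -> str:
    """Replace each {{path}} with its value from the runtime context."""
    return re.sub(r"\{\{\s*([^}]+?)\s*\}\}",
                  lambda m: context[m.group(1)], template)

print(resolve("{{tasks.ingest_task.values.catalog_name}}.bronze.sales"))
# prod_catalog.bronze.sales
```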
Worked Example 3: Run If Dependency
Task topology:
Task 1 (Ingest A) ─┐
Task 2 (Ingest B) ─┼→ Task 4 (Merge) [Run if: At Least One Succeeded]
Task 3 (Ingest C) ─┘
Result: Even if Task 3 fails, Task 4 still runs if Task 1 or Task 2 succeeded.
This is used for optional/best-effort data sources.
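The Run If semantics above reduce to a predicate over upstream task statuses. A plain-Python sketch (condition names taken from the course; status strings are hypothetical):

```python
def should_run(condition: str, upstream: list[str]) -> bool:
    """Evaluate a Run If condition against upstream task statuses."""
    if condition == "All succeeded":
        return all(s == "success" for s in upstream)
    if condition == "At least one succeeded":
        return any(s == "success" for s in upstream)
    if condition == "None failed":
        return all(s != "failed" for s in upstream)
    raise ValueError(f"unknown condition: {condition}")

# Task 3 failed, Tasks 1-2 succeeded -> the Merge task still runs
print(should_run("At least one succeeded", ["success", "success", "failed"]))  # True
print(should_run("All succeeded", ["success", "success", "failed"]))           # False
```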
Practice Questions
- (Easy) What is the difference between a Job and a Task in Lakeflow Jobs?
- (Easy) List the five trigger types and give one use case for each.
- (Medium) A job with 20 tasks fails at task 15. What is the most efficient way to recover? Walk through the steps.
- (Medium) You need Task B to receive the output count from Task A. What two mechanisms can you use, and how do they differ?
- (Hard) Design a job DAG for this scenario: ingest from 3 sources in parallel → join them → check for data quality issues (If/Else) → if pass: iterate per region using For Each → write to Gold tables → refresh dashboard. Draw the DAG and list the task types used at each node.
Real-World Application
Scenario 1 — Nightly ETL pipeline: A retail company needs to process daily sales data from 50 regional stores by 6am. Design a Lakeflow Job with a File Arrival trigger on S3, parallel ingestion tasks per region using For Each, a merge task using "All Succeeded" run-if dependency, and alerting to Slack on failure. Use serverless compute with Performance Optimized ON to ensure tasks start and finish before the 6am deadline.
Scenario 2 — ML retraining pipeline: A recommendation model needs retraining when the feature table is updated. Set up a Table Update trigger on gold.features table. The job DAG: validate data → unit test → train model → evaluate metrics (If/Else: if accuracy > threshold, deploy; else, alert and stop). Use Repair Run to only re-run the training step if it fails without re-running validation.
What Professors/Interviewers Test Most
- DAG definition (Directed, Acyclic, Graph — what each word means)
- Difference between Run If vs. If/Else task
- When to use each trigger type
- Serverless vs. Job Cluster vs. Interactive Cluster tradeoffs
- How Repair Run works and why it saves cost
- system.lakeflow table names and what they contain
═══════════════════
COURSE 3: DEVOPS ESSENTIALS FOR DATA ENGINEERING
═══════════════════
PART 1 — CHEAT SHEET
Core Concepts
- DevOps — A culture/practice that bridges Software Engineering Best Practices + IT Operations to automate and streamline development and deployment
- DataOps — DevOps applied to data engineering; treats data pipelines like software
- MLOps — DevOps applied to machine learning; treats models as data + software
- CI (Continuous Integration) — Frequently merging code to a shared repo + running automated tests to detect issues early
- CD (Continuous Delivery) — Auto-deploying to staging after tests pass; manual gate to production
- CD (Continuous Deployment) — Fully automated deploy to production after tests pass (no manual gate)
- Modularized Code — Breaking code into reusable functions instead of one big script; enables testing and reuse
- Unit Test — Tests a single isolated function on small, controlled data; fast, cheap, high coverage
- Integration Test — Tests how components work together (notebooks + SDP + Jobs)
- System Test — End-to-end test of the entire pipeline in a real-world scenario; slowest
- pytest — Python testing framework; discovers and runs all functions whose names start with `test_`; minimal syntax
- assertDataFrameEqual — PySpark testing utility (`pyspark.testing.utils`) comparing actual vs. expected DataFrames
- Git Folders (Repos) — Databricks integration for version control; supports clone, commit, push, pull, and branch management in the UI
- Databricks Asset Bundles (DABs) — YAML-based configuration system to define, deploy, and test Databricks resources across environments; write once, deploy everywhere
- Branching Strategy — Using Git branches (e.g., Gitflow) to isolate feature development from production; prevents breaking main
- Service Principal — A non-human identity for automated job execution; replaces personal user accounts in production
- PAT (Personal Access Token) — Used to authenticate Databricks with GitHub for Git integration
Environment Isolation
| Environment | Data | Purpose |
|---|---|---|
| Dev | Small static subset of prod (anonymized/synthetic) | Development and rapid iteration |
| Stage | Larger subset, mirrors prod structure | Realistic testing and validation |
| Prod | Live, real data; high security | Operational production workloads |
Isolation Methods:
- Workspace isolation — Separate Databricks workspaces per environment (DEV/STG/PRD)
- Catalog isolation — Separate Unity Catalog catalogs per environment within one workspace
Testing Pyramid
/System Tests\ ← Slowest, least coverage, most expensive
/Integration \ ← Medium speed; tests component interactions
/ Unit Tests \ ← Fastest, cheapest, highest coverage (write most of these)
CI/CD Deployment Flow (DABs)
[Local Dev] → commit → [Git Repo] → CI triggers → [Staging Deploy + Tests] → approval → [Prod Deploy]
↑ manual deploy to dev workspace for testing
Deployment Tools Comparison
| Tool | Type | Best For |
|---|---|---|
| REST API | HTTP requests | Direct programmatic access; custom integrations |
| Databricks CLI | Command line | One-off tasks, shell scripting, experimentation |
| Databricks SDK (Python/Java/Go/R) | Library | Building applications with error-handling |
| DABs (Asset Bundles) | YAML config + CLI | Full CI/CD; recommended for production |
Golden Rules — DevOps/DataOps
- Modularize everything — every transformation should be a function, not inline code
- Unit tests first — write them for every function; catch bugs before integration
- Never test with prod data in dev — use anonymized/synthetic subsets
- Use service principals for production jobs, never personal accounts
- DABs is the recommended approach for deploying Databricks assets via CI/CD
Common Pitfalls
- Writing monolithic notebooks with no functions (untestable, not reusable)
- Using production data in dev/stage environments (privacy risk, slow)
- Testing only integration/system, skipping unit tests (expensive to run, hard to debug)
- Using personal account credentials in production jobs (breaks when employee leaves)
- Not using branching strategy (everyone commits to main → chaos)
Key PySpark Testing Code
# The function to test
from pyspark.sql.functions import col, when

def add_new_col(df, new_col_name, source_col):
    return df.withColumn(
        new_col_name,
        when(col(source_col) == 0, 'Normal').otherwise('Unknown'))

# The unit test
from pyspark.testing.utils import assertDataFrameEqual

def test_add_new_col():
    # 1. Create sample input DataFrame
    df = spark.createDataFrame([(0,), (1,), (-1,), (None,)], ["value"])
    # 2. Execute the function
    actual_df = add_new_col(df, "new_value", "value")
    # 3. Create expected result
    expected_df = spark.createDataFrame(
        [(0, 'Normal'), (1, 'Unknown'), (-1, 'Unknown'), (None, 'Unknown')],
        ["value", "new_value"]
    )
    # 4. Assert equality (raises error if different)
    assertDataFrameEqual(actual_df, expected_df)
Mnemonics & Acronyms
- DABs = Databricks Asset Bundles
- PAT = Personal Access Token
- CI = Continuous Integration ("Commit + Integrate")
- CD = Continuous Delivery/Deployment ("Code Delivered/Deployed")
- PCBT = Plan → Code → Build → Test (the CI loop)
- "MCIT" = Modularize, CI, Isolate environments, Test early — DataOps core mantra
PART 2 — END-TO-END PROCESS: Building a CI/CD Pipeline for Data Engineering
[STEP 1] Plan the project & isolate environments
↓
[STEP 2] Modularize PySpark code into functions
↓
[STEP 3] Write unit tests (pytest + assertDataFrameEqual)
↓
[STEP 4] Write integration tests (SDP expectations or Jobs tasks)
↓
[STEP 5] Set up version control (Git Folders + branching strategy)
↓
[STEP 6] Configure DABs (YAML bundles) for deployment
↓
[STEP 7] CI pipeline: auto-test on every PR/commit
↓
[STEP 8] CD pipeline: auto-deploy to staging, then production
Step 1 — Plan & Isolate Environments
- What to do: Define deliverables; create DEV/STG/PRD workspaces or catalogs; create synthetic dev data
- Why it matters: Mixing environments causes production data exposure and untested code in prod
- Decision: Single workspace with catalog isolation (simpler) vs. multiple workspaces (stronger isolation)
Step 2 — Modularize PySpark Code
- What to do: Extract every logical operation into a named function; move from monolithic notebook to modular files
- Before: inline `df.withColumn(...)` repeated everywhere
- After: `def add_column(df, ...)` in a shared module, imported wherever needed
- Why it matters: You can't unit test inline code; functions are testable, reusable, and maintainable
Step 3 — Write Unit Tests
- What to do: For every function, write a `test_`-prefixed function using pytest; use `assertDataFrameEqual` for DataFrame comparisons
- Why it matters: Catches bugs early (the cheapest time to fix); enables safe refactoring
- What could go wrong: Testing too little (missing edge cases like null values); or testing implementation details instead of behavior
Step 4 — Write Integration Tests
- Method 1 (SDP): Add `@expect` or `@expect_or_drop` expectations to Declarative Pipeline tables to validate row counts, distinct values, etc.
- Method 2 (Jobs): Add a unit-test task at the start of a job; add validation tasks after SDP runs to check row counts, nulls, duplicates, and value ranges
- When to use which: Method 1 for SDP-centric pipelines; Method 2 for non-SDP or mixed pipelines
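A Method 2-style validation task ultimately runs a handful of simple checks. A plain-Python sketch over rows represented as dicts (the field name and thresholds are hypothetical; a real task would run these checks as queries against the Delta table):

```python
def validate(rows, min_rows=1, required_field="patient_id"):
    """Return a list of data-quality failures (empty list = pass)."""
    failures = []
    # Row-count check
    if len(rows) < min_rows:
        failures.append(f"expected >= {min_rows} rows, got {len(rows)}")
    # Null check on the key field
    nulls = sum(1 for r in rows if r.get(required_field) is None)
    if nulls:
        failures.append(f"{nulls} rows missing {required_field}")
    # Duplicate-key check
    ids = [r.get(required_field) for r in rows]
    if len(ids) != len(set(ids)):
        failures.append("duplicate keys found")
    return failures

rows = [{"patient_id": "a1"}, {"patient_id": "a1"}, {"patient_id": None}]
print(validate(rows))
```

In a job, a non-empty failure list would raise an exception so the task fails visibly and downstream tasks are held back by their Run If conditions.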
Step 5 — Version Control with Git Folders
- What to do: Connect Databricks to GitHub via PAT; clone repo into Git Folder; work on feature branches; commit + push from Databricks UI
- Key operations: clone, commit, push, pull, branch, visual diff
- Branching: Isolate work with feature branches; merge to main only after review
Step 6 — Configure DABs
- What to do: Create a `databricks.yml` bundle configuration defining resources (jobs, pipelines, clusters), environments (dev/staging/prod), and deployment targets
- Why it matters: Write once, deploy everywhere; version-controlled job definitions
Step 7 — CI Pipeline (Auto-test)
- Triggered by: PR or commit to a branch
- Steps: checkout code → deploy to dev → run unit tests → run integration tests → report results
Step 8 — CD Pipeline (Auto-deploy)
- On merge to main: auto-deploy to staging; run integration tests
- On approval: deploy to production
- Continuous Deployment: skip the manual approval gate (fully automated)
PART 3 — PRACTICAL KNOWLEDGE
Worked Example 1: Non-Modular vs. Modular Code
Before (BAD — non-modular):
# Everything inline — can't test individual pieces
from pyspark.sql.functions import col, when

df = spark.read.csv("health.csv", header=True, inferSchema=True)
df = df.withColumn("status", when(col("reading") == 0, 'Normal').otherwise('Unknown'))
df.write.saveAsTable("bronze.health")
After (GOOD — modular):
from pyspark.sql.functions import col, when

def load_health_data(file_path):
    """Load health CSV data from cloud storage."""
    return spark.read.csv(file_path, header=True, inferSchema=True)

def add_status_column(df, new_col, source_col):
    """Classify readings: 0 → Normal, else → Unknown."""
    return df.withColumn(new_col,
                         when(col(source_col) == 0, 'Normal').otherwise('Unknown'))

def save_to_bronze(df, table_name):
    """Persist DataFrame to a Delta table."""
    df.write.saveAsTable(table_name)

# Pipeline is now testable + reusable
df = load_health_data("s3://bucket/health.csv")
df = add_status_column(df, "status", "reading")
save_to_bronze(df, "bronze.health")
Worked Example 2: Full Unit Test with Edge Cases
from pyspark.testing.utils import assertDataFrameEqual
from pyspark.sql.functions import col, when

def test_add_status_column_handles_edge_cases():
    # Edge cases: zero, positive, negative, null
    input_data = [(0,), (1,), (-1,), (None,)]
    df = spark.createDataFrame(input_data, ["reading"])
    actual = add_status_column(df, "status", "reading")
    expected_data = [(0, 'Normal'), (1, 'Unknown'), (-1, 'Unknown'), (None, 'Unknown')]
    expected = spark.createDataFrame(expected_data, ["reading", "status"])
    assertDataFrameEqual(actual, expected)
# ✅ Passes: null is treated as Unknown (correct behavior)
Worked Example 3: DABs YAML Structure
# databricks.yml
bundle:
  name: health_pipeline

targets:
  dev:
    default: true
    workspace:
      host: https://dev.azuredatabricks.net
  staging:
    workspace:
      host: https://stg.azuredatabricks.net
  prod:
    workspace:
      host: https://prod.azuredatabricks.net

resources:
  jobs:
    health_etl_job:
      name: Health ETL Pipeline
      tasks:
        - task_key: unit_tests
          notebook_task:
            notebook_path: ./notebooks/run_unit_tests
        - task_key: run_pipeline
          depends_on:
            - task_key: unit_tests
          pipeline_task:
            pipeline_id: ${resources.pipelines.health_pipeline.id}
Practice Questions
- (Easy) What is the difference between Continuous Delivery and Continuous Deployment?
- (Easy) What is the purpose of `assertDataFrameEqual`? What does it test?
- (Medium) A colleague pushes a broken function to main and breaks the production pipeline. What DevOps practices would have prevented this? List at least three.
- (Medium) You have a PySpark function that filters rows based on a date range. Write a unit test for it, including at least two edge cases.
- (Hard) Design a complete CI/CD architecture for a Databricks data engineering team of 10 engineers deploying to DEV/STG/PROD. Specify: environment isolation strategy, branching strategy, test types at each stage, deployment tool, and who approves promotion to production.
Real-World Application
Scenario 1 — Healthcare data pipeline team: A 5-person team builds a daily health analytics pipeline. Each engineer works on a feature branch. On PR creation, GitHub Actions triggers a DABs deploy to dev workspace → runs pytest unit tests → if pass, merges to main → auto-deploys to staging → runs integration tests → manual approval → deploys to prod. Service principals own production jobs; personal accounts only access dev.
Scenario 2 — New data engineer onboarding: The new hire clones the team's GitHub repo into a Databricks Git Folder, runs existing unit tests (all pass), sees exactly what functions exist and what they're supposed to do from docstrings, creates a feature branch, adds a new transformation function + its unit test, commits from the Databricks UI, and opens a PR for code review. No production risk at any point.
What Professors/Interviewers Test Most
- CI vs. CD difference (and Continuous Delivery vs. Continuous Deployment)
- Why modular code is better than monolithic (specifically: testability, reusability, maintainability)
- How to write a PySpark unit test step-by-step
- The three testing levels (Unit < Integration < System) and the speed/cost tradeoff
- What DABs are and why Databricks recommends them over REST API/CLI for CI/CD
- Git branching strategy and why branching matters for CI
════════════════════════════════
CROSS-COURSE INTEGRATION MAP
════════════════════════════════
Lakeflow Connect Lakeflow Jobs DevOps / CI/CD
───────────────── ───────────────── ─────────────────
Ingest from sources → Orchestrate the pipeline → Test & deploy reliably
CTAS / COPY INTO / DAG of tasks Modular code
Auto Loader Triggers Unit + Integration tests
Managed Connectors Conditional logic Git + DABs
Monitoring + Repair CI/CD pipeline
How they connect:
- Lakeflow Connect handles getting data in
- Lakeflow Jobs handles when and how pipelines run
- DevOps handles how to safely build and deploy those pipelines
A production pipeline typically uses ALL THREE:
- Auto Loader (Connect) → ingests files incrementally into Bronze
- Lakeflow Jobs (Jobs) → orchestrates Bronze→Silver→Gold transformations triggered on Table Update
- DABs + pytest (DevOps) → ensures every code change is tested before hitting production
WHAT TO REVIEW NEXT:
- Spark Declarative Pipelines (SDP / formerly DLT) — the transformation layer between ingestion and orchestration; deeply integrates with all three courses
- Unity Catalog — governance, access control, and data lineage that underpins all three courses (all environments, all connectors, all jobs)
- Databricks Asset Bundles (DABs) Deep Dive — advanced YAML configuration, CI/CD integration with GitHub Actions/Azure DevOps, multi-workspace parameterization