Senior Data Engineering / SQL
A complete set of senior-level Data Engineering interview questions covering SQL mastery, query optimisation, Apache Spark, Kafka, dbt, data modelling, pipeline design, lakehouse architectures, streaming, orchestration, and data quality.
SQL Core
10 questions
SQL clauses are written in a fixed order (SELECT … FROM … WHERE … GROUP BY … HAVING … ORDER BY … LIMIT) but are evaluated in a different logical order:
- FROM / JOIN: identify source tables and produce the working relation.
- WHERE: filter individual rows before any grouping.
- GROUP BY: collapse rows into groups.
- HAVING: filter groups (operates on aggregates).
- SELECT: compute output expressions and column aliases.
- DISTINCT: deduplicate result rows.
- ORDER BY: sort (can reference aliases defined in SELECT).
- LIMIT / OFFSET: trim the final result set.
Why it matters:
- You cannot reference a SELECT alias in a WHERE clause, because WHERE is evaluated before SELECT. Use a subquery or CTE instead.
- WHERE cannot filter on aggregate values (e.g., WHERE COUNT(*) > 5 is illegal); use HAVING.
- Window functions are computed after GROUP BY and HAVING but before ORDER BY, so you can use them in ORDER BY but not in WHERE.
-- Illegal: alias defined in SELECT not yet available in WHERE
SELECT order_total * 1.2 AS gross, customer_id
FROM orders
WHERE gross > 1000; -- error: "gross" doesn't exist yet
-- Correct: use a CTE or subquery
WITH base AS (
SELECT order_total * 1.2 AS gross, customer_id FROM orders
)
SELECT * FROM base WHERE gross > 1000;
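Python's built-in sqlite3 module is enough to check the aggregate rule in practice. SQLite is more lenient than the SQL standard about SELECT aliases in WHERE, so this sketch only shows the WHERE-versus-HAVING case; the table and rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (customer_id INTEGER, order_total REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, 100.0), (1, 250.0), (2, 80.0)],
)

# An aggregate in WHERE is rejected: WHERE runs before rows are grouped.
try:
    conn.execute(
        "SELECT customer_id FROM orders WHERE COUNT(*) > 1 GROUP BY customer_id"
    ).fetchall()
    where_error = None
except sqlite3.Error as exc:
    where_error = str(exc)

# HAVING runs after GROUP BY, so the same condition is legal there.
repeat_customers = conn.execute(
    "SELECT customer_id FROM orders GROUP BY customer_id HAVING COUNT(*) > 1"
).fetchall()

print(where_error)
print(repeat_customers)  # [(1,)]
```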
RANK(), DENSE_RANK(), and ROW_NUMBER()?
Window functions perform calculations across a set of rows (the window) that are related to the current row, without collapsing them into a single group. The OVER() clause defines the window.
- ROW_NUMBER(): assigns a unique sequential integer to every row within the partition, regardless of ties. Non-deterministic when rows share the same sort key (the tie-break is arbitrary). Use when you need exactly one row per group.
- RANK(): ties receive the same rank, and the next rank skips. If two rows share rank 2, the next row is rank 4. Gaps appear.
- DENSE_RANK(): ties receive the same rank, but no ranks are skipped. If two rows share rank 2, the next row is rank 3. No gaps.
SELECT
customer_id,
order_total,
ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_total DESC) AS rn,
RANK() OVER (PARTITION BY customer_id ORDER BY order_total DESC) AS rnk,
DENSE_RANK() OVER (PARTITION BY customer_id ORDER BY order_total DESC) AS drnk
FROM orders;
-- order_total: 500, 500, 300
-- ROW_NUMBER: 1, 2, 3 (unique, arbitrary tie-break)
-- RANK: 1, 1, 3 (tie: same rank, gap after)
-- DENSE_RANK: 1, 1, 2 (tie: same rank, no gap)
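The tie behaviour can be reproduced with Python's built-in sqlite3 module (SQLite 3.25 or later supports window functions). The three orders below are hypothetical and mirror the comment above; because the two tied rows have identical selected columns, the printed result is deterministic even though ROW_NUMBER() breaks the tie arbitrarily.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, customer_id INTEGER, order_total REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [(1, 7, 500.0), (2, 7, 500.0), (3, 7, 300.0)],
)

rows = conn.execute(
    """
    SELECT order_total,
           ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_total DESC) AS rn,
           RANK()       OVER (PARTITION BY customer_id ORDER BY order_total DESC) AS rnk,
           DENSE_RANK() OVER (PARTITION BY customer_id ORDER BY order_total DESC) AS drnk
    FROM orders
    ORDER BY rn
    """
).fetchall()

for row in rows:
    print(row)
# (500.0, 1, 1, 1)
# (500.0, 2, 1, 1)
# (300.0, 3, 3, 2)
```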
Common window function use cases: deduplication (keep the latest row per key with ROW_NUMBER()), running totals (SUM() OVER (ORDER BY date)), moving averages (AVG() OVER (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)), lead/lag comparisons (LAG(revenue, 1) OVER (ORDER BY date)), and percentile ranking (PERCENT_RANK(), NTILE(4) for quartiles).
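A minimal sketch of three of these patterns (running total, moving average, LAG) using Python's sqlite3 module. The daily_revenue table is hypothetical, and the moving average uses a 3-row frame instead of 7 so the result is easy to check by hand.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_revenue (day TEXT, revenue INTEGER)")
conn.executemany(
    "INSERT INTO daily_revenue VALUES (?, ?)",
    [("2024-01-01", 10), ("2024-01-02", 20), ("2024-01-03", 30), ("2024-01-04", 40)],
)

rows = conn.execute(
    """
    SELECT day,
           SUM(revenue) OVER (ORDER BY day) AS running_total,
           AVG(revenue) OVER (ORDER BY day ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS avg_3d,
           revenue - LAG(revenue, 1) OVER (ORDER BY day) AS change_vs_prev
    FROM daily_revenue
    ORDER BY day
    """
).fetchall()

for row in rows:
    print(row)
# ('2024-01-01', 10, 10.0, None)
# ('2024-01-02', 30, 15.0, 10)
# ('2024-01-03', 60, 20.0, 10)
# ('2024-01-04', 100, 30.0, 10)
```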
A Common Table Expression (CTE) is a named temporary result set defined within a WITH clause, scoped to the query that follows. CTEs improve readability over subqueries and can be referenced multiple times in the same query (some databases materialise them).
A recursive CTE references itself, enabling traversal of hierarchical or graph-structured data (org charts, bill of materials, file trees, graph shortest paths). It has two parts: the anchor member (base case) unioned with the recursive member.
-- Traverse an employee hierarchy: find all reports under manager 1
WITH RECURSIVE org_tree AS (
-- Anchor: the root manager
SELECT id, name, manager_id, 1 AS depth
FROM employees
WHERE id = 1
UNION ALL
-- Recursive: join each level's children
SELECT e.id, e.name, e.manager_id, ot.depth + 1
FROM employees e
INNER JOIN org_tree ot ON e.manager_id = ot.id
)
SELECT id, name, depth FROM org_tree ORDER BY depth, name;
When to use recursive CTEs: hierarchical data (employee → manager), tree traversal (category → subcategory), graph problems (shortest path in SQL, with a cycle-detection guard), date spine generation (generate a row per day between two dates without a calendar table).
Pitfall: always add a termination condition (a WHERE depth < N guard, or cycle detection via an array of visited IDs). If the data contains a cycle, the recursion never ends on its own and will exhaust server resources.
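The date-spine use case makes a compact example of the anchor/recursive split and of a termination guard. This sketch runs the recursive CTE through Python's sqlite3 module; the start and end dates are arbitrary, and the depth limit of 366 is an assumed safety cap, not a required value.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# Date spine: one row per day from start to end, with no calendar table.
rows = conn.execute(
    """
    WITH RECURSIVE spine(day, depth) AS (
        SELECT DATE(?), 1                         -- anchor member
        UNION ALL
        SELECT DATE(day, '+1 day'), depth + 1     -- recursive member
        FROM spine
        WHERE day < DATE(?)                       -- natural stop at the end date
          AND depth < 366                         -- hard guard against runaway recursion
    )
    SELECT day FROM spine
    """,
    ("2024-02-27", "2024-03-02"),
).fetchall()

days = [d for (d,) in rows]
print(days)
# ['2024-02-27', '2024-02-28', '2024-02-29', '2024-03-01', '2024-03-02']
```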
LATERAL and CROSS JOIN LATERAL.
- INNER JOIN: returns rows where the join condition matches in both tables.
- LEFT OUTER JOIN: all rows from the left table; NULLs for unmatched right rows.
- RIGHT OUTER JOIN: the mirror of LEFT JOIN.
- FULL OUTER JOIN: all rows from both tables; NULLs where there is no match on either side.
- CROSS JOIN: Cartesian product of both tables; every row combined with every other row. Use for generating combinations, or with a LIMIT for sampling.
- SELF JOIN: joining a table to itself. Classic uses: comparing rows within the same table (employees to their managers), gap-and-island problems.
LATERAL join (PostgreSQL/BigQuery/Snowflake; CROSS APPLY/OUTER APPLY in SQL Server) allows a subquery on the right side to reference columns from the left side; it is evaluated row by row. This enables correlated subqueries that return multiple rows per left-side row.
-- For each customer, get their 3 most recent orders (not possible with a plain JOIN)
SELECT c.customer_id, c.name, recent.order_id, recent.created_at
FROM customers c
CROSS JOIN LATERAL (
SELECT order_id, created_at
FROM orders o
WHERE o.customer_id = c.customer_id -- references outer row
ORDER BY created_at DESC
LIMIT 3
) AS recent;
-- LATERAL is also used to unnest arrays row-by-row
SELECT u.id, tag
FROM users u, LATERAL UNNEST(u.tags) AS tag;
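SQLite has no LATERAL join, so this sketch swaps in a different technique, the ROW_NUMBER() top-N-per-group pattern, to produce the same "three most recent orders per customer" result. Tables and rows are hypothetical. The trade-off: LATERAL with LIMIT can probe an index once per customer, while ROW_NUMBER() ranks every order before filtering.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE orders (order_id INTEGER PRIMARY KEY, customer_id INTEGER, created_at TEXT);
    INSERT INTO customers VALUES (1, 'Ada'), (2, 'Lin');
    INSERT INTO orders VALUES
        (10, 1, '2024-01-01'), (11, 1, '2024-01-05'), (12, 1, '2024-01-09'),
        (13, 1, '2024-01-12'), (14, 2, '2024-01-03');
    """
)

rows = conn.execute(
    """
    WITH ranked AS (
        SELECT o.*,
               ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY created_at DESC) AS rn
        FROM orders o
    )
    SELECT c.customer_id, c.name, r.order_id, r.created_at
    FROM customers c
    JOIN ranked r ON r.customer_id = c.customer_id
    WHERE r.rn <= 3                      -- keep the 3 most recent per customer
    ORDER BY c.customer_id, r.created_at DESC
    """
).fetchall()

for row in rows:
    print(row)
```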
GROUPING SETS, ROLLUP, and CUBE? When are they useful in analytical queries?
These are extensions to GROUP BY that compute multiple grouping combinations in a single query, avoiding expensive UNION ALL constructs.
- GROUPING SETS((a), (b), (a, b), ()): explicitly enumerate which combinations to compute. The most flexible.
- ROLLUP(country, city, store): generates a hierarchy: (country, city, store), (country, city), (country), (). Ideal for subtotals and grand totals in reports. N+1 groupings for N columns.
- CUBE(a, b, c): generates all 2^N possible grouping combinations. Expensive but useful for multi-dimensional (OLAP-style) analysis.
-- Sales report with subtotals per region and a grand total, in one scan
SELECT
region,
product_category,
SUM(revenue) AS total_revenue,
GROUPING(region) AS is_grand_total, -- 1 only on the grand-total row (region rolled up)
GROUPING(product_category) AS is_category_rolled_up -- 1 on region subtotals and the grand total
FROM sales
GROUP BY ROLLUP(region, product_category)
ORDER BY region NULLS LAST, product_category NULLS LAST;
-- Output includes:
-- (US, Electronics, 50000) -- leaf
-- (US, NULL, 85000) -- region subtotal
-- (NULL, NULL, 200000) -- grand total
Use GROUPING() or GROUPING_ID() to distinguish NULL-as-subtotal from NULL-as-missing-data in the result set.
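To make the N+1 versus 2^N counts concrete, here is a small Python sketch that only enumerates the grouping sets each construct expands to; it does not aggregate anything, and the function names are illustrative.

```python
from itertools import combinations


def rollup_sets(cols):
    """Grouping sets produced by ROLLUP(cols): every leading prefix, longest first."""
    return [tuple(cols[:i]) for i in range(len(cols), -1, -1)]


def cube_sets(cols):
    """Grouping sets produced by CUBE(cols): every subset of the columns."""
    return [combo for size in range(len(cols), -1, -1) for combo in combinations(cols, size)]


cols = ["country", "city", "store"]
print(rollup_sets(cols))
# [('country', 'city', 'store'), ('country', 'city'), ('country',), ()]
print(len(cube_sets(cols)))  # 8
```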
Gap-and-island problems involve identifying contiguous sequences (islands) and breaks between them (gaps) in ordered data. Classic examples: consecutive login days, uninterrupted sensor readings, overlapping date ranges.
Classic approach: row number subtraction.
-- Find consecutive active days per user
WITH numbered AS (
SELECT
user_id,
activity_date,
activity_date - ROW_NUMBER() OVER (
PARTITION BY user_id ORDER BY activity_date
) * INTERVAL '1 day' AS grp -- same value for consecutive dates
FROM user_activity -- hypothetical source table; assumes one row per user per day
),
islands AS (
SELECT
user_id,
grp,
MIN(activity_date) AS island_start,
MAX(activity_date) AS island_end,
COUNT(*) AS streak_days
FROM numbered
GROUP BY user_id, grp
)
SELECT * FROM islands WHERE streak_days >= 7 -- streaks of 7+ days
ORDER BY user_id, island_start;
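The same row-number-subtraction idea can be run through Python's sqlite3 module. SQLite has no INTERVAL type, so this sketch uses julianday() to turn dates into day numbers before subtracting ROW_NUMBER(). It also deduplicates dates first, because a repeated date would break the "consecutive dates share a group" property. The user_activity table is hypothetical, and the 7-day threshold is omitted so every island is visible.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_activity (user_id INTEGER, activity_date TEXT)")
conn.executemany(
    "INSERT INTO user_activity VALUES (?, ?)",
    [(1, "2024-03-01"), (1, "2024-03-02"), (1, "2024-03-02"), (1, "2024-03-03"),
     (1, "2024-03-07"), (2, "2024-03-01")],
)

rows = conn.execute(
    """
    WITH days AS (                       -- one row per user per day
        SELECT DISTINCT user_id, activity_date FROM user_activity
    ),
    numbered AS (
        SELECT user_id, activity_date,
               julianday(activity_date)
                 - ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY activity_date) AS grp
        FROM days
    )
    SELECT user_id,
           MIN(activity_date) AS island_start,
           MAX(activity_date) AS island_end,
           COUNT(*) AS streak_days
    FROM numbered
    GROUP BY user_id, grp
    ORDER BY user_id, island_start
    """
).fetchall()

for row in rows:
    print(row)
# (1, '2024-03-01', '2024-03-03', 3)
# (1, '2024-03-07', '2024-03-07', 1)
# (2, '2024-03-01', '2024-03-01', 1)
```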
Finding gaps: use LEAD() to look at the next row's start date and check whether it is later than the current end date + 1 day:
-- LEAD() is not allowed in WHERE, so compute it in a CTE first
WITH ordered AS (
SELECT end_date, LEAD(start_date) OVER (ORDER BY start_date) AS next_start
FROM ranges
)
SELECT
end_date + INTERVAL '1 day' AS gap_start,
next_start - INTERVAL '1 day' AS gap_end
FROM ordered
WHERE next_start > end_date + INTERVAL '1 day';
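Because a window function cannot be referenced in WHERE, this sketch computes LEAD() in a CTE and filters in the outer query. It runs on Python's sqlite3 module, uses SQLite's DATE(x, '+1 day') in place of INTERVAL arithmetic, and assumes the ranges do not overlap; the ranges table and its rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ranges (start_date TEXT, end_date TEXT)")
conn.executemany(
    "INSERT INTO ranges VALUES (?, ?)",
    [("2024-01-01", "2024-01-05"), ("2024-01-06", "2024-01-10"), ("2024-01-15", "2024-01-20")],
)

gaps = conn.execute(
    """
    WITH ordered AS (
        SELECT end_date,
               LEAD(start_date) OVER (ORDER BY start_date) AS next_start
        FROM ranges
    )
    SELECT DATE(end_date, '+1 day')   AS gap_start,
           DATE(next_start, '-1 day') AS gap_end
    FROM ordered
    WHERE next_start > DATE(end_date, '+1 day')   -- the last row has next_start = NULL and drops out
    """
).fetchall()

print(gaps)  # [('2024-01-11', '2024-01-14')]
```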
UNION and UNION ALL? When does UNION hurt performance?
UNION ALL concatenates result sets without any deduplication. It is always the faster option because it requires no sorting or hashing.
UNION (same as UNION DISTINCT) removes duplicate rows. Internally this requires a sort or hash-aggregate step across the full combined result set; it is equivalent to UNION ALL + SELECT DISTINCT.
When UNION hurts:
- Large result sets: deduplication involves a full sort (O(N log N)) or hash of the entire union, versus a single O(N) pass for UNION ALL.
- When duplicates can't actually exist: if you're unioning partitioned tables by non-overlapping date ranges, duplicates are impossible, so UNION wastes the dedup step.
- In Spark / distributed engines: UNION forces a shuffle and reduce step across all partitions; UNION ALL is a simple concatenation with no shuffle.
Rule of thumb: always write UNION ALL by default. Only switch to UNION when duplicates can exist across the branches and you actually need them removed. When in doubt, profile: the dedup cost grows with result-set size.
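The semantic difference is easy to confirm with Python's sqlite3 module. In this sketch, with two hypothetical signup tables sharing one email, UNION ALL keeps all four rows and UNION collapses the shared one.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE signups_web (email TEXT);
    CREATE TABLE signups_app (email TEXT);
    INSERT INTO signups_web VALUES ('a@x.io'), ('b@x.io');
    INSERT INTO signups_app VALUES ('b@x.io'), ('c@x.io');
    """
)

all_rows = conn.execute(
    "SELECT email FROM signups_web UNION ALL SELECT email FROM signups_app"
).fetchall()
distinct_rows = conn.execute(
    "SELECT email FROM signups_web UNION SELECT email FROM signups_app"
).fetchall()

print(len(all_rows), len(distinct_rows))  # 4 3
```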
Sessionisation groups a user's events into sessions based on a time-gap threshold. The pattern is: flag the start of each new session, then compute a running sum of that flag to create a session ID.
WITH flagged AS (
SELECT
user_id,
event_time,
-- Mark rows where the gap to the previous event exceeds 30 minutes
CASE
WHEN event_time - LAG(event_time) OVER (
PARTITION BY user_id ORDER BY event_time
) > INTERVAL '30 minutes'
OR LAG(event_time) OVER (
PARTITION BY user_id ORDER BY event_time
) IS NULL -- first event is always a new session
THEN 1 ELSE 0
END AS session_start
FROM events
),
sessioned AS (
SELECT
user_id,
event_time,
-- Running sum of session_start flags = monotonically increasing session ID
SUM(session_start) OVER (
PARTITION BY user_id ORDER BY event_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS session_id
FROM flagged
)
SELECT
user_id,
session_id,
MIN(event_time) AS session_start,
MAX(event_time) AS session_end,
COUNT(*) AS events_in_session,
MAX(event_time) - MIN(event_time) AS session_duration
FROM sessioned
GROUP BY user_id, session_id;
This pattern is fundamental in product analytics and works in any SQL database that supports window functions. In Spark, it maps directly to window functions on DataFrames.
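The flag-then-running-sum logic is the same outside SQL. This plain-Python sketch handles the events of a single user; the 30-minute threshold matches the query above, and the function name and sample timestamps are illustrative.

```python
from datetime import datetime, timedelta

GAP = timedelta(minutes=30)


def sessionise(event_times):
    """Assign a session_id to each event of ONE user, mirroring the SQL:
    flag a new session when there is no previous event or the gap exceeds GAP,
    then keep a running sum of those flags."""
    result = []
    session_id = 0
    previous = None
    for t in sorted(event_times):
        is_start = previous is None or t - previous > GAP  # LAG(...) IS NULL OR gap > 30 min
        session_id += 1 if is_start else 0                 # running SUM(session_start)
        result.append((t, session_id))
        previous = t
    return result


base = datetime(2024, 5, 1, 9, 0)
events = [base, base + timedelta(minutes=10), base + timedelta(minutes=55), base + timedelta(minutes=70)]
print([sid for _, sid in sessionise(events)])  # [1, 1, 2, 2]
```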
SCDs handle how dimension tables (customers, products, employees) track changes to attributes over time.
- SCD Type 1: overwrite. No history retained. Simple, but you lose the ability to report "what was the customer's address at time of purchase."
- SCD Type 2: add a new row for each change with valid_from, valid_to, and is_current columns. Full history preserved. The most common for data warehouses.
- SCD Type 3: add a column for the previous value (e.g., prev_city, curr_city). Limited history: only one prior value. Rarely used.
- SCD Type 4: separate the rapidly changing attributes into a "history table" while keeping the current value in the dimension table. Combines query performance (current state is fast) with full history.
-- SCD Type 2: dim_customer
CREATE TABLE dim_customer (
customer_key BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, -- surrogate key, auto-assigned (PostgreSQL 10+)
customer_id INT, -- natural / business key
name VARCHAR(200),
email VARCHAR(200),
city VARCHAR(100),
valid_from DATE NOT NULL,
valid_to DATE, -- NULL means currently active
is_current BOOLEAN NOT NULL DEFAULT TRUE
);
-- Expire the old row and insert a new one on change
UPDATE dim_customer
SET valid_to = CURRENT_DATE - 1, is_current = FALSE
WHERE customer_id = 42 AND is_current = TRUE;
INSERT INTO dim_customer (customer_id, name, email, city, valid_from, is_current)
VALUES (42, 'Alice', 'alice@new.com', 'Berlin', CURRENT_DATE, TRUE);
-- Historical join: what was the customer's city when the order was placed?
SELECT o.order_id, c.city
FROM orders o
JOIN dim_customer c
ON o.customer_id = c.customer_id
AND o.order_date BETWEEN c.valid_from AND COALESCE(c.valid_to, '9999-12-31');
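The expire-then-insert step and the point-in-time lookup can be sketched in plain Python. Here a list of dicts is a hypothetical in-memory stand-in for dim_customer (only the city attribute is tracked), and the next surrogate key is simply max + 1, standing in for the identity column.

```python
from datetime import date, timedelta


def apply_scd2_change(dim_rows, customer_id, new_attrs, change_date):
    """Expire the current row for customer_id, then append a new current row."""
    next_key = max((r["customer_key"] for r in dim_rows), default=0) + 1  # surrogate key
    for row in dim_rows:
        if row["customer_id"] == customer_id and row["is_current"]:
            row["valid_to"] = change_date - timedelta(days=1)
            row["is_current"] = False
    dim_rows.append({
        "customer_key": next_key,
        "customer_id": customer_id,
        **new_attrs,
        "valid_from": change_date,
        "valid_to": None,          # None means currently active
        "is_current": True,
    })
    return dim_rows


def city_as_of(dim_rows, customer_id, as_of):
    """Historical lookup: the row whose validity window contains as_of."""
    for row in dim_rows:
        valid_to = row["valid_to"] or date(9999, 12, 31)
        if row["customer_id"] == customer_id and row["valid_from"] <= as_of <= valid_to:
            return row["city"]
    return None


dim = [{"customer_key": 1, "customer_id": 42, "city": "Paris",
        "valid_from": date(2023, 1, 1), "valid_to": None, "is_current": True}]
apply_scd2_change(dim, 42, {"city": "Berlin"}, date(2024, 6, 1))
print(city_as_of(dim, 42, date(2024, 3, 15)), city_as_of(dim, 42, date(2024, 7, 1)))  # Paris Berlin
```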
EXISTS and IN for subqueries? Which performs better and why?
IN (subquery) conceptually materialises the entire subquery result into a list, then checks each outer row against that list. If the subquery returns NULLs, the behaviour is surprising: a value that matches nothing in a list containing NULL evaluates to NULL rather than FALSE, so with NOT IN every such row is silently filtered out.
EXISTS (subquery) is a semi-join: it short-circuits as soon as one matching row is found. It does not materialise the full result. It handles NULLs correctly because it tests for the existence of a row, not equality.
Performance: modern query optimisers (PostgreSQL, SQL Server, Oracle, BigQuery) often rewrite both to the same execution plan, so the choice is largely semantic. However:
- EXISTS wins when the subquery result is large: it short-circuits and never needs the full set.
- IN can win for small, known-finite lists or when the subquery result is already cached.
- NOT IN with NULLs is a classic bug: WHERE id NOT IN (SELECT id FROM t) returns zero rows if t contains any NULL id. Always prefer NOT EXISTS or filter NULLs explicitly.
-- Dangerous: returns nothing if orders has any NULL customer_id
SELECT * FROM customers WHERE id NOT IN (SELECT customer_id FROM orders);
-- Safe alternative
SELECT * FROM customers c WHERE NOT EXISTS (
SELECT 1 FROM orders o WHERE o.customer_id = c.id
);
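SQLite follows the standard three-valued logic for NOT IN, so the trap can be reproduced with Python's sqlite3 module. In this hypothetical data, one order has a NULL customer_id, which empties the NOT IN result while NOT EXISTS still returns the two customers without orders.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE customers (id INTEGER PRIMARY KEY);
    CREATE TABLE orders (order_id INTEGER PRIMARY KEY, customer_id INTEGER);
    INSERT INTO customers VALUES (1), (2), (3);
    INSERT INTO orders VALUES (10, 1), (11, NULL);   -- one order with no customer
    """
)

not_in = conn.execute(
    "SELECT id FROM customers WHERE id NOT IN (SELECT customer_id FROM orders)"
).fetchall()
not_exists = conn.execute(
    """SELECT id FROM customers c
       WHERE NOT EXISTS (SELECT 1 FROM orders o WHERE o.customer_id = c.id)
       ORDER BY id"""
).fetchall()

print(not_in)      # []  (2 NOT IN (1, NULL) is NULL, not TRUE)
print(not_exists)  # [(2,), (3,)]
```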
Query Optimisation
8 questions
An execution plan shows how the database engine intends to run your query: which indexes it uses, the join algorithm, estimated row counts, and cost. Reading plans is the most important debugging skill for slow queries.
-- PostgreSQL: get an actual execution plan with real timings
EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) SELECT ...;
-- ANALYZE runs the query; BUFFERS shows cache hits vs disk reads
Key operators and what they mean:
- Seq Scan: full table scan. Expected for small tables or when a large % of rows is returned. A red flag on a large table with a selective WHERE clause (often a missing index).
- Index Scan: uses an index to find rows, then fetches heap pages. Good for selective queries.
- Index Only Scan: all needed columns are in the index; no heap access. The fastest access pattern.
- Bitmap Heap Scan: uses an index to build a bitmap of matching pages, then fetches those pages. Efficient for moderate selectivity (roughly 5–20% of rows).
- Nested Loop Join: for each row in the outer input, scan or index-lookup the inner. Fast for small outer inputs with an indexed inner.
- Hash Join: build a hash table from the smaller input, probe with the larger. Good for large unsorted joins. Requires memory (spills to disk if it exceeds work_mem).
- Merge Join: both inputs sorted on the join key; walk them in tandem. Efficient when inputs are already sorted or have matching indexes.
- Sort / Hash Aggregate: look for high-cost sort operations; they often indicate a missing index or an opportunity to tune work_mem.
Red flags: row estimate vs actual row count mismatch (stale statistics β run ANALYZE), high cost Seq Scans on large tables, disk spills (Batches > 1 on a Hash Join), unexpected Nested Loops on large outer sets.
- B-tree (default): balanced tree; supports equality, range, LIKE 'prefix%' (with the C collation or a text_pattern_ops index), and sorting. The right choice for most use cases.
- Hash: equality-only lookups. Marginally faster than B-tree for pure equality, but not useful for ranges or sorting. Rarely worth it.
- GiST: generalised search tree; supports geometric, range, and full-text types. Used for PostGIS, tsrange, int4range.
- GIN: generalised inverted index; efficient for multi-valued types (arrays, JSONB, full-text). Use for jsonb @>, array containment, and tsvector full-text search.
- BRIN: block range index; stores min/max values per block range. Tiny; works for naturally ordered data (timestamps in an append-only log table). Not suitable for randomly ordered writes.
Partial index: an index with a WHERE clause. It only indexes a subset of rows, so it is dramatically smaller and faster for skewed queries.
-- Index only pending (unprocessed) orders, e.g. ~5% of the table
CREATE INDEX idx_orders_active ON orders (created_at)
WHERE status = 'pending';
-- Queries with WHERE status = 'pending' use this tiny, fast index
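SQLite also supports partial indexes, so the planner behaviour can be checked from Python with EXPLAIN QUERY PLAN. Its wording differs between SQLite versions, so this sketch only looks for the index name; the table mirrors the example above and is otherwise hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, created_at TEXT);
    CREATE INDEX idx_orders_active ON orders (created_at) WHERE status = 'pending';
    """
)


def plan(sql):
    """Return the 'detail' column of EXPLAIN QUERY PLAN joined into one string."""
    return " | ".join(row[3] for row in conn.execute("EXPLAIN QUERY PLAN " + sql))


pending_plan = plan(
    "SELECT id FROM orders WHERE status = 'pending' AND created_at > '2024-01-01'"
)
shipped_plan = plan(
    "SELECT id FROM orders WHERE status = 'shipped' AND created_at > '2024-01-01'"
)
print(pending_plan)  # mentions idx_orders_active
print(shipped_plan)  # a table scan: the partial index does not cover 'shipped' rows
```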
Composite (multi-column) index: the column order matters. The index is most useful when queries filter or sort on the leftmost prefix. Rule of thumb: put the highest-selectivity column first for equality predicates; put the sort column last.
-- Supports: WHERE tenant_id = ? AND status = ? ORDER BY created_at
CREATE INDEX idx_orders_tenant_status_date
ON orders (tenant_id, status, created_at DESC);
-- Does NOT efficiently support: WHERE status = ? (no tenant_id prefix)
The N+1 problem occurs when you execute 1 query to fetch N rows, then execute N additional queries to fetch related data for each row, totalling N+1 round trips to the database. It is one of the most common performance anti-patterns in ORM-based applications.
# N+1 in Python (SQLAlchemy); assumes an existing Session `session` and mapped classes Order and Customer
from sqlalchemy.orm import joinedload
orders = session.query(Order).filter_by(status='pending').all()  # 1 query
for order in orders:
    print(order.customer.name)  # lazy load: 1 query per order, N queries in total
# Fix: eager loading (JOIN fetch)
orders = session.query(Order).options(
    joinedload(Order.customer)  # single JOIN query
).filter_by(status='pending').all()
SQL fix: a single JOIN.
-- Bad: app loops and runs N queries
-- Good: one query fetches everything
SELECT o.id, o.total, c.name AS customer_name
FROM orders o
JOIN customers c ON o.customer_id = c.id
WHERE o.status = 'pending';
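One cheap way to see the query count in a test is sqlite3's set_trace_callback, which is called once per executed statement. This sketch counts statements for the loop-per-row pattern and for the single JOIN; the schema and rows are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER, total REAL, status TEXT);
    INSERT INTO customers VALUES (1, 'Ada'), (2, 'Lin'), (3, 'Sam');
    INSERT INTO orders VALUES (1, 1, 9.5, 'pending'), (2, 2, 20.0, 'pending'), (3, 3, 5.0, 'pending');
    """
)

statements = []
conn.set_trace_callback(statements.append)  # record every SQL statement SQLite runs

# N+1: one query for the orders, then one query per order for its customer.
orders = conn.execute("SELECT id, customer_id FROM orders WHERE status = 'pending'").fetchall()
names_n_plus_1 = [
    conn.execute("SELECT name FROM customers WHERE id = ?", (cust_id,)).fetchone()[0]
    for _, cust_id in orders
]
n_plus_1_count = len(statements)

# Single JOIN: the same data in one statement.
statements.clear()
joined = conn.execute(
    """SELECT o.id, o.total, c.name FROM orders o
       JOIN customers c ON o.customer_id = c.id
       WHERE o.status = 'pending'"""
).fetchall()
join_count = len(statements)

print(n_plus_1_count, join_count)  # 4 1
```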
Detection: Enable query logging (log_min_duration_statement = 0 in PostgreSQL), use APM tools (Datadog, New Relic), or ORM query counters. A sign is hundreds of near-identical queries with different IDs in a single request trace.
Partitioning divides a large table into smaller physical pieces (partitions) that can be queried, maintained, and stored independently. The planner uses partition pruning to skip irrelevant partitions.
- Range partitioning: partitions by a range of values. Ideal for time-series data (one partition per month/year). Old partitions can be detached and archived cheaply. Queries filtered by date access only the relevant partitions.
- List partitioning: partitions by a discrete list of values (e.g., region: 'NA', 'EU', 'APAC'). Use when data naturally groups into known categories.
- Hash partitioning: distributes rows by a hash of the partition key across N buckets. Use to evenly distribute load when no natural range or list structure exists (e.g., PARTITION BY HASH (user_id) with eight partitions declared FOR VALUES WITH (MODULUS 8, REMAINDER 0) through REMAINDER 7).
-- Declarative range partitioning by month (PostgreSQL 10+)
CREATE TABLE events (
id BIGINT,
user_id INT,
event_time TIMESTAMPTZ NOT NULL,
event_type TEXT
) PARTITION BY RANGE (event_time);
CREATE TABLE events_2024_01
PARTITION OF events FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE events_2024_02
PARTITION OF events FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- Query bounded to January: only events_2024_01 is scanned
EXPLAIN SELECT * FROM events WHERE event_time >= '2024-01-15' AND event_time < '2024-02-01';
Operational benefits: Drop old data by detaching a partition (instant) instead of running a slow DELETE. Indexes can be built per partition in parallel. Vacuuming is more targeted.
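Partition pruning boils down to an interval-overlap test between the query's filter and each partition's bounds. This plain-Python sketch models that idea for monthly partitions with inclusive lower and exclusive upper bounds; it is not PostgreSQL's implementation, and the partition names follow the example above.

```python
from datetime import date


def month_partitions(year):
    """Monthly range partitions as (name, lower_inclusive, upper_exclusive)."""
    parts = []
    for month in range(1, 13):
        lower = date(year, month, 1)
        upper = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
        parts.append((f"events_{year}_{month:02d}", lower, upper))
    return parts


def prune(partitions, lo, hi):
    """Keep only partitions whose [lower, upper) range overlaps the filter [lo, hi)."""
    return [name for name, lower, upper in partitions if lower < hi and lo < upper]


parts = month_partitions(2024)
print(prune(parts, date(2024, 1, 15), date(2024, 2, 1)))       # ['events_2024_01']
print(len(prune(parts, date(2024, 1, 15), date(2025, 1, 1))))  # 12
```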
PostgreSQL uses Multi-Version Concurrency Control (MVCC) β updates and deletes don't physically overwrite rows; they create new versions and mark old ones as "dead." VACUUM reclaims the storage occupied by dead tuples and prevents transaction ID wraparound.
Two forms:
- VACUUM: marks dead tuple space as reusable. Does not return space to the OS. Can run concurrently with reads and writes.
- VACUUM FULL: rewrites the entire table, returning space to the OS. Requires an exclusive lock that blocks all reads and writes. Use sparingly and during maintenance windows.
Autovacuum runs in the background automatically, but can fall behind on high-write tables. Signs of a bloated table: query plans show unexpected Seq Scans, table size is much larger than the actual data, index scans slow down from dead tuple overhead.
-- Check table bloat and last vacuum/analyse time
SELECT
schemaname, relname,
n_dead_tup,
last_vacuum, last_autovacuum,
last_analyze, last_autoanalyze
FROM pg_stat_user_tables
WHERE n_dead_tup > 100000
ORDER BY n_dead_tup DESC;
-- Manually trigger vacuum and update statistics
VACUUM ANALYZE orders;
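Transaction ID wraparound is the most severe consequence of neglected vacuuming. PostgreSQL uses 32-bit XIDs compared circularly, so only about 2 billion of them can be "in the past" at once. If old rows are not frozen by VACUUM before that limit, they would appear to be "in the future", so PostgreSQL stops assigning new transaction IDs (effectively going read-only) until the table is vacuumed. Autovacuum protects against this; never disable it.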
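The "in the future" effect follows from circular comparison: an XID is older than another when their signed 32-bit difference is negative. This Python sketch is a simplified model of that rule; it ignores PostgreSQL's special XIDs and tuple freezing, and the XID values are arbitrary.

```python
XID_SPACE = 2 ** 32  # 32-bit transaction IDs wrap around at this value
HALF = 2 ** 31       # about 2.1 billion: how far "into the past" an XID can be


def xid_precedes(a, b):
    """Simplified circular comparison: is XID a older than XID b?
    Equivalent to checking whether the signed 32-bit value of (a - b) is negative."""
    diff = (a - b) % XID_SPACE
    return diff >= HALF


current = 3_000_000_000
print(xid_precedes(current - 1_000, current))      # True: a recent row is correctly in the past
old_row = (current - HALF - 1_000) % XID_SPACE      # an unfrozen row about 2.1 billion XIDs old
print(xid_precedes(old_row, current))               # False: it now looks like the future
```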
Row storage (heap): stores all columns of a row contiguously. Optimal for OLTP: a single I/O fetch retrieves a complete row for point lookups and updates.
Columnar storage: stores each column's values contiguously. Analytical queries that aggregate a few columns across many rows (e.g., SUM(revenue) over 100 million rows) only read the relevant columns from disk, skipping irrelevant ones entirely.
Additional advantages of columnar storage:
- Compression: values in the same column are homogeneous and often sorted, enabling much higher compression ratios (run-length encoding, delta encoding, dictionary encoding). Parquet files often compress 5–10× relative to raw data.
- Vectorised execution: operations run on contiguous arrays of the same type, enabling SIMD CPU instructions. DuckDB, ClickHouse, and BigQuery's Dremel engine are built around this.
- Predicate pushdown with statistics: columnar formats (Parquet, ORC) store min/max values per row group (per stripe in ORC). Queries with range predicates skip entire row groups without reading them.
Trade-offs: Columnar is slow for updates (writing one row touches every column file). It is optimal for write-once, read-many analytical workloads β data warehouses, data lakes, OLAP engines (Redshift, BigQuery, Snowflake, ClickHouse all store data columnar).
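A toy Python sketch of why columnar layout compresses well and reads less: once each column is stored contiguously, repeated values sit next to each other and run-length encoding collapses them, and an aggregate touches only one column. The rows are hypothetical, and real formats combine several encodings.

```python
from itertools import groupby


def run_length_encode(values):
    """Collapse consecutive repeats into (value, run_length) pairs."""
    return [(value, sum(1 for _ in run)) for value, run in groupby(values)]


# Row layout: each record keeps its columns together.
rows = [("2024-01-01", "EU", 10), ("2024-01-01", "EU", 12),
        ("2024-01-01", "US", 7), ("2024-01-02", "US", 9)]

# Columnar layout: each column is stored contiguously.
columns = {name: [row[i] for row in rows] for i, name in enumerate(["day", "region", "revenue"])}

print(run_length_encode(columns["day"]))     # [('2024-01-01', 3), ('2024-01-02', 1)]
print(run_length_encode(columns["region"]))  # [('EU', 2), ('US', 2)]
print(sum(columns["revenue"]))               # 38, reading only the revenue column
```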
Pushdown moves computation as close to the data source as possible β pushing filters, projections, aggregations, and limits down to the storage layer before data travels over the network to the query engine. It is the single most important optimisation for federated and distributed query engines.
Types of pushdown:
- Predicate pushdown: filter rows at the source. Instead of reading 100M rows and filtering in the engine, apply WHERE date = '2024-01' through partition pruning and Parquet row-group statistics, so only matching row groups are read from S3.
- Projection pushdown: read only the needed columns. Columnar formats make this nearly free: fetching 3 columns out of 200 reads roughly 1.5% of the data (assuming similarly sized columns).
- Aggregation pushdown: push COUNT(*) or SUM() to the connector. Trino can push partial aggregations to remote databases (PostgreSQL, MySQL connectors), reducing data transferred.
- Limit pushdown: push LIMIT 10 to the source so it stops scanning early.
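Row-group skipping can be modelled in a few lines of Python: a reader compares the predicate against each row group's min/max statistics and only fetches groups that could contain matches. The statistics below are hypothetical, in the spirit of Parquet footer metadata, and no Parquet library is used.

```python
# Hypothetical per-row-group statistics for a date column.
row_groups = [
    {"id": 0, "min_date": "2023-12-01", "max_date": "2023-12-31"},
    {"id": 1, "min_date": "2024-01-01", "max_date": "2024-01-20"},
    {"id": 2, "min_date": "2024-01-21", "max_date": "2024-02-10"},
    {"id": 3, "min_date": "2024-02-11", "max_date": "2024-03-01"},
]


def groups_to_read(groups, lo, hi):
    """Predicate pushdown: skip any row group whose [min, max] cannot satisfy lo <= date < hi."""
    return [g["id"] for g in groups if g["max_date"] >= lo and g["min_date"] < hi]


print(groups_to_read(row_groups, "2024-01-01", "2024-02-01"))  # [1, 2]
```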
-- Trino on Hive/S3, with pushdown:
-- Only reads Parquet files in the 2024/01 partition,
-- only reads the 'revenue' column,
-- stops after finding 100 rows
SELECT revenue FROM hive.sales.orders
WHERE year = 2024 AND month = 1
LIMIT 100;
-- Check what was pushed down
EXPLAIN SELECT revenue FROM hive.sales.orders
WHERE year = 2024 AND month = 1;
Writing queries that enable pushdown is a key skill: avoid wrapping partition columns in functions in WHERE clauses (YEAR(event_time) = 2024 can prevent partition pruning; use event_time >= '2024-01-01' AND event_time < '2025-01-01' instead, since BETWEEN would also include midnight on 2025-01-01).
Data skew occurs when data is unevenly distributed across partitions or reduce tasks. A small number of tasks process a disproportionately large amount of data, becoming bottlenecks (stragglers) that delay the entire job.
Common causes: Joining or grouping on a low-cardinality column with very uneven distribution (e.g., country = 'US' has 70% of all rows). NULL values in join keys (all NULLs hash to the same partition).
Remedies:
- Salting: append a random suffix (0 to N-1) to skewed keys, distribute the aggregation across N tasks, then combine results.
- Broadcast join (map-side join): if one side is small enough to fit in memory, broadcast it to all executors, eliminating the shuffle entirely. Spark: broadcast(smallDf).
- Adaptive skew handling: Spark 3.0+ Adaptive Query Execution (AQE) detects skewed join partitions at runtime and splits them automatically.
- Repartition: repartition on a higher-cardinality column before the join.
// Spark: explicit broadcast join hint
import org.apache.spark.sql.functions.broadcast
val result = bigDf.join(broadcast(smallDf), "product_id")
// Spark: enable AQE and its skew-join handling (Spark 3+; AQE is on by default since 3.2)
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
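Salting can be illustrated without Spark. This plain-Python sketch is an illustrative stand-in, not Spark's API: stage 1 counts by (key, salt), so the hot key is split into several smaller groups that could run as separate tasks, and stage 2 strips the salt and combines the partial counts. The choice of 4 salts and the skewed sample data are assumptions.

```python
import random
from collections import Counter, defaultdict

N_SALTS = 4  # assumption: spread each hot key over 4 sub-keys


def salted_count(keys, n_salts=N_SALTS, seed=0):
    """Two-stage aggregation with salting: partial counts per (key, salt), then per key."""
    rng = random.Random(seed)
    partial = Counter((key, rng.randrange(n_salts)) for key in keys)  # stage 1
    final = defaultdict(int)
    for (key, _salt), count in partial.items():                        # stage 2
        final[key] += count
    return dict(final), partial


keys = ["US"] * 70 + ["DE"] * 20 + ["FR"] * 10  # skewed: 'US' holds 70% of rows
totals, partial = salted_count(keys)
print(totals == dict(Counter(keys)))  # True: salting does not change the answer
print(sorted(c for (k, _), c in partial.items() if k == "US"))  # 'US' split into smaller groups
```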
Data Modelling
6 questions
- Star schema (Kimball): a central fact table surrounded by denormalised dimension tables. Simple, fast to query (few joins), intuitive for BI tools. The most common pattern for data marts and semantic layers. Redundancy in dimensions is accepted in exchange for query simplicity.
- Snowflake schema: normalises dimension tables into sub-dimensions (e.g., dim_product → dim_category → dim_department). Reduces storage redundancy but requires more joins, hurting query performance. Harder for analysts to navigate. Rarely worth the complexity over star schema in a modern columnar warehouse.
- Data Vault: built from three entity types, namely Hubs (business keys), Links (relationships between hubs), and Satellites (descriptive attributes with history). Highly flexible to schema changes; adding new data sources doesn't break existing models. Excellent for enterprise data warehouses with many source systems and strict auditability requirements. Complex to query directly; typically a presentation layer (star schema) is built on top of the vault.
When to choose: Star schema for data marts, self-service BI, single-domain warehouses. Snowflake schema almost never (columnar storage makes the storage saving irrelevant). Data Vault for large enterprise EDWs with many heterogeneous sources, regulatory audit requirements, or frequent schema evolution.
Fact tables store measurable, quantitative business events. Each row represents one event and contains foreign keys to dimensions and numeric measures.
- Transactional fact: one row per business event at the lowest grain (each order line, each page view, each payment). Append-only. The most flexible, since you can roll up to any period. The most common type.
- Periodic snapshot: one row per period per entity, capturing a summary at regular intervals (daily account balance, weekly inventory level). Useful when events are too granular (millions/day) and you primarily query at a period level. Rows are inserted on schedule, not triggered by events.
- Accumulating snapshot: one row per business process instance (e.g., one row per order lifecycle), with multiple date foreign keys updated as the process advances (order_date, shipped_date, delivered_date). Rows are updated as milestones are reached. Ideal for pipeline or funnel analysis where you need to measure lag between stages.
The grain (what one row represents) must be defined before building a fact table; adding measures that don't match the grain is the most common modelling mistake.
The medallion architecture (popularised by Databricks) is a layered data lake pattern that progressively cleanses and enriches data through three zones:
- Bronze (raw): landing zone for raw ingested data, exactly as received from the source. Immutable and append-only. Schema-on-read. Retains all source data including malformed records. Purpose: full audit trail and the ability to reprocess from scratch.
- Silver (cleansed/conformed): deduplicated, validated, lightly transformed. Source schemas unified to a canonical format. NULL handling applied. Joins between source entities performed here. Business rules not yet applied. Purpose: the "single source of truth" for data consumers who need raw, trustworthy data.
- Gold (curated/business-ready): aggregated, modelled, business-logic-applied. Star schema fact and dimension tables, wide tables for BI tools, metric definitions applied. Optimised for query performance (pre-aggregated, partitioned). Purpose: powering dashboards, ML feature stores, and self-service analytics.
Benefits: incremental reprocessing (a bug in Silver-to-Gold logic is fixed and re-run without re-ingesting raw data). Clear data contract between layers. Decoupled access control (raw data in Bronze restricted to engineers; Gold open to analysts).
Normalisation organises data to reduce redundancy by decomposing tables into smaller ones linked by foreign keys. Each fact is stored once. Enforced by normal forms (1NF → 2NF → 3NF → BCNF).
Denormalisation intentionally reintroduces redundancy to reduce join complexity and improve read performance. Common in analytics where read throughput matters more than write efficiency.
When to denormalise in a data warehouse:
- Wide flat tables: combine frequently joined tables into one wide table used by BI tools. Eliminates runtime JOINs. Suitable when data is refreshed in batch (stale redundancy is acceptable).
- Pre-aggregated summary tables: materialise expensive aggregations (daily active users, monthly revenue by segment) for fast dashboard queries.
- Embedding arrays/structs: in BigQuery and Snowflake, nested/repeated fields (ARRAY<STRUCT>) denormalise one-to-many relationships into a single row, eliminating JOINs and enabling cheaper clustered scans.
- Dimension denormalisation in star schema: by design. Product category, sub-category, and brand are all in dim_product rather than normalised into separate tables. Simpler SQL for analysts.
Many-to-many relationships (an order can contain many products, and each product appears in many orders) are modelled with bridge tables (also called junction or associative tables) in normalised schemas, or using nested/array types in columnar warehouses.
Bridge table approach (star schema):
-- An account can have multiple promotions; a promotion applies to many accounts
CREATE TABLE dim_account (account_key BIGINT PRIMARY KEY, account_id INT, name VARCHAR(200)); -- other attributes omitted
CREATE TABLE dim_promotion (promo_key BIGINT PRIMARY KEY, promo_id INT, name VARCHAR(200), discount_pct NUMERIC(5,2));
CREATE TABLE bridge_account_promotions (
account_key BIGINT REFERENCES dim_account,
promo_key BIGINT REFERENCES dim_promotion,
effective_date DATE,
weighting_factor NUMERIC(9,6) -- 1 / COUNT(promotions per account), avoids inflating measures
);
-- Query: revenue per promotion = SUM(revenue * weighting_factor), so each account's revenue is counted once in total
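The effect of the weighting factor is easiest to see with numbers. In this plain-Python sketch (accounts, promotions, and revenue are hypothetical), the naive join attributes an account's full revenue to every promotion it has, while weighting by 1 / (promotions per account) keeps the grand total equal to the true revenue.

```python
from collections import defaultdict

# Hypothetical data: account revenue and the promotions bridged to each account.
account_revenue = {"acct_1": 100.0, "acct_2": 60.0}
bridge = [("acct_1", "SPRING"), ("acct_1", "LOYALTY"), ("acct_2", "SPRING")]

promos_per_account = defaultdict(int)
for account, _ in bridge:
    promos_per_account[account] += 1

naive = defaultdict(float)
weighted = defaultdict(float)
for account, promo in bridge:
    naive[promo] += account_revenue[account]                                   # double-counts
    weighted[promo] += account_revenue[account] / promos_per_account[account]  # weighting_factor

print(sum(naive.values()), sum(weighted.values()))  # 260.0 160.0
print(dict(weighted))  # {'SPRING': 110.0, 'LOYALTY': 50.0}
```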
Array-of-keys approach (BigQuery/Snowflake):
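-- BigQuery: embed promotion keys as an array in the fact table (one row per order_id)
SELECT
order_id,
ANY_VALUE(revenue) AS revenue, -- UNNEST repeats the order row once per promotion, so SUM would inflate it
COUNT(DISTINCT promo_key) AS num_promotions_applied
FROM fct_orders,
UNNEST(promotion_keys) AS promo_key -- correlated (lateral) unnest; orders with an empty array drop out
GROUP BY order_id;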
The array approach avoids a bridge table join, dramatically reducing query cost in columnar engines where unnest is cheap but JOINs require shuffles.
A surrogate key is a system-generated, meaningless integer or UUID assigned as the primary key of a warehouse dimension, independent of any business meaning. A natural key is a real-world identifier from the source system (customer email, product SKU, order ID).
Why surrogate keys are preferred:
- SCD Type 2 support: the same business entity needs multiple rows (current and historical) in the dimension. Surrogate keys make each row uniquely addressable. Natural keys can't do this.
- Source system independence: business keys change (company rebrands, email changes, systems migrate). Surrogate keys insulate the warehouse from upstream changes.
- Consistent join performance: integer surrogate keys are small, sort efficiently, and produce fast joins. String natural keys (especially emails, UUIDs) are wider and slower.
- Multi-source integration: two source systems may have overlapping customer_id ranges (both have a customer 1001). Surrogate keys provide a unique namespace across sources.
Always retain the natural key as a regular column: it's needed to reload data from sources, join back to operational systems, and for debugging. The surrogate key is for warehouse-internal use.
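The multi-source point can be sketched as a key map from (source_system, natural_key) to an integer. The class below is a hypothetical stand-in for an identity column plus a key-mapping table. It covers only entity identity: under SCD Type 2, each new row version would also receive a fresh surrogate key.

```python
from itertools import count


class SurrogateKeyMap:
    """Hands out warehouse-internal integer keys for (source_system, natural_key) pairs."""

    def __init__(self):
        self._next = count(start=1)
        self._keys = {}

    def key_for(self, source_system, natural_key):
        pair = (source_system, natural_key)
        if pair not in self._keys:           # first sighting: assign the next integer
            self._keys[pair] = next(self._next)
        return self._keys[pair]


keys = SurrogateKeyMap()
crm_key = keys.key_for("crm", 1001)          # both systems have a customer 1001
billing_key = keys.key_for("billing", 1001)
print(crm_key, billing_key, keys.key_for("crm", 1001))  # 1 2 1
```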
Apache Spark
10 questions
Spark builds a DAG (Directed Acyclic Graph) of transformations lazily; no computation happens until an action is called (collect(), count(), write()). The DAG Scheduler then converts the logical plan into physical stages.
- Transformations: lazy operations that define the computation graph. They are either narrow or wide.
  - Narrow: each input partition is used by at most one output partition (map, filter, select).
  - Wide: an input partition contributes to many output partitions (groupBy, join, distinct). Wide transformations require a shuffle.
- Stage boundary: every shuffle creates a stage boundary. Tasks within a stage run in parallel. Dependent stages run sequentially: the next stage waits for every task of the previous one to finish writing its shuffle files.
- Tasks: the unit of parallelism, one task per partition in each stage. Each task runs on one executor core.
import org.apache.spark.sql.functions.{col, sum}

// Narrow transforms: no shuffle, stay in one stage (df is an existing DataFrame)
val filtered = df
  .filter(col("status") === "active")   // narrow
  .select("user_id", "revenue")         // narrow

// Wide transforms: each shuffle starts a new stage
val result = filtered
  .groupBy("user_id")                   // wide: shuffle
  .agg(sum("revenue").as("total"))
  .filter(col("total") > 1000)          // narrow (runs on the shuffle output)
  .orderBy(col("total").desc)           // wide: another shuffle (range partitioning)

result.write.parquet("s3://bucket/output") // ACTION: triggers execution
Understanding stages explains where Spark spends its time. The Spark UI (served by the driver, on port 4040 by default) shows the DAG visualisation, stage durations, and task-level metrics. Always profile there before optimising blindly.
A shuffle redistributes data across executors so that rows with the same key are co-located for operations like groupBy, join, or distinct. It involves: serialising partition data to disk, transferring it over the network to the correct executor, and deserialising.
Why it's expensive: Disk I/O (write shuffle files), network I/O (all-to-all data transfer), deserialisation CPU, and potential disk spill if memory is insufficient. A large shuffle is the single most common cause of slow Spark jobs.
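The core idea of a shuffle is that every row, wherever it starts, is routed to the reducer chosen by hash(key) % N. That routing is what co-locates each key. The Python sketch below shows it. zlib.crc32 stands in for Spark's Murmur3-based hash partitioning, and the sample data is hypothetical.

```python
import zlib
from collections import defaultdict


def partition_of(key: str, num_partitions: int) -> int:
    # Stand-in for Spark's hash partitioning (Spark uses Murmur3); crc32 is stable across runs
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def shuffle(map_partitions, num_reducers):
    """Send every (key, value) from every map-side partition to reducer hash(key) % N.
    In Spark the map side writes these buckets to local shuffle files and reducers fetch them
    over the network; Spark also pre-aggregates map-side for groupBy().agg(), skipped here."""
    reducers = [defaultdict(list) for _ in range(num_reducers)]
    for partition in map_partitions:
        for key, value in partition:
            reducers[partition_of(key, num_reducers)][key].append(value)
    return reducers


# Three map-side partitions: the same key appears in several of them
map_side = [[("alice", 10), ("bob", 5)], [("alice", 7), ("carol", 1)], [("bob", 2)]]
reducers = shuffle(map_side, num_reducers=2)

# After the shuffle each reducer can aggregate its keys locally
totals = {key: sum(values) for reducer in reducers for key, values in reducer.items()}
print(totals)
```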
Minimisation strategies:
- Filter early: apply all filters before any join or groupBy to reduce the volume of shuffled data.
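- Broadcast join: when one side of a join is small, Spark ships a full copy of it to every executor. The large side is then joined in place and never shuffled. This happens automatically below spark.sql.autoBroadcastJoinThreshold (10 MB by default); you can force it with broadcast(smallDf).
- Reduce the number of shuffles: compute all aggregates for a key in one groupBy(...).agg(...) rather than in separate groupBy passes. Use repartition() strategically to hash-partition data on a key once when several downstream operations reuse that key.
- Bucket by join key: write both tables with bucketBy(N, "key") via saveAsTable. When both sides are bucketed on the join key with compatible bucket counts, Spark can join them without a shuffle. Plain directory partitioning (partitionBy) does not remove the shuffle.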
- Tune spark.sql.shuffle.partitions: the default of 200 is too many for small jobs (tiny tasks whose scheduling overhead dominates) and too few for large ones (huge tasks that spill or OOM). A common starting point is 2–4× the number of available cores, adjusted for data volume.
- AQE (Adaptive Query Execution): available since Spark 3.0 and enabled by default since 3.2. It merges small shuffle partitions at runtime based on actual shuffle statistics, reducing the overhead of a fixed partition count.
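The broadcast-join idea can be sketched in plain Python: build a hash table from the small side once, then probe it partition by partition on the large side, which never moves. This is a conceptual model of a broadcast hash join, not Spark code. The table contents are hypothetical.

```python
from typing import Dict, List

Row = Dict[str, object]


def broadcast_hash_join(large_partitions: List[List[Row]], small_table: List[Row], key: str):
    """Inner join in the style of a broadcast hash join: build a hash table from the small side
    once (in Spark it is shipped to every executor), then probe it from each large-side
    partition where that partition already lives. The large side is never shuffled."""
    build: Dict[object, List[Row]] = {}
    for row in small_table:
        build.setdefault(row[key], []).append(row)
    joined = []
    for partition in large_partitions:       # partitioning of the large side is preserved
        out = [{**row, **match} for row in partition for match in build.get(row[key], [])]
        joined.append(out)
    return joined


orders = [
    [{"order_id": 1, "country": "DE", "amount": 10}],
    [{"order_id": 2, "country": "FR", "amount": 20}, {"order_id": 3, "country": "XX", "amount": 5}],
]
countries = [{"country": "DE", "name": "Germany"}, {"country": "FR", "name": "France"}]
result = broadcast_hash_join(orders, countries, "country")
```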
What is the difference between repartition() and coalesce()? When do you use each?
- repartition(N): performs a full shuffle to redistribute data across N partitions, producing evenly sized partitions. It can increase or decrease the partition count. Use it before a join on a specific key (repartition(200, col("customer_id"))) to pre-shuffle once and avoid repeated shuffles on the same key.
- coalesce(N): merges existing partitions without a shuffle, combining partitions that live on the same executor where possible. It can only reduce the partition count and may produce uneven partition sizes. It is much cheaper than repartition for reducing partitions before writing output. Caveat: because there is no shuffle, the whole upstream stage also runs with only N tasks. For example, coalesce(1) after an expensive transformation runs that work on a single core.
import org.apache.spark.sql.functions.col

// After a selective filter that leaves the 200 partitions mostly empty,
// coalesce reduces the file count before writing, with no shuffle cost
df.filter(col("revenue") > 10000)
  .coalesce(20)                          // cheap: no shuffle
  .write.parquet("s3://output/")

// Before a join on customer_id: pre-shuffle both sides on the join key
// so the join itself does not add another shuffle
largeDF.repartition(200, col("customer_id"))   // full shuffle, worth it if the partitioning is reused
  .join(otherDF.repartition(200, col("customer_id")), "customer_id")
Rule: use coalesce to reduce partition count before a write (no shuffle). Use repartition when you need a full, even redistribution or want to pre-partition on a key for downstream operations.
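The partition-size difference between the two can be modelled with row counts alone. In this simplified sketch, coalesce merges neighbouring partitions; Spark's real coalescer groups by data locality, but sizes stay uneven either way. repartition deals rows round-robin, as repartition(n) without columns does. The row counts are hypothetical.

```python
from typing import List


def coalesce_sizes(partition_sizes: List[int], n: int) -> List[int]:
    """Simplified coalesce: merge neighbouring partitions into n groups without moving rows
    between them. (Spark's real coalescer groups by data locality; sizes stay uneven either way.)"""
    if n >= len(partition_sizes):
        return list(partition_sizes)  # coalesce cannot increase the partition count
    groups = [0] * n
    for i, size in enumerate(partition_sizes):
        groups[i * n // len(partition_sizes)] += size
    return groups


def repartition_sizes(partition_sizes: List[int], n: int) -> List[int]:
    """repartition(n) without columns: a full shuffle dealing rows round-robin, so sizes are even."""
    total = sum(partition_sizes)
    return [total // n + (1 if i < total % n else 0) for i in range(n)]


# Row counts per partition after a selective filter: most partitions are empty
after_filter = [0, 0, 900, 0, 0, 50, 0, 50]
print(coalesce_sizes(after_filter, 4))     # uneven: one group holds 900 rows
print(repartition_sizes(after_filter, 4))  # even: 250 rows each
```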
Spark's memory model on each executor is divided into:
- Reserved memory: ~300 MB fixed for Spark internals.
- User memory: holds user-defined data structures, UDF objects, and RDD metadata. It gets whatever the unified pool does not claim. spark.memory.fraction defaults to 0.6, which leaves 40% of usable memory (heap minus reserved) as user memory.
- Unified memory pool (spark.memory.fraction, 60% of usable memory by default), shared dynamically between:
  - Storage memory: cached RDDs/DataFrames (df.cache()). With df.cache()'s default MEMORY_AND_DISK level, evicted blocks are written to disk.
  - Execution memory: shuffle buffers, sort buffers, hash join build tables. Spills to disk when insufficient.
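The unified model works in both directions:
- Execution can borrow free storage memory, and can evict cached blocks until storage falls back to its protected share (spark.memory.storageFraction, 0.5 of the pool by default).
- Storage can borrow free execution memory.

This improves utilisation over the static split used before Spark 1.6.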
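The arithmetic of the on-heap split can be worked through directly. The sketch below assumes the 300 MB reserved memory and the default fractions described above, and treats the heap as spark.executor.memory. It is a back-of-the-envelope model, not Spark's exact accounting.

```python
RESERVED_MB = 300  # fixed reserved memory for Spark internals


def executor_memory_breakdown(heap_mb: float, memory_fraction: float = 0.6,
                              storage_fraction: float = 0.5) -> dict:
    """Split an executor heap (spark.executor.memory) following the unified memory model."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction              # shared by execution and storage
    protected_storage = unified * storage_fraction  # cached data execution cannot evict
    return {
        "reserved": RESERVED_MB,
        "user": round(usable - unified, 1),
        "unified": round(unified, 1),
        "protected_storage": round(protected_storage, 1),
    }


print(executor_memory_breakdown(8192))            # defaults: fraction 0.6, storageFraction 0.5
print(executor_memory_breakdown(8192, 0.7, 0.3))  # e.g. fraction 0.7, storageFraction 0.3
```

For an 8 GB heap with the defaults, this gives about 4.6 GB of unified memory and about 3.1 GB of user memory.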
Off-heap memory (spark.memory.offHeap.enabled=true) allocates memory outside the JVM heap, which avoids GC pressure for large datasets. It is useful for long-running jobs with large caches where GC pauses become a problem. spark.memory.offHeap.size must then be set to a positive value and sized carefully.
# Diagnose OOM / spill issues first:
# Spark UI → Stages → Shuffle Read/Write, Spill (Memory), Spill (Disk)

# Memory settings are read when the executors start, so set them at submit time
# (changing them with spark.conf.set at runtime does not work);
# app.jar is a placeholder for your application
spark-submit \
  --executor-memory 8g --executor-cores 4 \
  --conf spark.memory.fraction=0.7 \
  --conf spark.memory.storageFraction=0.3 \
  app.jar
Delta Lake is an open-source storage layer (by Databricks, now Linux Foundation) that adds a transaction log (_delta_log/) on top of Parquet files in a data lake. The transaction log is the source of truth for what files are part of the table at any point in time.
Key capabilities:
- ACID transactions: each write operation is an atomic commit to the transaction log. Readers always see a consistent snapshot, even while a write is in progress (MVCC-style snapshot isolation).
- DML operations: UPDATE, DELETE, and MERGE (upsert) on Parquet, which is otherwise immutable. Delta rewrites the affected files and records the change in the transaction log.
- Schema enforcement: by default, rejects writes that don't match the table schema, preventing silent corruption.
- Schema evolution: with mergeSchema=true, new columns can be added automatically.
- Time travel: query any previous version of the table using VERSION AS OF or TIMESTAMP AS OF, as long as VACUUM has not yet removed the files that version needs. Enables auditing, debugging, and rollback.
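import io.delta.tables.DeltaTable

// MERGE (upsert): the most common CDC pattern; updates is a DataFrame of changed rows
val deltaTable = DeltaTable.forPath(spark, "s3://bucket/table")
deltaTable.as("target").merge(
    updates.as("source"),
    "target.id = source.id"
  ).whenMatched().updateAll()
  .whenNotMatched().insertAll()
  .execute()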
// Time travel
spark.read.format("delta")
.option("versionAsOf", 10)
.load("s3://bucket/table") // read version 10
// Compact small files and clean old versions
spark.sql("OPTIMIZE delta.`s3://bucket/table` ZORDER BY (customer_id)")
spark.sql("VACUUM delta.`s3://bucket/table` RETAIN 168 HOURS")
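To make "the transaction log is the source of truth" concrete, here is a toy Python model. Each version is a JSON list of add/remove file actions, and a snapshot is obtained by replaying versions in order. This is a loose illustration only. The real Delta log also carries metadata, protocol and commitInfo actions, Parquet checkpoints, and optimistic concurrency control.

```python
import json
from typing import List, Optional, Set


class ToyDeltaLog:
    """Toy transaction log: version N is the N-th commit, stored as a JSON list of
    add/remove file actions (loosely modelled on the JSON files in _delta_log/)."""

    def __init__(self) -> None:
        self.commits: List[str] = []

    def commit(self, add=(), remove=()) -> int:
        actions = [{"add": path} for path in add] + [{"remove": path} for path in remove]
        self.commits.append(json.dumps(actions))  # a version appears all at once or not at all
        return len(self.commits) - 1

    def snapshot(self, version: Optional[int] = None) -> Set[str]:
        """Replay commits 0..version to find the live data files (time travel when version is set)."""
        last = len(self.commits) - 1 if version is None else version
        live: Set[str] = set()
        for entry in self.commits[: last + 1]:
            for action in json.loads(entry):
                if "add" in action:
                    live.add(action["add"])
                else:
                    live.discard(action["remove"])
        return live


log = ToyDeltaLog()
v0 = log.commit(add=["part-0.parquet", "part-1.parquet"])           # initial write
v1 = log.commit(add=["part-2.parquet"], remove=["part-1.parquet"])  # UPDATE rewrote part-1
print(log.snapshot())            # current version: part-0 and part-2
print(log.snapshot(version=v0))  # time travel: part-0 and part-1
```

A "remove" action only hides part-1.parquet from newer snapshots. VACUUM is what eventually deletes the file, and after that, reading version 0 is no longer possible.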
Python UDFs are the worst-performing option. Rows are serialised in the JVM, shipped to a Python worker process over a socket, evaluated one row at a time in Python, and the results serialised back. The two serialisation steps and the inter-process communication often make them 10–100× slower than native Spark operations.
Scala/Java UDFs run in the JVM alongside Spark, avoiding the IPC cost. However, Catalyst still cannot push predicates through them or optimise across the UDF boundary: the UDF is a black box.
Alternatives (always prefer in this order):
- Built-in Spark SQL functions (pyspark.sql.functions): fully optimised by Catalyst and executed as JVM code via Tungsten's whole-stage code generation. They cover the vast majority of transformations.
- Spark SQL expressions: express complex logic as a SQL expression string (expr(), selectExpr()). It compiles to the same built-in functions, so Catalyst can optimise it.
- Pandas UDFs (vectorised UDFs): use Apache Arrow to transfer whole column batches as pandas Series instead of row by row. This is typically much faster (often 10–100×) than row-at-a-time Python UDFs. Use them when you need Python library logic (NumPy, scikit-learn), for example on grouped data.
from pyspark.sql.functions import udf, col, trim, lower
from pyspark.sql.types import StringType

# Slow: Python UDF, evaluated row by row in a Python worker, opaque to Catalyst
@udf(returnType=StringType())
def slow_clean(s):
    return s.strip().lower() if s else None

df.withColumn("cleaned", slow_clean(col("name")))

# Fast: built-in functions, Catalyst-optimised and executed in the JVM
# (note: trim() removes spaces only, while str.strip() removes all whitespace)
df.withColumn("cleaned", lower(trim(col("name"))))
Structured Streaming models a real-time data stream as an unbounded DataFrame and applies the same DataFrame/SQL API as batch. Internally it runs micro-batch processing (the default) or the experimental continuous processing mode.
Exactly-once semantics require three pieces:
- Replayable sources: Kafka (replay from an offset) and file sources in cloud storage (files can be re-read). Not all sources support this.
- Idempotent sinks: the sink can handle duplicate writes without changing the result. Examples: Delta Lake's streaming sink, which records the micro-batch IDs it has committed; MERGE-based upserts; idempotent file writes with unique filenames.
- Checkpointing: Spark writes the offsets planned for each micro-batch to a checkpoint location (HDFS/S3) before processing it, and records a commit once the sink write succeeds. On restart, it replays any batch that was planned but not committed, using exactly the same offsets.
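from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Hypothetical payload schema for the "orders" topic (adjust to your events)
schema = StructType([
    StructField("order_id", StringType()),
    StructField("amount", DoubleType()),
])

# Read from Kafka: Spark tracks the consumed offsets in its checkpoint, not in Kafka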
stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "orders") \
.load()
parsed = stream.select(from_json(col("value").cast("string"), schema).alias("data")) \
.select("data.*")
# Write to Delta Lake: idempotent sink + checkpointing = exactly-once
parsed.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "s3://bucket/checkpoints/orders") \
.trigger(processingTime="30 seconds") \
.start("s3://bucket/delta/orders")
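The interplay of replayable source, checkpoint and idempotent sink can be simulated in a few lines of Python. The sketch crashes the job after a sink write but before the commit is recorded, then restarts it. The sink skips the replayed batch by its batch ID, similar in spirit to Delta's streaming sink. All names are hypothetical, and the checkpoint is reduced to two counters.

```python
from typing import Dict, List


class IdempotentSink:
    """Remembers the last committed batch id, so a replayed batch is skipped
    (similar in spirit to how Delta's streaming sink records committed batch ids)."""

    def __init__(self) -> None:
        self.rows: List[int] = []
        self.last_batch_id = -1

    def write(self, batch_id: int, rows: List[int]) -> bool:
        if batch_id <= self.last_batch_id:
            return False  # already committed: replay is a no-op
        self.rows.extend(rows)
        self.last_batch_id = batch_id
        return True


def run_stream(source: List[int], sink: IdempotentSink, checkpoint: Dict[str, int],
               batch_size: int, crash_after_batch: int = -1) -> None:
    """Micro-batch loop over a replayable source (a list indexed by offset).
    Offsets are derived from the checkpoint, so a restarted batch re-reads exactly the same range."""
    while checkpoint["committed_offset"] < len(source):
        batch_id = checkpoint["next_batch_id"]
        start = checkpoint["committed_offset"]
        end = min(start + batch_size, len(source))
        sink.write(batch_id, source[start:end])
        if batch_id == crash_after_batch:
            raise RuntimeError("crashed after the sink write, before the commit was recorded")
        checkpoint["committed_offset"] = end       # commit recorded in the checkpoint
        checkpoint["next_batch_id"] = batch_id + 1


source = list(range(10))
sink = IdempotentSink()
checkpoint = {"committed_offset": 0, "next_batch_id": 0}
try:
    run_stream(source, sink, checkpoint, batch_size=4, crash_after_batch=1)
except RuntimeError:
    pass                                           # the process dies mid-batch
run_stream(source, sink, checkpoint, batch_size=4)  # restart from the checkpoint
print(sink.rows)  # every offset exactly once, despite batch 1 being replayed
```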
Each Spark write task produces at least one file for every output (table) partition it holds rows for. Streaming jobs, frequent incremental loads, and highly partitioned tables therefore accumulate thousands of small files over time: often kilobytes each, versus an optimal 128–512 MB.
Why it matters: every file adds metadata calls (LIST, GET), a footer read, and per-file open overhead. Millions of small files cause slow job startup (minutes just for file listing on S3) and high NameNode/metastore pressure. They also make Parquet statistics inefficient, because each tiny file's footer must be read for very little data. Planning and scan time grow with file count, so query performance degrades steadily as small files accumulate.
Solutions:
- Delta Lake OPTIMIZE: rewrites small files into target-sized files (1 GB by default). Run it periodically (nightly, or triggered by file-count thresholds). ZORDER BY during OPTIMIZE also physically co-locates related data.
- Apache Iceberg compaction: the rewrite_data_files action, similar to Delta OPTIMIZE.
- Hudi clustering: reorganises data into optimally sized files, optionally with Z-ordering.
- Batch writes instead of per-event writes: accumulate records in Kafka and write micro-batches (e.g., every 5 minutes) rather than one file per event.
- Coalesce before write: df.coalesce(target_file_count).write.parquet(...) for batch jobs where you know the output size.
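Compaction is essentially a bin-packing problem: group small files into rewrite jobs that produce files close to the target size. The Python sketch below uses a greedy next-fit-decreasing heuristic. It is an illustrative assumption, not the exact algorithm used by Delta OPTIMIZE or Iceberg's rewrite_data_files. Sizes are in MB.

```python
from typing import List


def plan_compaction(file_sizes_mb: List[int], target_mb: int = 1024) -> List[List[int]]:
    """Greedy (next-fit decreasing) bin packing: group files smaller than target_mb into
    rewrite groups of at most target_mb each. Files already at target size are left alone."""
    small = sorted((s for s in file_sizes_mb if s < target_mb), reverse=True)
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for size in small:
        if current and current_size + size > target_mb:
            groups.append(current)
            current, current_size = [], 0
        current.append(size)
        current_size += size
    if current:
        groups.append(current)
    return groups


# 10,000 one-megabyte files left behind by a streaming job
plan = plan_compaction([1] * 10_000)
print(len(plan), "output files instead of 10,000")
```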
Z-ordering is a data-skipping optimisation that co-locates related data within Parquet files using a space-filling curve (Z-curve / Morton code). A linear sort clusters well only on its leading column. Z-ordering instead interleaves the bits of several columns' values, preserving locality on all of them at once.
How it improves performance: Delta Lake stores min/max statistics per column per file (for the first 32 columns by default). When a query filters on a Z-ordered column, Delta can skip files whose min/max range doesn't include the filter value. Z-ordering keeps those ranges narrow on every Z-ordered column, which maximises the number of files that can be skipped for multi-column filters.
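The effect can be reproduced on a toy grid: compute Morton codes by bit interleaving, lay the sorted rows out in fixed-size "files", and count the files whose min/max statistics cannot rule out a filter value. This sketch interleaves raw small integers. Delta maps values to ranges before interleaving, so treat it as an illustration of the principle only.

```python
from typing import Callable, Dict, List

Row = Dict[str, int]


def morton_encode(x: int, y: int, bits: int = 16) -> int:
    """Interleave the bits of x (even bit positions) and y (odd bit positions) into a Z-value."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)
        z |= ((y >> i) & 1) << (2 * i + 1)
    return z


def files_scanned(rows: List[Row], sort_key: Callable[[Row], object], rows_per_file: int,
                  column: str, value: int) -> int:
    """Sort rows, cut them into files, and count the files whose min/max statistics on
    `column` cannot rule out `value` (files that data skipping must still read)."""
    ordered = sorted(rows, key=sort_key)
    files = [ordered[i:i + rows_per_file] for i in range(0, len(ordered), rows_per_file)]
    return sum(1 for f in files
               if min(r[column] for r in f) <= value <= max(r[column] for r in f))


def linear_key(r: Row):
    return (r["a"], r["b"])                # ORDER BY a, b


def z_key(r: Row):
    return morton_encode(r["a"], r["b"])   # Z-order on (a, b)


# 256 rows on a 16 x 16 grid, 16 rows per file -> 16 files
rows = [{"a": a, "b": b} for a in range(16) for b in range(16)]
for name, key in [("linear", linear_key), ("z-order", z_key)]:
    print(name,
          "| a = 5:", files_scanned(rows, key, 16, "a", 5),
          "| b = 5:", files_scanned(rows, key, 16, "b", 5))
```

A linear sort on (a, b) reads 1 file for a = 5 but all 16 for b = 5. With Z-ordering, each file covers a 4 × 4 block, so either filter reads 4 files.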
-- Optimise and Z-order a table on the most frequently filtered columns
-- Common access pattern: WHERE customer_id = X AND event_date = Y
OPTIMIZE delta.`s3://bucket/events`
ZORDER BY (customer_id, event_date);
-- Delta tracks per-file statistics, so queries can now skip most files:
SELECT * FROM events WHERE customer_id = 12345 AND event_date = '2024-03-15';
-- Illustrative numbers for a 10,000-file table:
-- Without Z-order: customer_id values are scattered, so nearly all 10,000 files are read
-- With Z-order: only the ~50 files where min(customer_id) ≤ 12345 ≤ max(customer_id) are read
Limitations: Z-ordering is applied at compaction time (OPTIMIZE), so newly written data is not Z-ordered until the next OPTIMIZE run. It is most effective for 2–4 columns; beyond that, the locality benefit per column diminishes. For a single column, a simple sort is as effective and cheaper to maintain.
A structured methodology prevents random tuning. Follow the hierarchy: measure → identify bottleneck → fix → verify.
- Open the Spark UI: check the DAG and the stage timeline. Find the slowest stage (highest duration, most data shuffled).
- Check for spill: look at "Spill (memory)" and "Spill (disk)" on the stage detail page. Spill means execution memory is insufficient for the data each task handles. Fix: increase executor memory, or increase the partition count so each task processes less data.
- Check for skew: in the stage's task table and summary metrics, look at the distribution of task durations and data sizes. If one task takes 10× longer than the median, it's skew. Fix: AQE skew-join handling, salting, or a broadcast join.
- Check shuffle size: large shuffle bytes (GBs per stage) are the most common bottleneck. Fix: filter earlier, broadcast small tables, bucket by join key.
- Check partition count: 200 tasks reading 1 KB each is too many (overhead dominates); 10 tasks reading 50 GB each is too few (spill and OOM). Target roughly 100–500 MB per task and adjust spark.sql.shuffle.partitions accordingly.
- Check GC time: shown per executor in the Executors tab. High GC time indicates JVM heap pressure. Fix: increase executor memory, use off-heap memory, reduce caching, or use the Kryo serialiser (which mainly helps RDD-based code).
- Check input reading: if file listing takes minutes, you have a small-files problem. Fix: compact with OPTIMIZE.
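Two of the checks above reduce to simple arithmetic: choosing a partition count from shuffle volume, and flagging stragglers against the median task duration. The helpers below are hypothetical. The 200 MB per task target and the 10× skew factor are assumptions picked from the ranges quoted above.

```python
import math
from statistics import median
from typing import List

MB = 1024 ** 2


def suggest_shuffle_partitions(shuffle_bytes: int, target_bytes_per_task: int = 200 * MB) -> int:
    """Partition count that gives each task roughly target_bytes_per_task of shuffle data."""
    return max(1, math.ceil(shuffle_bytes / target_bytes_per_task))


def skewed_tasks(durations_s: List[float], factor: float = 10.0) -> List[int]:
    """Indices of tasks running more than `factor` times longer than the median task."""
    med = median(durations_s)
    return [i for i, d in enumerate(durations_s) if d > factor * med]


print(suggest_shuffle_partitions(100 * 1024 ** 3))  # 100 GB shuffle at ~200 MB per task
print(skewed_tasks([10, 12, 11, 9, 140]))           # the 140 s task is the straggler
```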
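// Common settings worth checking before deeper profiling
// (AQE and its partition coalescing are already on by default in Spark 3.2+)
spark.conf.set("spark.sql.adaptive.enabled", "true")                     // AQE
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")  // merge small shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")            // split skewed join partitions
// spark.serializer is fixed when the application starts, so pass it at submit time:
//   spark-submit --conf spark.serializer=org.apache.spark.serializer.KryoSerializer ...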
Kafka & Streaming
8 questions
- Topic: a named stream of messages, stored as one or more append-only partition logs. Producers append; consumers read from any offset.
- Partition: a topic is divided into N ordered, immutable partitions. Partitions are the unit of parallelism: each can be produced to and consumed from independently. Messages within a partition are strictly ordered; across partitions, ordering is not guaranteed.
- Broker: a Kafka server. It stores partition replicas and handles produce/consume requests. A broker is the leader for some partitions and a follower (replica) for others.
- Replication: each partition has a configurable replication factor (typically 3). One replica is the leader and the others are followers; replicas fully caught up with the leader form the ISR (In-Sync Replicas). Producers write to the leader. Consumers read from the leader or, since Kafka 2.4, from a follower (fetch-from-follower for geo-local reads).
- ZooKeeper (legacy): managed broker metadata, controller election, and topic configs, at the cost of a separate distributed system to deploy and operate.
- KRaft mode (production-ready since Kafka 3.3): Kafka runs its own Raft-based metadata quorum, removing the ZooKeeper dependency. It offers simpler operations and faster controller failover, and is designed to scale to millions of partitions per cluster.
Consumer groups: a set of consumers sharing the same group.id. The topic's partitions are divided among group members, and Kafka tracks committed offsets per group. This enables:
- Multiple independent consumers of the same topic: each group has its own offset cursor.
- Parallel consumption: up to one consumer per partition; extra consumers in a group sit idle.
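The "at most one consumer per partition" rule is easy to see in a simplified, range-style assignment for a single topic. It follows the idea of Kafka's RangeAssignor. Kafka's real assignors (range, round-robin, cooperative sticky) differ in details, and the consumer names are hypothetical.

```python
from typing import Dict, List


def range_assign(partitions: List[int], consumers: List[str]) -> Dict[str, List[int]]:
    """Simplified range-style assignment for one topic: contiguous chunks of partitions,
    with the first consumers taking one extra partition when the split is uneven."""
    members = sorted(consumers)
    per_member, extra = divmod(len(partitions), len(members))
    assignment, start = {}, 0
    for i, member in enumerate(members):
        count = per_member + (1 if i < extra else 0)
        assignment[member] = partitions[start:start + count]
        start += count
    return assignment


print(range_assign(list(range(6)), ["c1", "c2", "c3", "c4"]))
print(range_assign(list(range(3)), ["c1", "c2", "c3", "c4"]))  # c4 gets nothing and sits idle
```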
- At-most-once: commit the offset before processing. If the consumer crashes after committing but before processing, the message is lost. Use for metrics where occasional loss is acceptable.
- At-least-once: commit the offset after processing. If the consumer crashes after processing but before committing, the message is reprocessed on restart, so duplicates are possible. This is the most common default; downstream processing must be idempotent for it to be safe.
- Exactly-once (EOS): messages are processed exactly once end-to-end, even across failures and retries. Requires:
  - Idempotent producer (enable.idempotence=true): the broker assigns each producer an ID and tracks per-partition sequence numbers, so retried duplicates are discarded.
  - Transactional API: the producer begins a transaction, produces to one or more topics, and commits the consumed input offsets in the same transaction (sendOffsetsToTransaction). This makes the consume-process-produce cycle atomic. Downstream consumers must read with isolation.level=read_committed.
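// Kafka Streams EOS configuration
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);  // "exactly_once_v2"
// exactly_once_v2 (KIP-447; Kafka Streams 3.0+, brokers 2.5+) uses one transactional
// producer per stream thread instead of one per input partition (task),
// reducing the number of producers and open transactions and improving throughput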
Practical reality: true end-to-end exactly-once also requires the sink to be idempotent or transactional. An idempotent sink might be a database UPSERT keyed on a business key or on the message's topic/partition/offset. Kafka's EOS covers the Kafka-to-Kafka path; the full pipeline must be designed with idempotency at every step.
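The two halves of that argument can be sketched in Python. First, broker-side deduplication of producer retries by sequence number. Second, an idempotent keyed upsert in the sink. Real brokers track sequence ranges per batch over a small window of recent batches, so this per-record version is a simplification. All names are hypothetical.

```python
from collections.abc import Hashable
from typing import Dict, List


class PartitionLeader:
    """Broker-side sketch of the idempotent producer: track the last sequence number per
    producer id for this partition and drop retries of records already appended.
    (Real brokers track per-batch sequence ranges and a small window of recent batches.)"""

    def __init__(self) -> None:
        self.log: List[str] = []
        self.last_seq: Dict[int, int] = {}

    def append(self, producer_id: int, seq: int, value: str) -> str:
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return "duplicate"  # a retry whose first attempt was written but not acknowledged
        if seq != last + 1:
            # Kafka surfaces gaps to the producer as an OutOfOrderSequenceException
            raise ValueError(f"out-of-order sequence {seq}, expected {last + 1}")
        self.log.append(value)
        self.last_seq[producer_id] = seq
        return "appended"


def upsert(table: Dict[Hashable, str], key: Hashable, value: str) -> None:
    """Idempotent sink write: applying the same (key, value) twice leaves one row."""
    table[key] = value


leader = PartitionLeader()
leader.append(producer_id=7, seq=0, value="order-1")
leader.append(producer_id=7, seq=1, value="order-2")
status = leader.append(producer_id=7, seq=1, value="order-2")  # retried after a lost ack

table: Dict[Hashable, str] = {}
for _ in range(2):  # the consumer reprocesses the same message after a crash
    upsert(table, ("orders", 0, 42), "order-1")  # keyed on topic/partition/offset
```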
Partition selection for a message:
- With a key: the default partitioner uses murmur2(key) % numPartitions. Keyed messages therefore always go to the same partition while the partition count is unchanged, which guarantees per-key ordering.
- Without a key: messages are spread across partitions. Since Kafka 2.4, the default sticky partitioner fills a batch for one partition before switching, rather than using strict round-robin.

Choosing partition count: partitions are the unit of parallelism, so more partitions allow higher throughput and more consumers. But each partition has a cost: broker memory, file handles, replication traffic, and controller overhead.
Practical formula: partitions = max(target_throughput / producer_throughput_per_partition, target_throughput / consumer_throughput_per_partition). A rough starting point: 10β20 partitions per topic for most use cases; scale up for high-throughput topics (>100 MB/s).
Gotcha: the partition count can only be increased, never decreased. Increasing it changes the key-to-partition mapping: already-produced messages stay in their original partition, while new messages with the same key may go to a different one. This can temporarily break per-key ordering, so plan partition counts conservatively upfront for keyed topics.
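Both the sizing formula and the remapping gotcha can be checked numerically. In the sketch below, zlib.crc32 stands in for Kafka's murmur2 partitioner, so individual keys will not land where Kafka would put them. The throughput figures are hypothetical.

```python
import math
import zlib
from typing import List


def partitions_needed(target_mb_s: float, producer_mb_s_per_partition: float,
                      consumer_mb_s_per_partition: float) -> int:
    """partitions = max(target / producer throughput, target / consumer throughput), rounded up."""
    return max(math.ceil(target_mb_s / producer_mb_s_per_partition),
               math.ceil(target_mb_s / consumer_mb_s_per_partition))


def partition_for(key: str, num_partitions: int) -> int:
    # Kafka's default partitioner hashes the key bytes with murmur2; crc32 is a stand-in here
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def moved_keys(keys: List[str], old_count: int, new_count: int) -> List[str]:
    """Keys whose partition changes when the topic grows from old_count to new_count."""
    return [k for k in keys if partition_for(k, old_count) != partition_for(k, new_count)]


print(partitions_needed(200, 20, 10))  # hypothetical: 200 MB/s target, consumers are the bottleneck
keys = [f"customer-{i}" for i in range(1000)]
moved = moved_keys(keys, 6, 8)
print(f"{len(moved)} of {len(keys)} keys now land on a different partition")
```

For a 6 → 8 change, a uniform hash keeps a key on the same partition only when hash mod 24 is below 6. Roughly three quarters of keys therefore move.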
Consumer lag is the difference between the latest offset produced to a partition (the log-end offset) and the consumer group's committed offset: the number of messages the consumer is behind. Growing lag means the consumer is processing more slowly than producers are writing, or has fallen behind after a failure.
# Check lag for a consumer group
kafka-consumer-groups.sh \
--bootstrap-server broker:9092 \
--describe \
--group my-consumer-group
# Output: partition, current-offset, log-end-offset, lag, consumer-id, host
# TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# orders  0          104920          105000          80
# orders  1          200010          205000          4990   ← high lag
Monitoring: Export records-lag-max JMX metric to Prometheus (via JMX Exporter). Alert on lag exceeding a threshold relative to expected processing time (e.g., lag > 10 minutes of messages).
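Lag itself is just offset arithmetic. Turning it into "minutes' worth of messages" needs a produce rate. The sketch below reuses the offsets from the sample kafka-consumer-groups.sh output. The produce rate of 100 messages per minute is a hypothetical figure.

```python
from typing import Dict, List, Tuple


def partition_lags(rows: List[Tuple[str, int, int, int]]) -> Dict[Tuple[str, int], int]:
    """LAG = LOG-END-OFFSET - CURRENT-OFFSET for each (topic, partition)."""
    return {(topic, part): end - current for topic, part, current, end in rows}


def lag_exceeds(lag_messages: int, produce_rate_per_min: float, threshold_min: float = 10.0) -> bool:
    """Alert when the backlog is more than threshold_min minutes' worth of incoming messages."""
    return lag_messages / produce_rate_per_min > threshold_min


# Offsets from the sample kafka-consumer-groups.sh output
describe = [("orders", 0, 104920, 105000), ("orders", 1, 200010, 205000)]
lags = partition_lags(describe)
print(lags)
# Hypothetical produce rate of 100 messages/minute per partition
print({p: lag_exceeds(lag, 100) for p, lag in lags.items()})
```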
Managing lag:
- Scale up the consumer group (add consumer instances, up to one per partition).
- Optimise the consumer's processing logic β profile and reduce per-message processing time.
- Batch the downstream sink writes instead of per-message commits.
- Use KEDA to auto-scale consumers based on lag (the Kafka scaler's lagThreshold setting).
- Increase the partition count to allow more parallel consumers. This requires coordinated key handling, because the key-to-partition mapping changes.
Kafka Streams: a Java library (not a separate cluster) for building stateful stream processing applications. Each instance is a JVM process. State is stored in local RocksDB, backed by changelog Kafka topics for fault tolerance. It is ideal for microservice-style stream processing where you want no separate infrastructure, but is limited to Kafka as source and sink.
Apache Flink: a distributed stream processing framework with its own cluster (a JobManager and TaskManagers). It supports many sources and sinks (Kafka, Kinesis, JDBC, files), true event-time processing with watermarks, and sophisticated state management (managed memory, incremental checkpoints). It is widely treated as the reference choice for complex, low-latency, high-throughput streaming.
Comparison:
- Deployment complexity: Kafka Streams is just a library, with no cluster to run. Flink requires deploying and operating a cluster, or using a managed service such as Confluent Cloud for Apache Flink or Amazon Managed Service for Apache Flink.
- Latency: both achieve millisecond latency. Flink generally keeps latency lower for complex stateful operations at scale.
- State size: Kafka Streams keeps state on local disk (RocksDB) and scales to hundreds of GB of state per node. Flink also offers RocksDB-backed state but manages it more flexibly, with incremental checkpoints to durable storage and rescaling by key groups.
- SQL support: Flink SQL is best-in-class for streaming SQL. Kafka Streams has no native SQL layer (use ksqlDB instead).
- When to use Kafka Streams: Kafka-to-Kafka transformations within microservices, moderate state requirements, teams wanting minimal ops overhead. When to use Flink: Complex CEP (complex event processing), multi-source joins, large state, SQL-first streaming, production-grade at scale.
- Processing time: when the event is processed by the system. Simple to implement, but misleading for most analytics: a network delay can make a 2 PM event appear at 3 PM.
- Event time: when the event actually occurred, embedded in the event payload. This is the correct time for analytics, but events can arrive out of order or late (e.g., a mobile app buffers data offline, then sends it).
Watermarks estimate how far event time has progressed. A watermark of T means "we believe all events with event time ≤ T have arrived." With a bounded out-of-orderness strategy it is computed as watermark = max(event_time_seen) − max_out_of_orderness. This is separate from allowed lateness, which controls how long a window's state is kept after the watermark has passed the window's end.
When the watermark passes a window's end time, the window is triggered and its results are emitted. Events that arrive after that are "late data". They can be dropped, used to emit updated results (retractions) while within the allowed lateness, or routed to a side output for later reconciliation.
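Below is a simplified Python simulation of that logic, using minutes as time units and the same parameters as the Flink example: 1-hour windows, 5 minutes of out-of-orderness, 10 minutes of allowed lateness. The watermark advances per event and Flink's millisecond boundary details are ignored. Treat it as an illustration, not Flink's exact semantics. The arrival sequence is hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Set, Tuple


def tumbling_windows(events: List[Tuple[int, int]], window: int, out_of_orderness: int,
                     allowed_lateness: int):
    """events: (event_time, amount) in arrival order, times in minutes.
    Returns (emitted results, side output). Simplified: the watermark advances per event
    and a window fires once the watermark reaches its end."""
    max_seen = float("-inf")
    watermark = float("-inf")
    sums: Dict[int, int] = defaultdict(int)
    fired: Set[int] = set()
    emitted: List[Tuple[int, int]] = []
    side_output: List[Tuple[int, int]] = []
    for ts, amount in events:
        start = ts - ts % window
        if start + window + allowed_lateness <= watermark:
            side_output.append((ts, amount))        # too late: window state is gone
            continue
        sums[start] += amount
        if start in fired:
            emitted.append((start, sums[start]))    # late but allowed: updated result
        max_seen = max(max_seen, ts)
        watermark = max_seen - out_of_orderness     # bounded out-of-orderness watermark
        for w in sorted(sums):
            if w not in fired and w + window <= watermark:
                fired.add(w)
                emitted.append((w, sums[w]))        # on-time firing
    return emitted, side_output


arrivals = [(10, 1), (50, 2), (70, 5), (58, 4), (90, 1), (30, 9), (130, 2)]
emitted, late = tumbling_windows(arrivals, window=60, out_of_orderness=5, allowed_lateness=10)
print(emitted)  # [(0, 3), (0, 7), (60, 6)]
print(late)     # [(30, 9)]
```

The event at minute 58 arrives after window [0, 60) has fired but within the allowed lateness, so an updated total is emitted. The event at minute 30 arrives once the watermark is at 85, past 60 + 10, so it goes to the side output.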
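// Flink (Java DataStream API): 1-hour event-time windows, 5 min out-of-orderness,
// 10 min allowed lateness. Order is assumed to be a POJO with eventTime (epoch ms),
// customerId and amount fields plus getters.
final OutputTag<Order> lateOutputTag = new OutputTag<Order>("late-orders") {};
DataStream<Order> stream = ...;
SingleOutputStreamOperator<Order> hourlyTotals = stream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Order>forBoundedOutOfOrderness(Duration.ofMinutes(5))   // watermark trails max event time by 5 min
            .withTimestampAssigner((order, ts) -> order.getEventTime())
    )
    .keyBy(Order::getCustomerId)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .allowedLateness(Time.minutes(10))   // re-fire the window for data up to 10 min behind the watermark
    .sideOutputLateData(lateOutputTag)   // anything later goes to the side output
    .sum("amount");
DataStream<Order> veryLate = hourlyTotals.getSideOutput(lateOutputTag);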
CDC (Change Data Capture) captures row-level changes (INSERT, UPDATE, DELETE) from a database's transaction log and streams them to downstream systems. It enables real-time replication, event-driven architectures, and keeping a data warehouse in sync with an operational database without full table scans.
PostgreSQL CDC via logical replication:
- Enable logical replication: set wal_level = logical in postgresql.conf (a server restart is required).
- Debezium: an open-source CDC platform. The Debezium PostgreSQL connector reads the WAL (Write-Ahead Log) through a logical replication slot using the pgoutput plugin, and converts each change into a structured Kafka event.
- Events land in the Kafka topic dbserver.public.orders. The payload has fields before, after, and op, where op ∈ {c, u, d}, plus r for snapshot reads.
- A Spark Structured Streaming or Flink job reads from Kafka and applies a MERGE into a Delta Lake / Iceberg table (upsert on the primary key).
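-- Enable logical replication on PostgreSQL (takes effect after a restart)
ALTER SYSTEM SET wal_level = logical;
-- Optional: pre-create the replication slot Debezium reads from
-- (the connector creates it on first start if it doesn't exist)
SELECT pg_create_logical_replication_slot('debezium', 'pgoutput');
-- Without this, update events typically arrive with "before" set to null
ALTER TABLE orders REPLICA IDENTITY FULL;

-- Debezium event shape (Kafka value JSON, simplified envelope)
-- op: c = create, u = update, d = delete, r = snapshot read
{
  "op": "u",
  "before": { "id": 1, "status": "pending" },
  "after":  { "id": 1, "status": "shipped" },
  "ts_ms": 1700000000000
}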
Alternatives: AWS DMS (managed, multi-source), Airbyte (open-source, UI-driven), Fivetran (SaaS). Debezium is the de facto standard for open-source CDC on PostgreSQL, MySQL, MongoDB, and SQL Server.
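The downstream MERGE step can be sketched in Python. It applies Debezium-style envelopes to an in-memory table keyed by primary key: c, r and u upsert the "after" image, and d deletes by the key in "before". This is a simplified sketch. It ignores Kafka tombstone messages and schema payloads, and assumes events arrive in per-key order, as they do within a Kafka partition.

```python
from typing import Any, Dict, List


def apply_change_events(table: Dict[Any, dict], events: List[dict], key: str = "id") -> Dict[Any, dict]:
    """Apply Debezium-style change envelopes to an in-memory table keyed by primary key,
    with MERGE semantics: c/r/u upsert the "after" image, d deletes by the "before" key."""
    for event in events:
        op = event["op"]
        if op in ("c", "r", "u"):
            row = event["after"]
            table[row[key]] = row                   # insert or overwrite: safe to replay
        elif op == "d":
            table.pop(event["before"][key], None)   # deleting a missing row is a no-op
        else:
            raise ValueError(f"unknown op {op!r}")
    return table


events = [
    {"op": "c", "before": None, "after": {"id": 1, "status": "pending"}},
    {"op": "u", "before": {"id": 1, "status": "pending"}, "after": {"id": 1, "status": "shipped"}},
    {"op": "c", "before": None, "after": {"id": 2, "status": "pending"}},
    {"op": "d", "before": {"id": 2, "status": "pending"}, "after": None},
]
orders = apply_change_events({}, events)
print(orders)  # {1: {'id': 1, 'status': 'shipped'}}
```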
Kafka messages are bytes β there is no built-in schema enforcement. Without a schema registry, a producer can change its payload structure (rename a field, change a type) and silently break all consumers.
Confluent Schema Registry (or AWS Glue Schema Registry, Apicurio) stores Avro/Protobuf/JSON Schema definitions versioned by subject (typically one per topic). Producers register schemas and embed a schema ID (4 bytes) in each message. Consumers look up the schema by ID and deserialise correctly.
Compatibility modes prevent breaking changes:
- BACKWARD: the new schema can read data written with the previous schema, so new consumers can process old messages. Safe for adding optional fields with defaults. This is Confluent Schema Registry's default mode.
- FORWARD: the previous schema can read data written with the new schema, so old consumers can process new messages. Safe for removing optional fields.
- FULL: both backward and forward compatible. The safest and most restrictive mode.
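These rules follow from how an Avro-style reader resolves a writer's schema. A reader can decode data if every reader field either exists in the writer schema or has a default; fields present only in the writer are skipped. The Python sketch below models a schema as "field name → has a default". It ignores types, nested records and aliases, so it is a teaching model, not the registry's actual checker. The schemas are hypothetical.

```python
from typing import Dict

# Simplified Avro-style record schema: field name -> whether the field has a default value.
# Types, nested records and aliases are ignored in this sketch.
Schema = Dict[str, bool]


def can_read(reader: Schema, writer: Schema) -> bool:
    """A reader can decode writer data if every reader field is present in the writer schema
    or has a default (writer-only fields are simply skipped)."""
    return all(name in writer or has_default for name, has_default in reader.items())


def backward_compatible(new: Schema, old: Schema) -> bool:
    return can_read(reader=new, writer=old)   # new consumers read old messages


def forward_compatible(new: Schema, old: Schema) -> bool:
    return can_read(reader=old, writer=new)   # old consumers read new messages


def full_compatible(new: Schema, old: Schema) -> bool:
    return backward_compatible(new, old) and forward_compatible(new, old)


v1 = {"id": False, "amount": False, "note": True}
add_optional = {**v1, "currency": True}
add_required = {**v1, "currency": False}
drop_optional = {"id": False, "amount": False}
drop_required = {"id": False, "note": True}
```

Adding a field with a default is fully compatible. Adding a required field breaks BACKWARD. Removing a required field breaks FORWARD.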
# Register a schema
curl -X POST http://registry:8081/subjects/orders-value/versions \
-H 'Content-Type: application/vnd.schemaregistry.v1+json' \
-d '{"schema": "{\"type\":\"record\",\"name\":\"Order\",...}"}'
# Check compatibility of a new schema before evolving
curl -X POST http://registry:8081/compatibility/subjects/orders-value/versions/latest \
  -H 'Content-Type: application/vnd.schemaregistry.v1+json' \
  -d '{"schema": "NEW_SCHEMA_JSON"}'
A schema registry is non-negotiable in production data pipelines: schema drift is one of the most common causes of silent data corruption.
dbt & Transformation
7 questions
dbt (data build tool) is a transformation framework that lets data analysts and engineers transform raw data in a warehouse using SQL. It applies software engineering best practices (version control, testing, modular design, documentation) to SQL transformations.
The problem it solves: Before dbt, warehouse transformations were a mess of ad-hoc SQL scripts, stored procedures, and ETL jobs with no testing, no lineage visibility, and no documentation. Analysts wrote SQL in Looker or Tableau without version control. Data pipelines were fragile and opaque.
dbt's key contributions:
- Models: each SQL file is a model (a SELECT statement). dbt compiles it into a CREATE TABLE AS or CREATE VIEW AS, depending on the materialisation. Models reference other models via {{ ref('model_name') }}; dbt builds the DAG from these references and runs models in dependency order.
- Tests: declarative data quality tests (not_null, unique, accepted_values, relationships) and custom SQL tests. Run them with dbt test, or with dbt build, which tests each model right after building it.
- Documentation: an auto-generated data catalog from model descriptions, with a lineage graph showing how every table was built.
- Macros: Jinja templating for reusable SQL logic.
- Seeds: load small CSV files into the warehouse as reference tables.
- view: creates a database view. No data is stored; the query runs every time the view is read. Use for lightweight staging models that transform raw source data.
- table: creates a physical table by fully re-running the query on every dbt run. Simple, and fresh as of the last run. Use for models that are fast to rebuild or run infrequently.
- incremental: builds the full table on the first run. On subsequent runs, it only processes new or changed rows and merges or appends them. This drastically reduces runtime for large tables (process today's events, not all history). Requires a way to identify new rows (a timestamp or event ID).
- ephemeral: not materialised in the database at all; compiled as a CTE and inlined wherever it's referenced. Use for intermediate logic you want to modularise without creating a DB object.
-- models/fct_events.sql
-- incremental_strategy could also be 'delete+insert' or 'append' (availability varies by adapter)
{{ config(
    materialized='incremental',
    unique_key='event_id',
    incremental_strategy='merge'
) }}
SELECT event_id, user_id, event_type, created_at
FROM {{ ref('stg_events') }}
{% if is_incremental() %}
-- Only process rows newer than the max already in the table
WHERE created_at > (SELECT MAX(created_at) FROM {{ this }})
{% endif %}
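Incremental gotchas: late-arriving data can be missed if you filter only on the max timestamp. Use a lookback window to catch late arrivals, e.g. WHERE created_at > (SELECT MAX(created_at) - INTERVAL '3 days' FROM {{ this }}). With the merge strategy and a unique_key, re-processed rows are upserted rather than duplicated. Run a full refresh when the model logic changes (dbt run --full-refresh).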
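The Python sketch below mimics an incremental model with the merge strategy and unique_key='event_id' across two runs. It shows the strict max-timestamp filter missing a late event that the 3-day lookback catches. The event IDs and dates are hypothetical, and the "warehouse" is a dict.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def incremental_run(target: Dict[str, dict], source: List[dict],
                    lookback: timedelta = timedelta(0)) -> Dict[str, dict]:
    """Mimic a dbt incremental model with the merge strategy and unique_key='event_id':
    first run loads everything; later runs keep rows with created_at > MAX(created_at) - lookback."""
    if target:
        cutoff = max(r["created_at"] for r in target.values()) - lookback
        batch = [r for r in source if r["created_at"] > cutoff]
    else:
        batch = list(source)              # first run: full build
    for r in batch:
        target[r["event_id"]] = r         # merge on unique_key: re-processed rows overwrite, not duplicate
    return target


def ev(event_id: str, day: int, hour: int = 0) -> dict:
    return {"event_id": event_id, "created_at": datetime(2024, 3, day, hour)}


first_load = [ev("e1", 1), ev("e2", 2)]
# e3 happened on March 1 but only reached the source after the first run
second_load = first_load + [ev("e3", 1, 12), ev("e4", 3)]

strict = incremental_run(incremental_run({}, first_load), second_load)
with_lookback = incremental_run(incremental_run({}, first_load), second_load,
                                lookback=timedelta(days=3))
print(sorted(strict))         # e3 is missing
print(sorted(with_lookback))  # e3 is caught
```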
A well-structured dbt project follows a layered convention that mirrors the medallion architecture:
models/
├── staging/            # stg_* : 1:1 with source tables, light cleaning only
│   ├── sources.yml     # define source tables + freshness SLAs
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/       # int_* : joins, enrichment, not exposed to BI
│   └── int_order_items_enriched.sql
├── marts/              # fct_*, dim_* : business-facing, star schema
│   ├── core/
│   │   ├── fct_orders.sql
│   │   └── dim_customers.sql
│   └── finance/
│       └── fct_revenue.sql
└── utilities/          # shared macros, date spines
Layer rules:
- Staging: one model per source table. Only rename columns to standard conventions, cast types, dedup, and handle basic NULLs. No joins, no business logic. Ephemeral or views.
- Intermediate: joins between staging models and complex logic. Not exposed to BI tools. Ephemeral or views.
- Marts: final fact and dimension tables consumed by BI. Table or incremental. Named by business domain. The contract with data consumers.
Use dbt_project.yml to set the materialisation per folder (staging → view, marts → table). This prevents accidentally materialising intermediates as tables.
Generic tests: reusable test templates applied via YAML config to many columns across many models. dbt ships four built-in generic tests: not_null, unique, accepted_values, and relationships (referential integrity). Custom generic tests can be defined as macros.
# models/marts/schema.yml
models:
- name: fct_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: status
tests:
- accepted_values:
values: ['pending', 'shipped', 'delivered', 'cancelled']
- name: customer_id
tests:
- relationships:
to: ref('dim_customers')
field: customer_id
- name: order_total
tests:
- dbt_utils.expression_is_true:
expression: ">= 0" # dbt-utils package
Singular tests: one-off SQL assertions saved as .sql files in the tests/ directory. Each query returns the rows that fail the assertion, and the test passes when it returns zero rows. Use them for complex business logic that doesn't fit generic test patterns.
-- tests/assert_no_revenue_before_activation.sql
-- Fails if any revenue exists before the customer activation date
SELECT o.order_id
FROM {{ ref('fct_orders') }} o
JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id
WHERE o.order_date < c.activation_date
dbt-expectations (Great Expectations-inspired package) adds 50+ generic tests for ranges, regex matching, row counts, statistical distributions.
dbt models are Jinja2 templates compiled to SQL. Macros are Jinja functions defined in the macros/ directory: reusable SQL logic you can call from any model.
-- macros/cents_to_dollars.sql
{% macro cents_to_dollars(column_name, scale=2) %}
ROUND({{ column_name }} / 100.0, {{ scale }})
{% endmacro %}
-- macros/generate_surrogate_key.sql
-- Reusable surrogate key from multiple columns
{% macro generate_surrogate_key(field_list) %}
{{ dbt_utils.generate_surrogate_key(field_list) }}
{% endmacro %}
-- macros/unpivot_metrics.sql: generates a UNION ALL from a list
{% macro unpivot_metrics(relation, metrics, id_col) %}
{% for metric in metrics %}
SELECT {{ id_col }}, '{{ metric }}' AS metric_name, {{ metric }} AS value
FROM {{ relation }}
{% if not loop.last %} UNION ALL {% endif %}
{% endfor %}
{% endmacro %}
-- Using a macro in a model
SELECT
order_id,
{{ cents_to_dollars('amount_cents') }} AS amount_dollars,
{{ generate_surrogate_key(['order_id', 'customer_id']) }} AS order_key
FROM {{ ref('stg_orders') }}
The dbt_utils package provides production-ready macros, including:
- generate_surrogate_key
- date_spine (generate a row per date)
- pivot and unpivot
- union_relations (UNION ALL a list of relations, e.g. those returned by get_relations_by_prefix)

Install it via packages.yml.
Source freshness checks query a loaded_at_field (a timestamp column on the source table) and compare its maximum value to the current time. If the most recent record is older than the configured threshold, the check warns or errors, alerting you that the upstream pipeline has stopped delivering data.
# models/staging/sources.yml
sources:
  - name: raw
    database: prod
    schema: raw_data
    freshness:
      warn_after: { count: 2, period: hour }   # warn if >2 hours stale
      error_after: { count: 12, period: hour } # error if >12 hours stale
    tables:
      - name: orders
        loaded_at_field: _etl_loaded_at  # timestamp of last load
        freshness:
          warn_after: { count: 1, period: hour }
          error_after: { count: 4, period: hour }
      - name: users
        loaded_at_field: updated_at
# Run freshness checks separately from dbt run
dbt source freshness
# Output:
# Found 2 sources
# orders: WARN (last loaded 3 hours ago, threshold: 1 hour)
# users: PASS (last loaded 30 minutes ago)
Integrate into CI/CD: run dbt source freshness before dbt run and fail the pipeline if sources are stale. Route failures to PagerDuty or Slack via dbt Cloud alerts or a CI notification step.
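Conceptually, the check computes the age of the newest row (current time minus the maximum loaded_at_field value) and compares it with the two thresholds. A minimal Python sketch of that classification, assuming a strict "older than" comparison and reusing the orders thresholds from the YAML above:

```python
from datetime import datetime, timedelta, timezone


def freshness_status(max_loaded_at: datetime, now: datetime,
                     warn_after: timedelta, error_after: timedelta) -> str:
    """Classify a source as PASS, WARN or ERROR based on the age of its newest row."""
    age = now - max_loaded_at  # time since the most recent load
    if age > error_after:
        return "ERROR"
    if age > warn_after:
        return "WARN"
    return "PASS"


now = datetime(2024, 3, 15, 12, 0, tzinfo=timezone.utc)
# orders: last loaded 3 hours ago, warn_after=1h, error_after=4h
print(freshness_status(now - timedelta(hours=3), now,
                       timedelta(hours=1), timedelta(hours=4)))  # WARN
```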
Schema drift occurs when a source table's structure changes unexpectedly: a column is renamed, a type changes, or columns are added or removed. Left unhandled, it can silently corrupt downstream models.
Prevention:
- Explicit column selection: never use SELECT * in production models (except in staging, where you explicitly document the expected columns). Explicit selects fail loudly when a column disappears.
- Source schema tests: use not_null and type-checking tests on source models so drift is caught during CI before it reaches marts.
- Contract enforcement (dbt 1.5+): declare contract: enforced: true on a model to require that its output exactly matches the declared column names and data types. dbt fails the build if the actual query result differs.
# models/marts/fct_orders.yml
models:
  - name: fct_orders
    config:
      contract:
        enforced: true  # fails if actual schema differs from declared
    columns:
      - name: order_id
        data_type: bigint
      - name: order_total
        data_type: numeric
      - name: created_at
        data_type: timestamp with time zone
Handling additions gracefully: New columns in a source are fine β explicit selection ignores them. They become available when you deliberately add them to the staging model. A column removal causes an immediate failure, which is the desired behaviour.
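An enforced contract boils down to comparing the model's actual output schema with the declared one. The sketch below is a simplified Python model of that comparison, not dbt's implementation: it ignores dbt's type-alias normalisation (e.g. bigint vs int8) and constraints, and the drifted schema in the example is hypothetical.

```python
def check_contract(declared: dict[str, str], actual: dict[str, str]) -> list[str]:
    """Compare a model's output schema (column -> type) with its declared contract.

    Returns a sorted list of problems; an empty list means the contract holds.
    """
    problems = []
    for column, declared_type in declared.items():
        if column not in actual:
            problems.append(f"missing column: {column}")
        elif actual[column].lower() != declared_type.lower():
            problems.append(
                f"type mismatch on {column}: expected {declared_type}, got {actual[column]}"
            )
    for column in actual.keys() - declared.keys():
        problems.append(f"undeclared column: {column}")
    return sorted(problems)


declared = {"order_id": "bigint", "order_total": "numeric",
            "created_at": "timestamp with time zone"}
# Hypothetical drift: order_total arrives as varchar and a new column appears
actual = {"order_id": "bigint", "order_total": "varchar",
          "created_at": "timestamp with time zone", "discount": "numeric"}
for problem in check_contract(declared, actual):
    print(problem)
```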
Pipeline Design
7 questions
ETL (Extract, Transform, Load): extract data from sources, transform it in an external processing engine (Informatica, SSIS, custom Spark jobs), then load the transformed result into the warehouse. The warehouse receives only pre-processed data.
ELT (Extract, Load, Transform): extract raw data and load it directly into the warehouse (Bronze/raw layer), then transform it inside the warehouse using SQL (dbt). The warehouse is the transformation engine.
Why ELT won:
- Compute power: modern cloud warehouses (Snowflake, BigQuery, Redshift) are massively parallel, so transforming 100 GB in SQL is fast and relatively cheap. No separate Spark cluster is needed just for transformation.
- Raw data preserved: loading raw data first means you can reprocess with new logic without re-extracting from the source. ETL discards the raw data once it is transformed, unless you archive it separately.
- Tooling: dbt made SQL transformations first-class with testing, lineage, and version control. The ecosystem matured around ELT.
- Analyst-friendly: analysts who know SQL can own transformation logic without the Java/Scala/Python skills that ETL frameworks typically require.
ETL still has its place: sensitive data requiring masking before entering the warehouse (PII), very large pre-aggregations cheaper to compute outside the warehouse, or sources requiring complex parsing (binary formats, APIs).
An idempotent pipeline produces the same result whether it is run once or multiple times with the same input. Re-running a failed pipeline job should not create duplicate data or corrupt results.
Why it's critical: Pipelines fail. A network timeout, OOM error, or deployment causes a partial run. If re-running the job doubles the data, you have a correctness problem that's harder to debug than the original failure.
Design patterns for idempotency:
- Overwrite, don't append: instead of appending rows, overwrite the output partition for the processing window. INSERT OVERWRITE PARTITION (date='2024-03-15') is idempotent; INSERT INTO is not.
- MERGE / upsert: use primary-key-based MERGE operations in Delta Lake, Snowflake, or BigQuery. Re-running a CDC merge produces the same result.
- Deterministic transformations: avoid CURRENT_TIMESTAMP in transformation logic. Use the event's own timestamp, not processing time.
- Deduplication on write: use a surrogate key or a hash of the row content, and deduplicate before writing so exact duplicates are dropped even if the source emits a record twice.
-- Idempotent daily partition overwrite (SparkSQL / Hive)
INSERT OVERWRITE TABLE events PARTITION (event_date = '{{ ds }}')
SELECT * FROM raw_events WHERE DATE(event_time) = '{{ ds }}';
-- Safe to re-run: overwrites the same partition every time
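The difference between appending and overwriting shows up as soon as a load is retried. In this Python sketch a dict of lists stands in for a date-partitioned table (an assumption for illustration); running the same load twice doubles the appended partition but leaves the overwritten one unchanged.

```python
from collections import defaultdict


def run_append(table: dict, ds: str, rows: list) -> None:
    """Non-idempotent, like INSERT INTO: every run adds the rows again."""
    table[ds].extend(rows)


def run_overwrite(table: dict, ds: str, rows: list) -> None:
    """Idempotent, like INSERT OVERWRITE ... PARTITION: each run replaces the partition."""
    table[ds] = list(rows)


source_rows = [{"event_id": 1}, {"event_id": 2}]
appended, overwritten = defaultdict(list), defaultdict(list)
for _ in range(2):  # simulate a retry after a failed first attempt
    run_append(appended, "2024-03-15", source_rows)
    run_overwrite(overwritten, "2024-03-15", source_rows)

print(len(appended["2024-03-15"]), len(overwritten["2024-03-15"]))  # 4 2
```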
- Full load: extract and reload the entire source table on every run. Simple to implement and always consistent. Acceptable for small tables (<10 million rows), but impractical for large tables: it puts unnecessary load on source systems and is slow.
- Incremental (high-watermark): track the max value of a timestamp or auto-increment ID from the last run and extract only rows newer than that watermark. Much faster and cheaper. Limitation: it misses hard deletes (deleted rows are never extracted) and any updates that don't advance the watermark column (e.g., updates to old rows when tracking an auto-increment ID).
- CDC (Change Data Capture): reads the database transaction log (e.g., the PostgreSQL WAL or the MySQL binlog) and captures every INSERT, UPDATE, and DELETE. The most complete solution, but it requires log-level access to the source database. Tools: Debezium (PostgreSQL, MySQL), AWS DMS, Oracle GoldenGate. Handles both soft and hard deletes.
Choosing:
- Immutable event tables (logs, transactions): incremental by timestamp, since rows are never updated or deleted.
- Mutable entity tables (customers, products): CDC for real-time, or full-load if the table is small.
- Reporting tables with no deletes, updated rarely: incremental or full-load.
- Compliance/audit requirements where deletions must be tracked: CDC only.
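A high-watermark load fits in a few lines of Python. The watermark would normally be persisted between runs (for example in a state table); here it is simply returned, and the rows are hypothetical. The example also reproduces the limitation described above: the hard delete of id 1 is never observed.

```python
from datetime import datetime
from typing import Optional


def extract_incremental(source_rows: list[dict], watermark: Optional[datetime]):
    """Return (rows newer than the watermark, new watermark to persist for the next run)."""
    new_rows = [r for r in source_rows
                if watermark is None or r["updated_at"] > watermark]
    new_watermark = max((r["updated_at"] for r in new_rows), default=watermark)
    return new_rows, new_watermark


source = [
    {"id": 1, "updated_at": datetime(2024, 3, 14, 9)},
    {"id": 2, "updated_at": datetime(2024, 3, 15, 8)},
]
first_batch, wm = extract_incremental(source, None)  # first run: everything is new
source.pop(0)                                         # hard delete of id 1 in the source
source.append({"id": 3, "updated_at": datetime(2024, 3, 15, 10)})
second_batch, wm = extract_incremental(source, wm)
print([r["id"] for r in first_batch], [r["id"] for r in second_batch], wm)
# [1, 2] [3] 2024-03-15 10:00:00
```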
Backfilling is reprocessing historical data, either to fix a bug in transformation logic or to populate a new metric or table with history. It involves running a pipeline job for past periods, often in parallel.
Designing for safe backfills:
- Parametrise by processing date: every job takes a logical_date parameter (or Airflow's ds). Jobs must not use CURRENT_DATE internally. This lets you re-run any historical date exactly as if it were processing that day.
- Idempotent writes: overwrite partitions by date rather than appending. Re-running date X replaces its output instead of duplicating it.
- Separate backfill lane: backfills can overwhelm warehouse or cluster resources. Route backfill runs to a separate Spark cluster or use warehouse concurrency limits so they don't block production runs.
- Backfill incrementally, not all at once: process 30 days at a time, not 5 years. Monitor quality and cost as you go.
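# Airflow 2.x backfill command: rerun a DAG for a date range
# --reset-dagruns clears the state of existing runs in the range so they execute again
# (a comment after a trailing backslash would break the line continuation)
airflow dags backfill \
  --start-date 2024-01-01 \
  --end-date 2024-03-31 \
  --reset-dagruns \
  my_pipeline_dag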
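Backfilling 30 days at a time can be scripted by splitting the range into fixed windows and launching one run per window, checking quality and cost between batches. A minimal Python sketch; the batch size and dates mirror the text and the Airflow example, and how each window is submitted is left to your orchestrator.

```python
from datetime import date, timedelta


def backfill_batches(start: date, end: date, batch_days: int = 30):
    """Yield inclusive (batch_start, batch_end) windows covering start..end."""
    current = start
    while current <= end:
        batch_end = min(current + timedelta(days=batch_days - 1), end)
        yield current, batch_end
        current = batch_end + timedelta(days=1)


for batch_start, batch_end in backfill_batches(date(2024, 1, 1), date(2024, 3, 31)):
    # each window could become one backfill run with --start-date/--end-date
    print(batch_start, batch_end)
```

Because 2024 is a leap year, the range splits into windows ending on 2024-01-30, 2024-02-29, 2024-03-30 and 2024-03-31.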
These tools handle the EL (Extract + Load) part of ELT, connecting source systems to your warehouse without custom engineering.
- Fivetran: fully managed SaaS, 300+ connectors, near-zero configuration. Handles schema migrations automatically and is widely regarded as the most reliable option for production. Pricing is based on monthly active rows (MAR), which can become expensive at scale.
- Stitch: similar to Fivetran. More affordable, with fewer connectors and a less polished product. Acquired by Talend. Good for smaller teams.
- Airbyte: open-source and self-hostable (or available as cloud SaaS). 300+ connectors, many community-built. Lower cost when self-hosted, but you own the infrastructure and maintenance. Strong Python connector SDK for building custom connectors.
When to use which:
- Data team of 1-5, fast time-to-value, willing to pay: Fivetran. It just works.
- Cost-sensitive, engineering capacity for ops: Airbyte self-hosted on Kubernetes (Helm chart available).
- Custom connector needed for an internal system: Airbyte (custom connector SDK in Python/Java) or a bespoke pipeline in dlt (data load tool).
dlt (data load tool) is worth knowing: an open-source Python library for building lightweight, incremental pipelines with automatic schema inference and 50+ source connectors, a good fit for engineers who prefer code over GUI configuration.
PII handling is a legal requirement (GDPR, CCPA, HIPAA) and an engineering discipline. Senior data engineers must design PII management into pipelines from the start, not bolt it on.
Strategies:
- Tokenisation / pseudonymisation: replace PII with a reversible token stored in a separate, access-controlled mapping table. Analytics use the token; only authorised systems can reverse it. Preferred for analytics use cases (you can still count distinct users).
- Hashing: replace PII with a one-way keyed hash (e.g., HMAC-SHA-256 with a secret key). Good for join keys across datasets without exposing raw PII: the same input always produces the same hash, so the same user can still be joined across tables. A plain or publicly salted hash of low-entropy values such as emails or phone numbers can be reversed by brute force, so keep the key secret.
- Masking: replace PII with redacted or fake values (e.g., alice@example.com → x***@***.com). Good for non-production environments.
- Column-level encryption: encrypt PII columns in the warehouse and query them via UDFs that decrypt on the fly for authorised roles; native features such as Snowflake dynamic data masking and BigQuery column-level security provide similar role-based protection.
- Right to erasure (GDPR Article 17): design for deletion. Use a pseudonymisation key store: to erase a user, delete their key, and all data pseudonymised with that key becomes unreadable. Alternative: store PII separately; deleting the PII record can satisfy an erasure request without touching the analytics tables, provided those tables hold only tokens that can no longer be linked back to the person.
-- Snowflake dynamic data masking: the ANALYST role sees a masked email; all other roles see the real value
CREATE MASKING POLICY email_mask AS (val STRING) RETURNS STRING ->
CASE
WHEN CURRENT_ROLE() IN ('ANALYST') THEN REGEXP_REPLACE(val, '.+@', '****@')
ELSE val
END;
ALTER TABLE customers MODIFY COLUMN email SET MASKING POLICY email_mask;
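For the hashing strategy, a keyed hash such as HMAC-SHA-256 keeps the output deterministic (so it still works as a join key) while making dictionary attacks impractical without the key. A sketch using Python's standard hmac module; the normalisation step (trim and lower-case) and the key value are assumptions, and in practice the key would come from a secrets manager.

```python
import hashlib
import hmac


def pseudonymise(value: str, secret_key: bytes) -> str:
    """Deterministic keyed hash (HMAC-SHA-256) of a PII value, as 64 hex characters."""
    normalised = value.strip().lower()  # so " Alice@Example.com " and "alice@example.com" match
    return hmac.new(secret_key, normalised.encode("utf-8"), hashlib.sha256).hexdigest()


key = b"load-me-from-a-secrets-manager"  # hypothetical placeholder, never hard-code a real key
a = pseudonymise("alice@example.com", key)
b = pseudonymise(" Alice@Example.com ", key)
print(a == b, len(a))  # True 64
```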
Data lineage tracks the origin and transformation journey of data β from source systems through every transformation to the final dashboard or ML model. It answers: "Where did this number come from?" and "What would break if I change this column?"
Why it's important:
- Impact analysis: before changing a source table column, know every downstream model, dashboard, and ML feature that depends on it.
- Root cause analysis: when a dashboard metric is wrong, lineage tells you which model, which source table, and which upstream pipeline to investigate.
- Compliance: regulators want to know where customer data flows. GDPR data-mapping requirements are essentially lineage.
- Trust: when analysts can see that a metric was built from well-tested, documented models, they trust it.
Implementation:
- dbt: generates lineage automatically from the DAG of ref() and source() calls. dbt Docs shows model-level lineage; dbt Cloud (dbt Explorer) adds column-level lineage.
- OpenLineage / Marquez: an open standard for lineage events. Airflow, Spark, dbt, and Flink can emit lineage events through OpenLineage integrations to a Marquez server, creating a unified cross-tool lineage graph.
- DataHub / Apache Atlas: enterprise data catalogues that ingest lineage from multiple tools and provide search, governance, and an impact-analysis UI.
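Impact analysis is a graph traversal: starting from the node you plan to change, collect everything reachable downstream. A small Python sketch over a hypothetical lineage graph (node names are illustrative); reversing the edges gives the upstream walk used for root cause analysis.

```python
from collections import deque


def downstream(graph: dict[str, list[str]], node: str) -> set[str]:
    """Return every node reachable from `node` by following producer -> consumer edges."""
    seen, queue = set(), deque([node])
    while queue:
        for child in graph.get(queue.popleft(), []):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


# Hypothetical lineage: edges point from a producer to the things that read it
lineage = {
    "raw.orders": ["stg_orders"],
    "stg_orders": ["fct_orders", "int_customer_orders"],
    "int_customer_orders": ["dim_customers"],
    "fct_orders": ["revenue_dashboard"],
}
print(sorted(downstream(lineage, "stg_orders")))
# ['dim_customers', 'fct_orders', 'int_customer_orders', 'revenue_dashboard']
```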
Lakehouse & Storage Formats
7 questions
- Data warehouse: structured, schema-enforced, optimised for SQL analytics (Snowflake, BigQuery, Redshift). Excellent query performance and governance. Expensive for storing all raw data; limited support for unstructured data and ML workloads.
- Data lake: object storage (S3, GCS, ADLS) holding raw files in any format (CSV, JSON, Parquet, images). Cheap and flexible storage, but poor data management: no ACID transactions, no schema enforcement, no efficient updates, which is how lakes degrade into swamps of unreliable data.
- Lakehouse: open table formats (Delta Lake, Apache Iceberg, Apache Hudi) on top of cloud object storage. Adds ACID transactions, schema evolution, DML (UPDATE/DELETE), time travel, and data skipping to cheap object storage, so SQL and ML workloads run on the same platform. Open formats reduce vendor lock-in.
The three open table formats compared:
- Delta Lake: Databricks-originated, strongest Spark integration, widely adopted. JSON transaction log with periodic Parquet checkpoints.
- Apache Iceberg: Netflix-originated, multi-engine (Spark, Trino, Flink, DuckDB). A hierarchical metadata tree means query planning doesn't have to list every file. Well suited to very large tables with many partitions and files.
- Apache Hudi: Uber-originated, optimised for CDC and streaming upserts. Record-level indexing for fast lookups and updates of individual rows.
Parquet is an open-source, columnar binary file format designed for efficient analytical queries. Its internal structure:
- Row groups: the file is divided into row groups (typically around 128 MB). Each row group contains a set of rows stored together.
- Column chunks: within each row group, the data for each column is stored in a column chunk, which allows reading only the required columns.
- Pages: each column chunk is divided into pages, the smallest unit of compression and encoding. Common writers (parquet-mr, PyArrow) default to pages of about 1 MB, although the Parquet documentation suggests smaller pages (around 8 KB) for finer-grained reads.
- Footer metadata: stores the schema, row group offsets, and per-column-chunk statistics (min, max, null count, and optionally distinct count). Used for predicate pushdown and data skipping without reading the data.
Why Parquet dominates:
- Columnar reads: analytical queries typically read 3-5 columns of a 200-column table. Parquet reads only those columns and skips the rest entirely.
- Compression: columnar homogeneity enables dictionary, run-length, and delta encoding. Typical compression is 3-10× over raw CSV.
- Footer statistics: skip entire row groups for range queries without reading their data. A filter such as WHERE date >= '2024-01-01' AND date < '2024-02-01' on a date-sorted Parquet file reads only the matching row groups.
- Ecosystem support: every major engine reads Parquet, including Spark, Trino, Flink, BigQuery, DuckDB, Pandas, Arrow, and Snowflake external tables.
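Row-group skipping with footer statistics amounts to an overlap test between each row group's [min, max] range and the filter range. The Python sketch below simulates it with hypothetical statistics rather than reading a real Parquet footer; because the file is sorted by order_date, each row group covers a narrow range and most can be ruled out.

```python
from dataclasses import dataclass
from datetime import date


@dataclass
class RowGroupStats:
    """Per-row-group min/max for one column, as stored in a Parquet footer."""
    index: int
    min_value: date
    max_value: date


def row_groups_to_read(stats: list[RowGroupStats], lo: date, hi: date) -> list[int]:
    """Keep only row groups whose [min, max] range overlaps the filter [lo, hi]."""
    return [s.index for s in stats if s.max_value >= lo and s.min_value <= hi]


# Hypothetical footer statistics for a file sorted by order_date
footer = [
    RowGroupStats(0, date(2023, 11, 1), date(2023, 12, 15)),
    RowGroupStats(1, date(2023, 12, 15), date(2024, 1, 20)),
    RowGroupStats(2, date(2024, 1, 20), date(2024, 2, 28)),
]
# WHERE order_date BETWEEN '2024-01-01' AND '2024-01-31'
print(row_groups_to_read(footer, date(2024, 1, 1), date(2024, 1, 31)))  # [1, 2]
```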
Both provide ACID, schema evolution, time travel, and DML on object storage. The key differences are in metadata management and engine support.
Delta Lake metadata: a transaction log of JSON commit files stored alongside the data (_delta_log/). To avoid replaying every commit, Delta periodically writes Parquet checkpoint files that summarise the table state, but on very large tables log replay and file listing can still become slow. OPTIMIZE (compacting small files) and VACUUM (removing files no longer referenced) keep the file count manageable.
Iceberg metadata: a tree structure in which a metadata JSON file points to a snapshot, which points to a manifest list, which points to manifest files, which point to data files. Operations on subsets of the table (partition pruning, file compaction) touch only the relevant manifests. This scales better for tables with millions of partitions and files.
Choose Iceberg when:
- Multi-engine environments: Iceberg has first-class support in Trino, Flink, and AWS Athena/Glue. Delta's support outside Databricks has historically been weaker (though UniForm, which writes Iceberg metadata alongside a Delta table so Iceberg clients can read it, is narrowing the gap).
- Very large tables with many partitions: Iceberg's hierarchical metadata avoids listing millions of files.
- Partition evolution: Iceberg allows changing the partition scheme without rewriting existing data. Changing a Delta table's partition columns requires rewriting the table.
- Row-level deletes with delete files: Iceberg supports positional deletes (a delete file listing rows to skip on read) without rewriting data files. Good for frequent, small deletes such as GDPR right-to-erasure requests.
Choose Delta when: You're all-in on Databricks, the ecosystem around Delta (DeltaEngine, Photon, Unity Catalog) is valuable to you, and you primarily use Spark.
DuckDB is an embedded, in-process analytical database (like SQLite, but for OLAP). It runs directly inside Python, R, or Go, or as a CLI, with no server to set up. It is a vectorised, columnar query engine that reads Parquet, CSV, JSON, and Arrow natively.
Performance: on a laptop, DuckDB can often query a 10 GB Parquet file faster than a round trip to a cloud warehouse, because it reads the file directly without a network hop and uses vectorised, SIMD-friendly execution.
Use cases in data engineering:
- Local data exploration: query S3 Parquet files or local data without spinning up a cluster. Replaces ad-hoc Pandas for medium data (up to ~100 GB on a good machine).
- dbt integration: the dbt-duckdb adapter lets you develop and test dbt models locally against real data before deploying to Snowflake or BigQuery, with no warehouse costs for development.
- Lightweight ETL: replace Pandas for file-based transformations. For aggregations over Parquet, DuckDB is often 10-50× faster than Pandas.
- MotherDuck: serverless DuckDB in the cloud, letting you share DuckDB databases as a managed service.
-- Query S3 Parquet directly from DuckDB (with httpfs extension)
INSTALL httpfs; LOAD httpfs;
SET s3_region='us-east-1';
SELECT
DATE_TRUNC('month', order_date) AS month,
SUM(revenue) AS total_revenue
FROM read_parquet('s3://my-bucket/orders/year=*/month=*/*.parquet')
WHERE order_date >= '2024-01-01'
GROUP BY 1 ORDER BY 1;
Apache Arrow is an in-memory columnar data format and cross-language development platform. It defines a standard memory layout for tabular data, a lingua franca between data processing tools.
The problem it solves: before Arrow, passing data between Pandas, Spark, DuckDB, and R required serialisation (to CSV, JSON, or Parquet) and deserialisation, which is expensive and slow. Arrow defines a shared memory layout, so data can be handed between libraries in one process, or between processes via shared memory, without copying.
Impact on the data ecosystem:
- Pandas 2.0 added optional Arrow-backed dtypes (e.g., dtype_backend="pyarrow"), enabling cheap data exchange with DuckDB, Polars, and Spark; NumPy remains the default backing store.
- PySpark uses Arrow to serialise data for Pandas UDFs (the "vectorised UDF" path), which can be 10-100× faster than row-by-row Python UDFs.
- Flight protocol: Arrow Flight is a gRPC-based protocol for high-throughput data transfer between services using Arrow buffers. Dremio exposes a Flight endpoint, and Snowflake and BigQuery (via the Storage Read API) can return query results in Arrow format for fast bulk export.
- Polars: a Rust DataFrame library built on the Arrow columnar memory format. Usually faster than Pandas thanks to multithreaded, vectorised execution and a lazy query optimiser.
Virtual warehouses are Snowflake's compute clusters. They are independent of storage (Snowflake separates storage and compute). Key optimisations:
- Right-size the warehouse for the workload: an XL warehouse is overkill for a simple aggregation that an L handles comfortably. For simple queries, over-sizing burns credits without saving meaningful time.
- Use separate warehouses per workload type (ETL vs BI vs data science) to prevent contention and enable independent auto-suspend policies.
- Enable auto-suspend (2-5 minutes) and auto-resume to avoid paying for idle compute.
Result cache: Snowflake caches query results for 24 hours. Identical queries (same SQL text, unchanged underlying data) are served from the cache instantly at no warehouse compute cost, so repeated dashboard refreshes on unchanged data are effectively free.
Warehouse cache (local disk cache): a running virtual warehouse caches the micro-partition data it reads from remote storage on local SSD. Subsequent queries on the same data read from SSD rather than cloud object storage, so a "warm" warehouse is significantly faster. Suspending the warehouse drops this cache, so avoid unnecessary suspensions for latency-sensitive dashboards.
Clustering keys: Snowflake micro-partitions naturally follow insertion order. If queries filter on a column that isn't correlated with insertion order (e.g., customer_id on an orders table loaded chronologically), Snowflake scans many micro-partitions. Clustering by customer_id re-organises micro-partitions by that column, enabling pruning. Use clustering for large tables (>1 TB) with highly selective filters on non-insertion-order columns, and budget for its ongoing maintenance cost (automatic reclustering consumes credits).
-- Add a clustering key
ALTER TABLE orders CLUSTER BY (customer_id, order_date);
-- Monitor clustering depth (lower = better)
SELECT SYSTEM$CLUSTERING_INFORMATION('orders', '(customer_id, order_date)');
A feature store is a centralised repository for storing, sharing, and serving ML features: computed, versioned, and ready to use for both training and real-time inference.
Problems it solves:
- Training-serving skew: the feature computation logic in the training pipeline (batch Python) differs from the production inference code (real-time Java). Inconsistencies degrade the model in production. A feature store has one definition, computed once and served everywhere.
- Feature re-use: without a feature store, five teams each compute "user's 30-day purchase count" independently. A feature store shares the computation; teams register and reuse each other's features.
- Point-in-time correctness: training datasets require feature values as they were at the time of the label, not their current values. Feature stores provide point-in-time joins (time travel for features).
Architecture:
- Offline store: historical feature values in a data warehouse or data lake, used to generate training datasets.
- Online store: latest feature values in a low-latency key-value store (Redis, DynamoDB, Cassandra), used for real-time inference (<10 ms lookups).
- Materialisation pipeline: a batch or streaming job that computes features from raw data and writes them to both the offline and online stores.
Tools: Feast (open-source), Tecton (managed), Hopsworks (open-source enterprise), Databricks Feature Store, SageMaker Feature Store.
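A point-in-time join looks up, for each label, the latest feature value recorded at or before the label's timestamp. A minimal Python sketch for a single entity, assuming integer day timestamps and an inclusive (<=) cutoff; the feature history and labels are hypothetical. Joining on the current value instead would give every row 7.0, leaking future information into training.

```python
from bisect import bisect_right
from typing import Optional


def value_as_of(history: list[tuple[int, float]], as_of: int) -> Optional[float]:
    """Latest feature value with timestamp <= as_of; history must be sorted by time."""
    timestamps = [ts for ts, _ in history]
    pos = bisect_right(timestamps, as_of)  # number of entries at or before as_of
    return history[pos - 1][1] if pos else None


# Hypothetical history for one user: (day, 30-day purchase count) when it was computed
purchases_30d = [(1, 2.0), (5, 3.0), (9, 7.0)]
# Label events: (day the label was observed, label)
labels = [(4, 0), (9, 1), (0, 0)]
training_rows = [(day, value_as_of(purchases_30d, day), label) for day, label in labels]
print(training_rows)  # [(4, 2.0, 0), (9, 7.0, 1), (0, None, 0)]
```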
Orchestration
6 questions
Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows. Pipelines are defined as Python code: DAGs of tasks with explicit dependencies.
- DAG (Directed Acyclic Graph): a Python file defining tasks and their dependencies. It does not contain the data itself, only the orchestration logic (when to run, what depends on what).
- Operators: the units of work, e.g. BashOperator (run a shell command), PythonOperator (call a Python function), SparkSubmitOperator, SnowflakeOperator, plus task groups such as Cosmos's DbtTaskGroup. Sensors are special operators that wait for a condition (a file arrives on S3, an upstream table is updated).
- Scheduler: parses DAG files, triggers DAG runs based on their schedule, and submits tasks to the executor.
- Executor: runs tasks. LocalExecutor (single machine), CeleryExecutor (distributed across workers), KubernetesExecutor (each task in a separate pod; scalable and isolated).
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
from datetime import datetime, timedelta
with DAG(
    dag_id='data_pipeline',
    schedule='@daily',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    default_args={'retries': 2, 'retry_delay': timedelta(minutes=5)},
) as dag:
    load_raw = SQLExecuteQueryOperator(
        task_id='load_raw_orders',
        conn_id='snowflake_prod',
        sql='CALL raw.load_orders_incremental()',
    )
    transform = DbtCloudRunJobOperator(
        task_id='dbt_transform',
        job_id=12345,
        wait_for_termination=True,
    )
    load_raw >> transform  # dependency: transform runs after load_raw
Anti-patterns:
- Heavy computation in DAG files: Airflow re-parses DAG files continuously (every 30 seconds by default). Database queries, API calls, or complex Python in top-level DAG code slow down parsing for the whole scheduler. Put computation inside operators/tasks.
- XCom for large data: by default, XComs are stored in the Airflow metadata database (Postgres/MySQL). Passing DataFrames or large files via XCom bloats the metadata DB, runs into size limits, and can degrade the scheduler. Pass file paths or table names, not data.
- Giant tasks: tasks that process all data in one step cannot be restarted at fine granularity. Break them into smaller, idempotent tasks.
- Dynamic DAGs generated in a loop without limits: generating thousands of tasks per DAG degrades scheduler performance.
- Using catchup=True blindly: on a new DAG with a historical start_date, Airflow schedules a run for every past interval, which can overwhelm your warehouse.
Best practices:
- Always set catchup=False unless backfilling is intentional.
- Use max_active_runs=1 for pipelines that shouldn't run concurrently.
- Set task-level timeouts (execution_timeout): a stuck task should fail, not block forever.
- Use the KubernetesExecutor for isolation and scalability in production.
- Store connections and variables in Airflow's encrypted connections store (or a secrets backend), not hardcoded in DAG files.
- Test DAGs locally with airflow dags test <dag_id> <execution_date> before deploying.
- Airflow: the incumbent. Scheduling-first: you define when tasks run and in what order. Large ecosystem, broad operator support. Steep learning curve, and operational overhead to run at scale. DAGs are decoupled from the code they orchestrate.
- Prefect: code-first. Wrap Python functions with decorators (@flow, @task). Designed for simplicity, with minimal boilerplate; runs locally or in the cloud. Supports dynamic flows (tasks created at runtime based on data). State management and retries are first-class. Prefect Cloud is the managed offering.
- Dagster: asset-first. The primary abstraction is the software-defined asset, a piece of data (a table, a file, an ML model) defined together with its computation. Dagster builds the dependency graph from assets, not tasks. Deep integration with dbt, Spark, and Snowflake. Excellent observability (asset materialisation history, freshness checks). Steeper initial learning curve, but much better suited to data-aware orchestration.
Recommendation by context:
- Existing Airflow investment, many operators needed: stay with Airflow (or migrate to MWAA/Astronomer managed).
- Python-first team, want simplicity and dynamic workflows: Prefect.
- Building a modern data platform, asset-centric thinking, dbt + Spark: Dagster.
In traditional orchestration (Airflow), you think about tasks and their order. In Dagster, the primary unit is the asset, a data object (a database table, a Parquet file, a trained model) whose computation is defined alongside it. Dagster derives the dependency graph from which assets each computation reads and produces.
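from dagster import asset
from dagster_dbt import DbtCliResource
from dagster_snowflake import SnowflakeResource
@asset
def raw_orders(snowflake: SnowflakeResource) -> None:
    """Loads raw orders from the source database into the warehouse."""
    with snowflake.get_connection() as conn:
        conn.cursor().execute("CALL load_orders_incremental()")
@asset(deps=[raw_orders])  # ordering dependency only: no data is passed between assets
def stg_orders(dbt: DbtCliResource) -> None:
    """Runs the dbt staging model for orders."""
    dbt.cli(["run", "--select", "stg_orders"]).wait()
@asset(deps=[stg_orders])
def fct_orders(dbt: DbtCliResource) -> None:
    """Runs the dbt fct_orders mart."""
    dbt.cli(["run", "--select", "fct_orders"]).wait()
# The 'snowflake' and 'dbt' resources are bound by name in Definitions(resources={...}).
# In practice, dagster_dbt's @dbt_assets can load every dbt model as an asset automatically.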
Benefits over task-based thinking:
- The global asset graph is visible in the Dagster UI, so you see the entire data lineage, not just individual DAG task graphs.
- Asset freshness policies: declare that fct_orders should be materialised within 24 hours of its upstream assets changing, and Dagster alerts when assets are stale.
- Selective materialisation: rematerialise only fct_orders and its upstream dependencies, not the entire pipeline.
- Testing: assets are plain Python functions, so they can be unit-tested by mocking their inputs.
Production pipeline reliability requires systematic failure handling, not just retries.
Retry strategy: configure task-level retries with exponential backoff. Differentiate transient errors (e.g., a network timeout, which should be retried) from logic errors (e.g., bad data, which should not be retried but should trigger an alert). In Airflow: retries=3, retry_delay=timedelta(minutes=5), retry_exponential_backoff=True.
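The transient-versus-logic distinction can be expressed directly in code. This is a generic Python sketch, not Airflow's implementation (Airflow's retry_exponential_backoff uses its own formula, capped by max_retry_delay); TransientError and the flaky task are hypothetical, and sleep is injectable so the example runs instantly.

```python
import time


class TransientError(Exception):
    """Failures worth retrying, e.g. network timeouts or throttling."""


def run_with_retries(task, max_retries=3, base_delay=300.0, sleep=time.sleep):
    """Call task(), retrying only TransientError with exponentially growing delays.

    Any other exception (e.g. a data-validation error) propagates immediately.
    """
    for attempt in range(max_retries + 1):
        try:
            return task()
        except TransientError:
            if attempt == max_retries:
                raise  # retries exhausted: let the failure reach alerting
            sleep(base_delay * 2 ** attempt)  # 300 s, 600 s, 1200 s, ...


calls = {"n": 0}


def flaky_extract():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("network timeout")
    return "ok"


delays = []
result = run_with_retries(flaky_extract, sleep=delays.append)  # record delays instead of sleeping
print(result, delays)  # ok [300.0, 600.0]
```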
Alerting: Send task failure notifications to Slack/PagerDuty via Airflow callbacks:
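from datetime import timedelta
def notify_failure(context):
    # context contains dag, dag_run, task_instance and, for failures, exception
    send_slack_alert(  # placeholder for your own Slack/PagerDuty client
        f"Task {context['task_instance'].task_id} failed "
        f"in DAG {context['dag'].dag_id}. "
        f"Error: {context.get('exception')}"
    )
def notify_retry(context):
    send_slack_alert(f"Task {context['task_instance'].task_id} is up for retry")
default_args = {
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'on_failure_callback': notify_failure,
    'on_retry_callback': notify_retry,
    # Airflow 2.x SLA: alert if the task hasn't succeeded 2 hours after the DAG run started
    'sla': timedelta(hours=2),
}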
SLA monitoring:
- Define expected completion times (Airflow SLA misses, Dagster freshness policies).
- Track pipeline duration trends: a gradual slowdown (5 min/day) will eventually breach the SLA and is easier to fix early.
- Dead letter queues: for streaming pipelines, route poison-pill messages (those that repeatedly fail processing) to a DLQ for manual inspection instead of blocking the entire stream.
- Circuit breakers: if a pipeline has failed N times in a row, stop retrying and escalate to on-call instead of hammering a failing downstream system.
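A circuit breaker only needs a counter of consecutive failures. The minimal Python sketch below opens after N failures in a row and resets on any success; production implementations usually add a "half-open" state that allows a trial call after a cool-down, which is omitted here. The downstream outage is simulated.

```python
class CircuitBreaker:
    """Refuse further calls after `failure_threshold` consecutive failures."""

    def __init__(self, failure_threshold=3):
        self.failure_threshold = failure_threshold
        self.consecutive_failures = 0

    @property
    def open(self):
        return self.consecutive_failures >= self.failure_threshold

    def call(self, func):
        if self.open:
            raise RuntimeError("circuit open: escalate to on-call instead of retrying")
        try:
            result = func()
        except Exception:
            self.consecutive_failures += 1
            raise
        self.consecutive_failures = 0  # a success resets the failure streak
        return result


breaker = CircuitBreaker(failure_threshold=2)


def call_downstream_api():
    raise ConnectionError("downstream API unavailable")  # simulated outage


for _ in range(2):
    try:
        breaker.call(call_downstream_api)
    except ConnectionError:
        pass
print(breaker.open)  # True
```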
- Push-based (schedule-driven): pipelines run on a cron schedule, regardless of whether upstream data has arrived. Simple and predictable. Problem: if upstream data is late, downstream jobs run on stale or incomplete data. The pipeline "fires and forgets."
- Pull-based (event/data-driven): pipelines trigger when upstream data is ready, detected by sensors (S3 file arrival, Kafka message, table row count change). More responsive, and avoids running on stale data. The cost is complexity: you need to handle missing events, stuck sensors, and cascading delays.
In practice, most production pipelines use a hybrid:
- A maximum schedule (run at most once per hour) combined with a data sensor (run only if new data has arrived).
- Event-driven triggers from upstream pipeline completion (Airflow ExternalTaskSensor, Dagster asset sensors, cross-DAG triggering).
# Airflow: data-driven trigger, waiting for an S3 file before processing
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
wait_for_file = S3KeySensor(
    task_id='wait_for_orders_file',
    bucket_name='my-data-bucket',
    bucket_key='orders/{{ ds }}/orders.parquet',
    poke_interval=60,   # check every 60 seconds
    timeout=3600,       # fail after 1 hour
    mode='reschedule',  # release the worker slot while waiting
)
wait_for_file >> process_orders  # process_orders: downstream task defined elsewhere in the DAG
Data Quality
6 questions
Data quality can be measured across six dimensions:
- Completeness: are all required values present? (NULL rates, missing records vs expected volume)
- Accuracy: does the data correctly represent the real-world entity? (hard to validate automatically; requires domain knowledge or comparison to a ground truth)
- Consistency: is the same data represented the same way across systems? (e.g., a customer exists in the CRM but not in the billing DB)
- Timeliness / Freshness: is the data up to date? (e.g., an SLA breach if yesterday's batch hasn't arrived by 8am)
- Uniqueness: are there duplicates where there shouldn't be? (duplicate order IDs, duplicate user registrations)
- Validity: do values conform to expected formats and ranges? (negative ages, future birthdays, invalid email formats)
Framework layers:
- Schema validation: enforce expected types and nullability constraints at ingestion (Great Expectations, dbt schema tests, Pydantic).
- Statistical checks: monitor row counts, null rates, and value distributions over time, and alert on anomalies (e.g., row count drops 30% vs yesterday).
- Business rule validation: custom SQL assertions that encode domain logic (revenue can't be negative, orders must have at least one item).
- Referential integrity: foreign key relationships hold across tables.
- End-to-end reconciliation: periodic comparison of totals in the warehouse against the source system. The highest-confidence check.
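Several of these checks are simple enough to run as plain code at ingestion. A minimal Python sketch covering a statistical check (row-count drop vs the previous run), completeness (null rate) and uniqueness; the 30% threshold mirrors the example above, while the function name, parameters and batch are hypothetical.

```python
def run_checks(rows, previous_row_count, required_columns, unique_key,
               max_drop=0.30, max_null_rate=0.0):
    """Return a list of failed checks for one batch of rows (dicts)."""
    failures = []
    # Statistical check: row count fell more than max_drop vs the previous run
    if previous_row_count and len(rows) < previous_row_count * (1 - max_drop):
        failures.append(f"row count {len(rows)} is >{max_drop:.0%} below {previous_row_count}")
    # Completeness: null rate per required column
    for column in required_columns:
        nulls = sum(1 for row in rows if row.get(column) is None)
        if rows and nulls / len(rows) > max_null_rate:
            failures.append(f"{column}: null rate {nulls / len(rows):.1%}")
    # Uniqueness: duplicate values of the key
    keys = [row[unique_key] for row in rows]
    if len(keys) != len(set(keys)):
        failures.append(f"{unique_key}: duplicate values")
    return failures


batch = [
    {"order_id": 1, "customer_id": 10}, {"order_id": 2, "customer_id": None},
    {"order_id": 3, "customer_id": 12}, {"order_id": 3, "customer_id": 12},
    {"order_id": 4, "customer_id": 13}, {"order_id": 5, "customer_id": 14},
]
for failure in run_checks(batch, previous_row_count=10,
                          required_columns=["customer_id"], unique_key="order_id"):
    print(failure)
```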
Great Expectations (GX) is a Python-based data validation library. You define "expectations" about your data (e.g., expect_column_values_to_be_between), run them against a Pandas DataFrame, Spark DataFrame, or database table, and get an HTML data docs report showing pass/fail.
dbt tests are SQL queries run against warehouse tables; they return failing rows. Built into the dbt workflow, run after every transformation.
Comparison:
- Location of testing: dbt tests run in the warehouse on the output of models. GX can run anywhere in the pipeline: on raw files before ingestion, on a Spark DataFrame mid-pipeline, or on warehouse tables.
- Expressiveness: GX has 50+ built-in expectations (statistical distributions, regex patterns, set membership, quantile ranges). dbt built-ins are basic (not_null, unique, accepted_values). dbt-expectations package bridges this gap.
- Profiling: GX can profile a dataset and suggest expectations automatically. dbt has no profiling.
- Integration: dbt tests are simpler; if you're already using dbt, start there. GX is the right tool when you need pre-ingestion validation or pipeline-level checks outside dbt.
Many teams use both: dbt for model output validation, GX for raw source validation at ingestion.
Traditional data quality testing is rule-based and static: you define assertions and check whether they pass. It catches the problems you anticipated, but not the unknown unknowns.
Data observability monitors the health of data continuously, using ML-driven anomaly detection on data metrics (row counts, null rates, schema changes, freshness, and distribution shifts) without requiring you to define every rule manually. It detects unexpected changes that you didn't think to write a rule for.
Key capabilities of observability platforms (Monte Carlo, Acceldata, Metaplane, Bigeye):
- Automatic anomaly detection on table-level metrics (learns baseline, alerts on deviations).
- Schema change tracking: alerts when a column is added, removed, or its type changes.
- Data lineage integration: when an incident is detected, shows which upstream pipeline or source table caused it.
- Distribution monitoring: catches when the distribution of values in a column shifts (a feature drift signal for ML models, or a data pipeline bug).
- End-to-end data health scores per table and pipeline.
Analogy: dbt tests are like unit tests (you write assertions for known behaviours). Data observability is like production monitoring with anomaly detection: it catches the things you didn't anticipate.
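A deliberately simple stand-in for the learned baselines these platforms use: flag a day whose row count falls more than three standard deviations from the recent mean. Commercial tools use richer models (seasonality, trend, day-of-week effects); the 3-sigma threshold, the is_anomalous name and the sample counts below are assumptions for illustration only.

```python
from statistics import mean, stdev


def is_anomalous(history: list[int], today: int, threshold: float = 3.0) -> bool:
    """Flag today's row count if it lies more than `threshold` standard
    deviations from the mean of the historical baseline (a z-score rule)."""
    baseline_mean = mean(history)
    baseline_std = stdev(history)
    if baseline_std == 0:
        # Perfectly stable history: any change at all counts as a deviation
        return today != baseline_mean
    z_score = (today - baseline_mean) / baseline_std
    return abs(z_score) > threshold


# Hypothetical daily row counts for one table over the last two weeks
daily_rows = [10_120, 9_980, 10_050, 10_210, 9_900, 10_080, 10_150,
              10_010, 9_940, 10_190, 10_060, 10_100, 9_970, 10_030]

print(is_anomalous(daily_rows, 10_090))  # a normal day
print(is_anomalous(daily_rows, 4_300))   # looks like a partial load
```

No rule mentions 4,300 rows specifically; the baseline alone makes the partial load stand out, which is the "unknown unknowns" point above.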
Duplicates enter pipelines through at-least-once message delivery, network retries, double-runs, or bugs in source systems. Handling them correctly is critical for accurate analytics.
Strategies:
- Exact deduplication: deduplicate on a natural key (order_id, event_id). Use ROW_NUMBER() or QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) = 1 to keep the latest version.
- Hash-based deduplication: compute a hash of all columns (MD5 or SHA). Deduplicate on the hash. Useful when there's no natural key but exact duplicates (same payload) should be removed.
- Delta Lake MERGE: upsert on primary key. Idempotent by design; reprocessing the same event just updates the row to the same value.
- Window-based deduplication: for streaming, deduplicate within a time window (last 24 hours of IDs held in state). Flink and Spark Structured Streaming both support stateful deduplication (in Spark, withWatermark combined with dropDuplicates bounds the state).
-- SQL deduplication: keep the latest row per order_id
-- SELECT * EXCLUDE is Snowflake/DuckDB syntax; BigQuery uses SELECT * EXCEPT (rn)
INSERT INTO clean_orders
WITH deduped AS (
  SELECT *,
    ROW_NUMBER() OVER (
      PARTITION BY order_id
      ORDER BY updated_at DESC
    ) AS rn
  FROM raw_orders
)
SELECT * EXCLUDE (rn) FROM deduped WHERE rn = 1;
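The hash-based and window-based strategies can be sketched in plain Python. This assumes SHA-256 over a canonical JSON encoding of the payload, and that streaming events arrive in event-time order (real engines use watermarks to cope with late data); payload_hash and WindowedDeduplicator are hypothetical names, not a library API.

```python
import hashlib
import json
from collections import OrderedDict


def payload_hash(record: dict) -> str:
    """Hash the full payload; sort_keys makes the hash independent of key order."""
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class WindowedDeduplicator:
    """Drop events whose ID was already seen within the last `window_seconds`
    of event time. IDs older than the window are evicted, which keeps state
    bounded at the cost of letting very late duplicates through."""

    def __init__(self, window_seconds: float) -> None:
        self.window_seconds = window_seconds
        self._seen: OrderedDict[str, float] = OrderedDict()  # id -> first-seen time

    def accept(self, event_id: str, event_time: float) -> bool:
        # Evict IDs that have fallen out of the window (oldest first)
        while self._seen:
            oldest_time = next(iter(self._seen.values()))
            if event_time - oldest_time <= self.window_seconds:
                break
            self._seen.popitem(last=False)
        if event_id in self._seen:
            return False  # duplicate inside the window
        self._seen[event_id] = event_time
        return True


# Hash-based: the same payload delivered twice (keys in a different order)
events = [{"order_id": 1, "amount": 20}, {"amount": 20, "order_id": 1},
          {"order_id": 2, "amount": 5}]
unique = {payload_hash(e): e for e in events}
print(len(unique))

# Window-based: a one-hour window over (event_id, event_time_seconds)
dedup = WindowedDeduplicator(window_seconds=3600)
stream = [("evt-1", 0), ("evt-1", 120), ("evt-2", 300), ("evt-1", 4000)]
print([dedup.accept(eid, ts) for eid, ts in stream])
```

The last "evt-1" is accepted because its first sighting has aged out of the window; that is the trade-off every bounded-state deduplicator makes.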
A data contract is a formal, machine-readable agreement between a data producer (an application team, a data pipeline) and data consumers (analytics, other pipelines, ML models) that specifies the schema, semantics, freshness SLA, and quality expectations of a dataset.
Why it matters: Without contracts, a backend team renames a field in their database, which silently breaks 10 downstream pipelines and dashboards. With a contract, the change must be reviewed and versioned; consumers are notified before the change is deployed.
What a data contract specifies:
- Schema (field names, types, nullable)
- Semantics (what does revenue mean: gross or net? Including tax?)
- SLA (freshness guarantee: data updated within 2 hours of transaction)
- Quality expectations (order_id is always unique, amount is always positive)
- Owner and contact information
# data-contracts/orders.yaml (simplified, ODCS-style)
id: orders_v2
info:
  title: Orders
  version: 2.0.0
  owner: platform-team@company.com
schema:
  - name: order_id
    type: integer
    primaryKey: true
    description: Unique identifier for each order
  - name: amount_cents
    type: integer
    nullable: false
    description: Order total in cents (gross, before tax)
quality:
  - type: sql
    query: "SELECT COUNT(*) FROM orders WHERE amount_cents < 0"
    mustBe: 0
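Tools: Schemata (open-source), the open-source Data Contract CLI, the Open Data Contract Standard (ODCS, which grew out of a contract template open-sourced by PayPal), and dbt model contracts (enforced in dbt 1.5+). The pattern is increasingly adopted as data mesh principles spread.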
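Part of the review step can be automated in CI by diffing the old and new contract schemas before a change merges. A minimal sketch, assuming schemas are loaded into plain dicts (field name to type and nullability) and the team follows semantic versioning; the rules and the breaking_changes/required_bump names are illustrative, not taken from ODCS or any of the tools above.

```python
def breaking_changes(old: dict[str, dict], new: dict[str, dict]) -> list[str]:
    """List changes in `new` that would break consumers of `old`.
    Schemas map field name -> {"type": ..., "nullable": ...}."""
    problems = []
    for name, spec in old.items():
        if name not in new:
            # A rename looks like a removal to every existing consumer
            problems.append(f"field removed or renamed: {name}")
            continue
        if new[name]["type"] != spec["type"]:
            problems.append(f"type changed: {name} {spec['type']} -> {new[name]['type']}")
        if new[name].get("nullable", True) and not spec.get("nullable", True):
            problems.append(f"field became nullable: {name}")
    return problems


def required_bump(old: dict[str, dict], new: dict[str, dict]) -> str:
    """Breaking changes need a major version; purely additive ones a minor."""
    if breaking_changes(old, new):
        return "major"
    return "minor" if new.keys() - old.keys() else "patch"


v2 = {
    "order_id": {"type": "integer", "nullable": False},
    "amount_cents": {"type": "integer", "nullable": False},
}
v3 = {
    "order_id": {"type": "integer", "nullable": False},
    "amount": {"type": "decimal", "nullable": False},  # renamed and retyped
}
print(breaking_changes(v2, v3))
print(required_bump(v2, v3))
```

A CI job could fail the producer's pull request whenever the declared version bump is smaller than required_bump reports, which forces the conversation with consumers to happen before deployment.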
Reconciliation validates that the data in your warehouse accurately reflects the source of truth. It's the highest-confidence data quality check.
Level 1, count reconciliation: source row count == warehouse row count for a given date. Fast; catches bulk load failures and truncation.
Level 2, aggregate reconciliation: source SUM(revenue) == warehouse SUM(revenue) for a given date. Catches individual row value corruption or missed updates.
Level 3, row-level diff: find rows that exist in the source but not in the warehouse, or that have different values. Most thorough but expensive at scale.
-- Row-level reconciliation: find records in source missing from warehouse
SELECT s.order_id, s.total, 'missing_in_warehouse' AS issue
FROM source_orders s
LEFT JOIN warehouse_orders w ON s.order_id = w.order_id
WHERE w.order_id IS NULL
AND s.order_date = CURRENT_DATE - 1
UNION ALL
-- Find records with value discrepancies
SELECT s.order_id, s.total, 'value_mismatch' AS issue
FROM source_orders s
JOIN warehouse_orders w ON s.order_id = w.order_id
WHERE s.total IS DISTINCT FROM w.total  -- unlike !=, also flags NULL vs non-NULL
AND s.order_date = CURRENT_DATE - 1;
Automation: Run reconciliation jobs daily; alert if discrepancy rate exceeds a threshold (e.g., >0.01% of rows or >0.001% of revenue). Store reconciliation results in a quality table for trending analysis.
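Levels 1 and 2 with the example alert thresholds above (0.01% of rows, 0.001% of revenue) fit in a few lines once each system's daily totals have been queried. A sketch; DailyTotals and reconcile are hypothetical names, and the returned dict is the kind of row you would append to the quality table.

```python
from dataclasses import dataclass


@dataclass
class DailyTotals:
    row_count: int
    revenue: float


def reconcile(source: DailyTotals, warehouse: DailyTotals,
              max_row_diff_rate: float = 0.0001,       # 0.01% of rows
              max_revenue_diff_rate: float = 0.00001,  # 0.001% of revenue
              ) -> dict:
    """Level 1 (count) and Level 2 (aggregate) reconciliation for one day."""
    row_diff_rate = abs(source.row_count - warehouse.row_count) / max(source.row_count, 1)
    revenue_diff_rate = abs(source.revenue - warehouse.revenue) / max(abs(source.revenue), 1e-9)
    return {
        "row_diff_rate": row_diff_rate,
        "revenue_diff_rate": revenue_diff_rate,
        "alert": row_diff_rate > max_row_diff_rate
                 or revenue_diff_rate > max_revenue_diff_rate,
    }


# 50 missing rows pass the count check, but the revenue gap trips the alert
result = reconcile(DailyTotals(1_000_000, 2_500_000.0),
                   DailyTotals(999_950, 2_499_900.0))
print(result)
```

Checking both rates matters: a handful of missing high-value orders can stay under the row threshold while still breaching the revenue one.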
AI / ML Integration
6 questions
A vector database stores high-dimensional embedding vectors and enables efficient approximate nearest neighbour (ANN) search: finding the K most semantically similar vectors to a query vector.
Why it matters for LLMs: Text, images, and code can be converted into dense numerical embeddings by encoder models (OpenAI text-embedding-3, Sentence Transformers). Semantically similar content produces similar vectors. ANN search over embeddings powers:
- RAG (Retrieval-Augmented Generation): at query time, retrieve the N most relevant document chunks from the vector DB and include them in the LLM context. This grounds answers in retrieved sources (reducing, though not eliminating, hallucination) and extends the LLM's knowledge to private data.
- Semantic search (find documents by meaning, not keywords)
- Recommendation systems (find items similar to what a user interacted with)
- Duplicate detection at scale
Popular vector databases: Pinecone (managed, easiest), Weaviate (open-source, multi-modal), Qdrant (open-source, Rust, high performance), pgvector (PostgreSQL extension; good for moderate scale without additional infrastructure), Milvus (CNCF, cloud-native scale).
# Building a RAG pipeline: embed documents and store them in a vector DB
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('all-MiniLM-L6-v2')  # produces 384-dimensional vectors
client = QdrantClient(host="localhost", port=6333)
# Create the collection once; its vector size must match the embedding model
client.create_collection(
    "documents",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE),
)

# Embed and insert documents
texts = ["Order 123 was placed on Jan 5", "Revenue for Q1 was $2.4M", ...]
embeddings = model.encode(texts)
client.upload_points("documents", points=[
    PointStruct(id=i, vector=emb.tolist(), payload={"text": t})
    for i, (emb, t) in enumerate(zip(embeddings, texts))
])

# At query time: find relevant context for the LLM
query_vec = model.encode(["What was Q1 revenue?"])
results = client.search("documents", query_vector=query_vec[0].tolist(), limit=5)
context = "\n".join(r.payload["text"] for r in results)
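To make "approximate" concrete: exact nearest-neighbour search scores the query against every stored vector, which costs O(N × d) per query. A pure-Python sketch of that brute-force baseline with cosine similarity; the toy 3-dimensional vectors and document IDs are made up (real all-MiniLM-L6-v2 embeddings have 384 dimensions). ANN indexes such as HNSW, which Qdrant uses, exist to avoid this full scan, and their recall is typically measured against exactly this kind of exact answer.

```python
import heapq
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def exact_top_k(query: list[float], vectors: dict[str, list[float]],
                k: int) -> list[tuple[str, float]]:
    """Score every stored vector and keep the k most similar (a linear scan)."""
    scored = ((cosine_similarity(query, v), doc_id) for doc_id, v in vectors.items())
    return [(doc_id, score) for score, doc_id in heapq.nlargest(k, scored)]


# Hypothetical toy embeddings
vectors = {
    "q1_revenue": [0.9, 0.1, 0.0],
    "order_123": [0.1, 0.9, 0.1],
    "q2_revenue": [0.8, 0.2, 0.1],
}
print(exact_top_k([1.0, 0.0, 0.0], vectors, k=2))
```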
Building a production-grade embedding pipeline for a large corpus (millions of documents) requires handling chunking, batching, incremental updates, and quality.
Chunking strategy: embedding models have limited context windows (roughly 512 to 8192 tokens), so documents must be split into chunks. Strategies:
- Fixed-size with overlap: split every 512 tokens with a 50-token overlap. Simple; loses semantic coherence at chunk boundaries.
- Recursive character splitting: split on paragraphs, then sentences, then words. Preserves natural language boundaries. LangChain's recommended default splitter (RecursiveCharacterTextSplitter).
- Semantic chunking: embed sentences, then group sentences with similar embeddings into coherent chunks. Better retrieval quality; more expensive.
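The fixed-size strategy is simple enough to sketch directly. Whitespace-separated words stand in for model tokens here (an assumption; a real pipeline would count tokens with the embedding model's own tokenizer), and chunk_with_overlap is a hypothetical name.

```python
def chunk_with_overlap(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    """Split text into windows of `chunk_size` tokens, where consecutive
    windows share `overlap` tokens so no sentence is lost at a boundary."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")
    tokens = text.split()  # word-level approximation of model tokens
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(tokens), step):
        chunks.append(" ".join(tokens[start:start + chunk_size]))
        if start + chunk_size >= len(tokens):
            break  # this window already reached the end of the text
    return chunks


text = " ".join(f"w{i}" for i in range(10))
print(chunk_with_overlap(text, chunk_size=4, overlap=1))
```

With chunk_size=4 and overlap=1 the windows start every 3 words, so each chunk repeats the last word of the previous one.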
Batch embedding pipeline:
# Efficient batch embedding with Spark for large corpora
from typing import Iterator

import pandas as pd
from pyspark.sql.functions import pandas_udf
from sentence_transformers import SentenceTransformer

@pandas_udf("array<float>")
def embed_batch(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Iterator UDF: the model is loaded once per task, not once per Arrow batch
    model = SentenceTransformer('all-MiniLM-L6-v2')
    for texts in batches:
        embeddings = model.encode(texts.tolist(), batch_size=256, show_progress_bar=False)
        yield pd.Series(embeddings.tolist())

# Process millions of chunks in parallel (chunks_df has a chunk_text column)
embedded_df = chunks_df.withColumn("embedding", embed_batch("chunk_text"))
Incremental updates: Store the chunk hash alongside the embedding. On re-ingestion, only re-embed chunks whose source text has changed. For deletion, use soft deletes in the vector DB (mark chunks from a deleted document as inactive). Weaviate and Qdrant support payload filtering that enables "soft delete" without re-indexing.
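The hash comparison that drives incremental updates can be sketched as a planning step that runs before any embedding or vector DB call. This assumes stable chunk IDs (for example "doc_id:chunk_index") and SHA-256 content hashes; the function names are hypothetical, and the actual embed/upsert and soft-delete calls are left out.

```python
import hashlib


def chunk_hash(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def plan_incremental_update(stored: dict[str, str],
                            incoming: dict[str, str]) -> tuple[list[str], list[str]]:
    """stored: chunk_id -> hash already in the vector DB.
    incoming: chunk_id -> text from the latest chunking run.
    Returns (chunk IDs to (re-)embed, chunk IDs to soft-delete)."""
    # New chunks have no stored hash, so they are embedded too
    to_embed = [cid for cid, text in incoming.items()
                if stored.get(cid) != chunk_hash(text)]
    # Chunks that disappeared from the source are marked inactive, not re-indexed
    to_soft_delete = [cid for cid in stored if cid not in incoming]
    return to_embed, to_soft_delete


stored = {
    "doc1:0": chunk_hash("Revenue for Q1 was $2.4M"),
    "doc1:1": chunk_hash("Old paragraph"),
    "doc2:0": chunk_hash("Paragraph from a deleted document"),
}
incoming = {
    "doc1:0": "Revenue for Q1 was $2.4M",  # unchanged: skipped
    "doc1:1": "Edited paragraph",          # changed: re-embedded
    "doc1:2": "New paragraph",             # new: embedded
}
print(plan_incremental_update(stored, incoming))
```

Only the changed and new chunks reach the (expensive) embedding model, so a re-ingestion run costs roughly in proportion to how much of the corpus actually changed.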
MLflow is an open-source platform for managing the ML lifecycle. Its four components:
- MLflow Tracking: logs parameters, metrics, and artifacts (model weights, plots) from training runs. Compare runs in a UI. The experiment log that makes ML research reproducible.
- MLflow Projects: package ML code in a reproducible format (conda env or Docker). Anyone can re-run an experiment from a git commit.
- MLflow Models: a standard format for saving models in a flavour-agnostic way (sklearn, PyTorch, TensorFlow, LightGBM). Load them with a single API regardless of framework.
- MLflow Model Registry: versioned model store with stage transitions (Staging → Production → Archived). Enables controlled model promotion with a review workflow. Newer MLflow releases (2.9+) deprecate stages in favour of model version aliases.
import mlflow
import mlflow.sklearn

# model, X_train, y_train, rmse and r2 are assumed to be defined elsewhere
with mlflow.start_run() as run:
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("max_depth", 5)
    model.fit(X_train, y_train)
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.sklearn.log_model(model, "revenue_forecast_model")

# Register in model registry for production promotion
mlflow.register_model(
    f"runs:/{run.info.run_id}/revenue_forecast_model",
    "revenue_forecast"
)
Data engineers integrate MLflow by: storing feature pipelines in MLflow Projects, logging data version (dbt model version, Delta table version) as a run parameter for reproducibility, and connecting the model registry to deployment pipelines (Kubernetes serving, Spark batch scoring).
- Data drift (covariate shift): the statistical distribution of input features changes over time compared to the training distribution. Example: a fraud detection model trained on 2022 transactions; in 2024, transaction amounts are 30% higher on average due to inflation. The model's input distribution has drifted.
- Concept drift: the relationship between features and the target variable changes, so the same inputs now map to different outcomes. Example: customer click patterns for "interest in buying" change due to a UI redesign. (Label drift is related but distinct: a change in the distribution of the target itself.)
Detection:
- Monitor feature distributions in production (mean, standard deviation, percentiles, null rates) against a reference window from training data.
- Statistical tests: Population Stability Index (PSI), Jensen-Shannon divergence, Kolmogorov-Smirnov test. Alert when PSI > 0.2 (significant drift).
- Model performance metrics (accuracy, AUC, RMSE) if ground truth labels are available with acceptable latency.
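PSI compares binned proportions: PSI = Σ (current% − reference%) × ln(current% / reference%). A plain-Python sketch, assuming bins are cut at the reference distribution's quantiles (one common choice) and a small floor replaces empty-bin proportions to avoid log(0); the 0.2 alert level is the rule of thumb above, not a universal constant.

```python
import math
from bisect import bisect_right


def psi(reference: list[float], current: list[float],
        n_bins: int = 10, eps: float = 1e-6) -> float:
    """Population Stability Index of `current` against `reference`."""
    ref_sorted = sorted(reference)
    # Interior bin edges at the reference quantiles (n_bins - 1 of them),
    # so each bin holds roughly 1/n_bins of the training data
    edges = [ref_sorted[int(len(ref_sorted) * i / n_bins)] for i in range(1, n_bins)]

    def proportions(values: list[float]) -> list[float]:
        counts = [0] * n_bins
        for v in values:
            counts[bisect_right(edges, v)] += 1
        # Floor empty bins at eps so the log term stays finite
        return [max(c / len(values), eps) for c in counts]

    ref_p, cur_p = proportions(reference), proportions(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_p, cur_p))


# Hypothetical feature: production values 30% higher than in training
training_amounts = [i / 100 for i in range(1000)]
production_amounts = [v * 1.3 for v in training_amounts]
score = psi(training_amounts, production_amounts)
print(round(score, 3), "drift" if score > 0.2 else "stable")
```

Most of the score comes from the top bin, which is where the inflated amounts pile up; per-bin contributions are a useful drill-down when an alert fires.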
# Evidently AI: drift detection report
# (classic Report API; import paths differ in newer Evidently releases)
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

report = Report(metrics=[DataDriftPreset()])
report.run(reference_data=training_df, current_data=production_df)
report.save_html("drift_report.html")

# Check summary
result = report.as_dict()
if result['metrics'][0]['result']['dataset_drift']:
    trigger_model_retraining_pipeline()  # hypothetical hook into your orchestrator
Response: if drift is below the threshold, keep monitoring. If it is above, first investigate the data pipeline for bugs (are features computed correctly?), then retrain the model on recent data, run an A/B test, and promote the new model if performance improves.
Real-time ML inference (fraud detection, personalisation, dynamic pricing) requires features to be available in milliseconds at inference time, computed from the latest available data.
Architecture (Lambda or Kappa pattern):
- Stream processor (Flink/Spark Streaming): consumes events from Kafka, computes real-time features (rolling 1-hour transaction count, user session features), and writes them to the online feature store (Redis, DynamoDB).
- Batch processor (Spark/dbt): computes features over larger windows (30-day purchase history, segment-level aggregates) that don't change frequently, and loads them into the online store periodically.
- Feature serving layer: at inference time, the model server looks up features from Redis by entity key (user_id, merchant_id) in <5ms.
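// Flink real-time feature computation
// (TransactionCountAggregator, UserFeatures and UserFeaturesMapper are user-defined;
// RedisSink comes from the Apache Bahir Redis connector)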
DataStream<UserFeatures> features = transactions
.keyBy(t -> t.userId)
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5)))
.aggregate(new TransactionCountAggregator())
.map(agg -> new UserFeatures(agg.userId, agg.count, agg.totalAmount));
// Write to Redis (online store)
features.addSink(new RedisSink<>(redisConf, new UserFeaturesMapper()));
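For intuition, or for unit-testing feature logic before porting it to a stream processor, the same rolling features can be computed per key in plain Python. This sketch recomputes the trailing one-hour window on every event instead of emitting Flink-style sliding panes every five minutes, and assumes events for a key arrive in event-time order; RollingCount and the output field names are hypothetical.

```python
from collections import defaultdict, deque


class RollingCount:
    """Per-key count and sum of transactions in the trailing window,
    i.e. over event times in (ts - window_seconds, ts]."""

    def __init__(self, window_seconds: float = 3600) -> None:
        self.window_seconds = window_seconds
        self._events: dict[str, deque] = defaultdict(deque)  # key -> (ts, amount)

    def update(self, key: str, ts: float, amount: float) -> dict:
        events = self._events[key]
        events.append((ts, amount))
        # Drop events that have fallen out of the trailing window
        while events and events[0][0] <= ts - self.window_seconds:
            events.popleft()
        # This dict is what would be written to the online store under `key`
        return {"user_id": key,
                "txn_count_1h": len(events),
                "txn_amount_1h": sum(a for _, a in events)}


features = RollingCount(window_seconds=3600)
print(features.update("u1", 0, 10.0))
print(features.update("u1", 1800, 5.0))
print(features.update("u1", 3600, 20.0))  # the event at t=0 has aged out
```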
Key challenges:
- Point-in-time correctness: each training example must use only the feature values that were available at that example's timestamp (no future data leaking in), computed the same way as at inference. Build training sets with the feature store's point-in-time joins over its offline historical values rather than re-computing features after the fact.
- Backfilling the online store: when a new feature is introduced, populate historical values before switching the model to use it.
- Feature expiry: features for an idle user should expire gracefully (Redis TTL) rather than being served indefinitely as stale values.
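Point-in-time correctness in miniature: for each labelled example, take the most recent feature value recorded at or before the example's timestamp, never one written later. A plain-Python sketch, assuming feature history is kept as timestamp-sorted (timestamp, value) lists per entity; the function names are hypothetical. Feature stores perform the same lookup as a point-in-time join at scale (Feast's historical feature retrieval, for example).

```python
from bisect import bisect_right


def point_in_time_lookup(feature_history: dict[str, list[tuple[float, float]]],
                         entity: str, as_of: float):
    """Latest feature value for `entity` recorded at or before `as_of`, else None.
    Values written after `as_of` are never returned, so no future data leaks in."""
    history = feature_history.get(entity, [])
    idx = bisect_right([ts for ts, _ in history], as_of)
    return history[idx - 1][1] if idx else None


def build_training_rows(labels: list[tuple[str, float, int]],
                        feature_history: dict[str, list[tuple[float, float]]]):
    """labels: (entity, event_time, label) -> (entity, event_time, feature, label)."""
    return [(e, t, point_in_time_lookup(feature_history, e, t), y) for e, t, y in labels]


# Hypothetical history of a "transactions in the last hour" feature for one user
history = {"u1": [(100, 3.0), (200, 7.0), (300, 12.0)]}
print(build_training_rows([("u1", 150, 0), ("u1", 320, 1)], history))
```

The example at t=150 gets 3.0, not the later 7.0 or 12.0: a naive "latest value" join would hand the model information it could never have had at prediction time.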
- Real-time inference: the model is served as an API endpoint and predictions are generated on demand per request. Low latency (<100ms). Requires a model server (BentoML, Seldon, TorchServe, SageMaker endpoint). Expensive at high throughput.
- Batch scoring: the model is run over a large dataset periodically (nightly, hourly), and predictions are stored in a database for downstream consumption. High throughput, no latency requirement. Much cheaper per prediction. Suitable when predictions don't need to be instant (daily churn risk, weekly recommendation refresh).
Batch scoring in Spark, Pandas UDF (vectorised) pattern:
import mlflow.sklearn
import pandas as pd
from pyspark.sql.functions import col, current_timestamp, pandas_udf, struct

# Load the model once on the driver and broadcast it to all executors,
# rather than loading it inside the UDF for every batch
model_path = "models:/churn_model/Production"
broadcast_model = spark.sparkContext.broadcast(
    mlflow.sklearn.load_model(model_path)
)

@pandas_udf("double")
def predict_churn(features: pd.DataFrame) -> pd.Series:
    model = broadcast_model.value
    return pd.Series(model.predict_proba(features)[:, 1])

# feature_cols: list of feature column names, in the order used at training time
predictions = customers_df \
    .withColumn("churn_probability",
                predict_churn(struct([col(f) for f in feature_cols]))) \
    .select("customer_id", "churn_probability", current_timestamp().alias("scored_at"))

# Write predictions to warehouse
predictions.write.mode("overwrite") \
    .saveAsTable("ml.churn_predictions_daily")
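Broadcasting the model ships it to each executor once instead of re-loading it in every task. The Pandas UDF receives data as Apache Arrow record batches (10,000 rows by default, set by spark.sql.execution.arrow.maxRecordsPerBatch) and scores each batch with a single vectorised predict_proba call, which is typically far faster than a row-at-a-time Python UDF.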