data-pipelines
Use this skill when building data pipelines, ETL/ELT workflows, or data transformation layers. Triggers on Airflow DAG design, dbt model creation, Spark job optimization, streaming vs batch architecture decisions, data ingestion, data quality checks, pipeline orchestration, incremental loads, CDC (change data capture), schema evolution, and data warehouse modeling. Acts as a senior data engineer advisor for building reliable, scalable data infrastructure.
data data-engineeringetlairflowdbtsparkstreamingWhat is data-pipelines?
Use this skill when building data pipelines, ETL/ELT workflows, or data transformation layers. Triggers on Airflow DAG design, dbt model creation, Spark job optimization, streaming vs batch architecture decisions, data ingestion, data quality checks, pipeline orchestration, incremental loads, CDC (change data capture), schema evolution, and data warehouse modeling. Acts as a senior data engineer advisor for building reliable, scalable data infrastructure.
data-pipelines
data-pipelines is a production-ready AI agent skill for claude-code, gemini-cli, openai-codex, and 1 more. Building data pipelines, ETL/ELT workflows, or data transformation layers.
Quick Facts
| Field | Value |
|---|---|
| Category | data |
| Version | 0.1.0 |
| Platforms | claude-code, gemini-cli, openai-codex, mcp |
| License | MIT |
How to Install
- Make sure you have Node.js installed on your machine.
- Run the following command in your terminal:
npx skills add AbsolutelySkilled/AbsolutelySkilled --skill data-pipelines- The data-pipelines skill is now available in your AI coding agent (Claude Code, Gemini CLI, OpenAI Codex, etc.).
Overview
A senior data engineer's decision-making framework for building production data pipelines. This skill covers the five pillars of data engineering - ingestion patterns (ETL vs ELT), orchestration (Airflow), transformation (dbt), large-scale processing (Spark), and architecture choices (streaming vs batch) - with emphasis on when to use each pattern and the trade-offs involved. Designed for engineers who need opinionated guidance on building reliable, observable, and maintainable data infrastructure.
Tags
data-engineering etl airflow dbt spark streaming
Platforms
- claude-code
- gemini-cli
- openai-codex
- mcp
Related Skills
Pair data-pipelines with these complementary skills:
Frequently Asked Questions
What is data-pipelines?
Use this skill when building data pipelines, ETL/ELT workflows, or data transformation layers. Triggers on Airflow DAG design, dbt model creation, Spark job optimization, streaming vs batch architecture decisions, data ingestion, data quality checks, pipeline orchestration, incremental loads, CDC (change data capture), schema evolution, and data warehouse modeling. Acts as a senior data engineer advisor for building reliable, scalable data infrastructure.
How do I install data-pipelines?
Run npx skills add AbsolutelySkilled/AbsolutelySkilled --skill data-pipelines in your terminal. The skill will be immediately available in your AI coding agent.
What AI agents support data-pipelines?
This skill works with claude-code, gemini-cli, openai-codex, mcp. Install it once and use it across any supported AI coding agent.
Maintainers
Generated from AbsolutelySkilled
SKILL.md
Data Pipelines
A senior data engineer's decision-making framework for building production data pipelines. This skill covers the five pillars of data engineering - ingestion patterns (ETL vs ELT), orchestration (Airflow), transformation (dbt), large-scale processing (Spark), and architecture choices (streaming vs batch) - with emphasis on when to use each pattern and the trade-offs involved. Designed for engineers who need opinionated guidance on building reliable, observable, and maintainable data infrastructure.
When to use this skill
Trigger this skill when the user:
- Designs an ETL or ELT pipeline from scratch
- Writes or debugs an Airflow DAG
- Creates dbt models, tests, or macros
- Optimizes a Spark job (shuffles, partitioning, memory tuning)
- Decides between streaming and batch processing
- Implements incremental loads or change data capture (CDC)
- Plans a data warehouse or lakehouse architecture
- Needs data quality checks, schema evolution, or pipeline monitoring
Do NOT trigger this skill for:
- BI/analytics dashboard design or visualization (use an analytics skill)
- ML model training or feature engineering (use an ML/data-science skill)
Key principles
Idempotency is non-negotiable - Every pipeline run with the same input must produce the same output. Design for safe re-runs from day one. Use date partitions, merge keys, or upsert logic so that retries never corrupt data.
Prefer ELT over ETL in modern stacks - Load raw data first, transform in the warehouse. This preserves the source of truth, enables schema-on-read, and lets analysts iterate on transformations without re-ingesting. ETL still wins when you need to filter sensitive data before it lands.
Partition and increment, never full-reload - Full table scans on every run do not scale. Use incremental models (dbt), date-partitioned loads, and watermarks to process only what changed. Fall back to full reload only for small reference tables or disaster recovery.
Orchestrate, don't script - A cron job calling a Python script is not a pipeline. Use a proper orchestrator (Airflow, Dagster, Prefect) for retries, dependency management, backfills, and observability. The orchestrator should own scheduling and state, not your application code.
Test data like code - Schema tests, row count checks, uniqueness constraints, and freshness SLAs are not optional. dbt tests, Great Expectations, or custom assertions should gate every pipeline stage. Bad data downstream is more expensive than a failed pipeline.
Core concepts
Data pipelines move data from sources (databases, APIs, event streams) through transformations to destinations (warehouses, lakes, serving layers). The two dominant patterns are ETL (extract-transform-load) and ELT (extract-load-transform). ETL transforms data in-flight before loading; ELT loads raw data first and transforms inside the destination.
The pipeline lifecycle has four stages: ingestion (getting data in), orchestration (scheduling and dependency management), transformation (cleaning, joining, aggregating), and serving (making data available to consumers). Each stage has specialized tools: Fivetran/Airbyte for ingestion, Airflow/Dagster for orchestration, dbt for transformation, and the warehouse itself (BigQuery, Snowflake, Redshift) for serving.
Streaming vs batch is an architecture decision, not a tool choice. Batch processes data in time-windowed chunks (hourly, daily). Streaming processes events continuously as they arrive. Most organizations need both - batch for historical aggregations and streaming for real-time dashboards or alerting. The Lambda architecture runs both in parallel; the Kappa architecture uses a single streaming layer for everything.
Common tasks
Design an ETL/ELT pipeline
Decide the pattern based on your constraints:
Need to filter PII before landing? -> ETL (transform before load)
Want analysts to iterate on transforms? -> ELT (load raw, transform in warehouse)
Source data volume > 1TB per load? -> ELT with Spark for heavy transforms
Small reference data < 100MB? -> Direct load, skip the frameworkStandard ELT flow:
- Extract from source (API, database CDC, file drop)
- Load raw data to staging layer (preserve original schema)
- Transform in warehouse using dbt (staging -> intermediate -> mart)
- Test data quality at each layer boundary
- Serve from mart layer to downstream consumers
Always land raw data in an immutable staging layer. Transformations should read from staging, never modify it. This gives you a re-playable source of truth.
Write an Airflow DAG
A well-structured DAG separates orchestration from business logic:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from datetime import datetime, timedelta
default_args = {
"owner": "data-team",
"retries": 2,
"retry_delay": timedelta(minutes=5),
"execution_timeout": timedelta(hours=2),
}
with DAG(
dag_id="daily_orders_pipeline",
schedule="0 6 * * *",
start_date=datetime(2024, 1, 1),
catchup=False,
default_args=default_args,
tags=["production", "orders"],
) as dag:
extract = PythonOperator(
task_id="extract_orders",
python_callable=extract_orders_fn,
op_kwargs={"ds": "{{ ds }}"},
)
transform = BigQueryInsertJobOperator(
task_id="transform_orders",
configuration={"query": {"query": "{% include 'sql/transform_orders.sql' %}"}},
)
test = PythonOperator(
task_id="test_row_counts",
python_callable=assert_row_counts,
)
extract >> transform >> testUse
catchup=Falsefor most production DAGs unless you explicitly need backfill behavior. Setexecution_timeoutto prevent zombie tasks.
Build dbt models
Structure dbt projects in three layers:
models/
staging/ -- 1:1 with source tables, light renaming/casting
stg_orders.sql
stg_customers.sql
intermediate/ -- business logic joins, deduplication
int_orders_enriched.sql
marts/ -- final consumer-facing tables
fct_daily_revenue.sql
dim_customers.sqlExample incremental model:
-- models/staging/stg_orders.sql
{{
config(
materialized='incremental',
unique_key='order_id',
on_schema_change='append_new_columns'
)
}}
select
order_id,
customer_id,
order_total,
cast(created_at as timestamp) as ordered_at
from {{ source('raw', 'orders') }}
{% if is_incremental() %}
where created_at > (select max(ordered_at) from {{ this }})
{% endif %}Always define
unique_keyfor incremental models. Without it, dbt appends instead of merging, causing duplicates on re-runs.
Optimize a Spark job
The three most common Spark performance killers and their fixes:
| Problem | Symptom | Fix |
|---|---|---|
| Data skew | One task takes 10x longer than others | Salt the join key, or use broadcast() for small tables |
| Too many shuffles | High shuffle read/write in Spark UI | Repartition before joins, coalesce after filters |
| Small files | Thousands of tiny output files | Use repartition(N) or coalesce(N) before write |
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("optimize_example").getOrCreate()
# Broadcast small dimension table to avoid shuffle
orders = spark.read.parquet("s3://data/orders/")
products = spark.read.parquet("s3://data/products/") # < 100MB
enriched = orders.join(broadcast(products), "product_id", "left")
# Repartition by date before writing to avoid small files
enriched.repartition("order_date").write \
.partitionBy("order_date") \
.mode("overwrite") \
.parquet("s3://data/enriched_orders/")Check
spark.sql.shuffle.partitions(default 200). For small datasets, lower it. For large datasets with skew, raise it.
Choose streaming vs batch
Latency requirement < 1 minute? -> Streaming (Kafka + Flink/Spark Streaming)
Latency requirement 1 hour - 1 day? -> Batch (Airflow + dbt/Spark)
Need both real-time AND historical? -> Lambda (batch + streaming in parallel)
Want one codebase for both? -> Kappa (streaming-only, replay from log)Streaming is NOT always better. It adds complexity in exactly-once semantics, state management, late-arriving data, and debugging. Use batch unless you have a proven real-time requirement.
Common streaming stack: Kafka (ingestion) -> Flink or Spark Structured Streaming (processing) -> warehouse or serving store (output).
Implement data quality checks
Gate every pipeline stage with assertions:
# dbt schema.yml
models:
- name: fct_daily_revenue
columns:
- name: revenue_date
tests:
- not_null
- unique
- name: total_revenue
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
max_value: 10000000
tests:
- dbt_utils.recency:
datepart: day
field: revenue_date
interval: 2Set freshness SLAs on source tables. If source data is stale, fail the pipeline early rather than producing silently wrong results.
Anti-patterns / common mistakes
| Mistake | Why it's wrong | What to do instead |
|---|---|---|
| Full table reload every run | Doesn't scale, wastes compute, risks data loss during failures | Incremental loads with watermarks or CDC |
| Business logic in Airflow operators | Makes testing impossible, couples logic to orchestration | Keep Airflow thin - call dbt/Spark/scripts, don't embed SQL |
| No staging layer (transform in place) | Destroys source of truth, no replay capability | Land raw data in immutable staging, transform into separate layers |
| Ignoring data skew in Spark | One partition processes 90% of data, job takes hours | Salt keys, broadcast small tables, analyze data distribution first |
| Skipping schema tests | Bad data silently propagates, discovered by end users | dbt tests, Great Expectations, or custom assertions at every boundary |
| Over-engineering with streaming | Adds complexity without real-time need | Start with batch, add streaming only for proven sub-minute requirements |
| Hardcoded dates in queries | Breaks idempotency, prevents backfills | Use Airflow template variables ({{ ds }}) or dbt ref() / source() |
| No alerting on pipeline failures | Silent failures lead to stale dashboards | Alert on DAG failures, SLA misses, and data freshness breaches |
Gotchas
dbt incremental model without
unique_keycauses duplicates - An incremental model withoutunique_keyset in the config appends new records on every run instead of merging. A re-run after a failure produces duplicate rows that are extremely hard to detect and clean up downstream. Always defineunique_keyfor incremental models.Airflow
catchup=Truetriggering thousands of backfill runs - If you setcatchup=True(the default) on a DAG with astart_datemonths in the past, Airflow immediately schedules one run per interval from that start date until now. This can flood your workers. Setcatchup=Falsefor production DAGs and trigger backfills explicitly via the CLI.Hardcoded dates break idempotency - SQL queries with
WHERE created_at >= '2024-01-01'cannot be safely re-run for different time windows. Use Airflow template variables ({{ ds }}) or dbt source freshness definitions so that re-runs and backfills process the correct partition automatically.Data skew makes one Spark task run 10x longer - A join key where 80% of rows share one value (e.g.,
customer_id = NULLor a dominant category) causes one partition to process nearly the entire dataset while others finish immediately. Profile key cardinality withdf.groupBy("key").count().orderBy(desc("count")).show(20)before writing join logic.Streaming over-engineering for batch-compatible requirements - Kafka + Flink adds exactly-once semantics complexity, late-data handling, state backend management, and operational overhead. If the business requirement is "data available within 15 minutes," a scheduled Airflow DAG running every 10 minutes satisfies it with a fraction of the complexity. Start with batch; add streaming only for proven sub-minute latency needs.
References
For detailed patterns and implementation guidance on specific domains, read the
relevant file from the references/ folder:
references/airflow-patterns.md- DAG design patterns, sensors, dynamic DAGs, backfill strategiesreferences/dbt-patterns.md- model layering, macros, packages, CI/CD for dbtreferences/spark-tuning.md- memory config, shuffle optimization, partitioning, cachingreferences/streaming-architecture.md- Kafka, Flink, exactly-once, late data, windowing
Only load a references file if the current task requires it - they are long and will consume context.
References
airflow-patterns.md
Airflow Patterns
DAG design principles
Keep DAGs thin
The DAG file should define orchestration only - scheduling, dependencies, and retries. Business logic belongs in external modules, dbt projects, or Spark jobs that the DAG calls. This makes testing possible and prevents import-time side effects.
# Good: DAG calls an external function
extract = PythonOperator(
task_id="extract",
python_callable=extract_module.run, # logic lives elsewhere
op_kwargs={"date": "{{ ds }}"},
)
# Bad: Business logic inline in the DAG file
extract = PythonOperator(
task_id="extract",
python_callable=lambda: pd.read_sql("SELECT * FROM orders", engine),
)Task granularity
Each task should be:
- Idempotent - safe to re-run without side effects
- Atomic - either fully succeeds or fully fails (no partial state)
- Observable - produces logs and metrics that indicate success or failure
Split tasks when they have different retry profiles or SLAs. Merge tasks when the overhead of XCom or intermediate storage exceeds the benefit of granularity.
Naming conventions
dag_id: <team>_<domain>_<frequency> e.g. data_orders_daily
task_id: <verb>_<noun> e.g. extract_orders, test_row_countsTemplate variables
Use Jinja templates for date-aware, idempotent pipelines:
| Variable | Example value | Use for |
|---|---|---|
{{ ds }} |
2024-01-15 |
Partition keys, WHERE clauses |
{{ ds_nodash }} |
20240115 |
File paths, table suffixes |
{{ data_interval_start }} |
2024-01-15T00:00:00+00:00 |
Precise time ranges |
{{ data_interval_end }} |
2024-01-16T00:00:00+00:00 |
Exclusive upper bound |
{{ prev_ds }} |
2024-01-14 |
Referencing previous partition |
Never use
datetime.now()in a DAG. It breaks idempotency and makes backfills produce wrong results. Always use template variables.
Sensor patterns
Sensors wait for external conditions before proceeding. Use them sparingly - they consume a worker slot while waiting.
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
# Wait for upstream DAG to complete
wait_for_ingestion = ExternalTaskSensor(
task_id="wait_for_ingestion",
external_dag_id="data_ingestion_hourly",
external_task_id="load_complete",
mode="reschedule", # releases worker slot between checks
timeout=3600,
poke_interval=120,
)
# Wait for file to appear in S3
wait_for_file = S3KeySensor(
task_id="wait_for_file",
bucket_key="s3://data-lake/orders/{{ ds_nodash }}/*.parquet",
mode="reschedule",
timeout=7200,
)Always use
mode="reschedule"for sensors in production. The defaultmode="poke"holds a worker slot for the entire wait duration.
Dynamic DAGs
Generate tasks dynamically when the number of partitions or tables varies:
tables = ["orders", "customers", "products", "inventory"]
with DAG(dag_id="ingest_all_tables", ...) as dag:
start = EmptyOperator(task_id="start")
end = EmptyOperator(task_id="end")
for table in tables:
extract = PythonOperator(
task_id=f"extract_{table}",
python_callable=extract_table,
op_kwargs={"table": table, "ds": "{{ ds }}"},
)
load = PythonOperator(
task_id=f"load_{table}",
python_callable=load_to_warehouse,
op_kwargs={"table": table},
)
start >> extract >> load >> endFor truly dynamic workloads (number of tasks unknown at DAG parse time), use
Airflow's @task.expand() (dynamic task mapping) in Airflow 2.3+:
@task
def get_partitions(ds=None):
return ["2024-01-01", "2024-01-02", "2024-01-03"]
@task
def process_partition(partition):
# process one partition
pass
with DAG(...) as dag:
partitions = get_partitions()
process_partition.expand(partition=partitions)Backfill strategies
When to backfill
- Schema change in the source that affects historical data
- Bug fix in transformation logic
- New column or metric added to an existing model
How to backfill safely
- Set
catchup=Truetemporarily or useairflow dags backfill - Ensure all tasks are idempotent (re-running a date overwrites, not appends)
- Throttle with
max_active_runsto avoid overwhelming the warehouse - Monitor for data quality regressions in downstream tables
# Backfill a specific date range
airflow dags backfill \
--start-date 2024-01-01 \
--end-date 2024-01-31 \
--reset-dagruns \
daily_orders_pipelineSet
max_active_runs=3(or lower) during backfills. Running 365 days simultaneously will overwhelm your warehouse and Airflow scheduler.
Common Airflow pitfalls
| Pitfall | Fix |
|---|---|
| Top-level imports in DAG file slow scheduler | Use lazy imports inside callables |
| XCom for large data (>48KB default) | Use external storage (S3/GCS), pass URI via XCom |
No execution_timeout on tasks |
Set timeout to prevent zombie tasks |
Using depends_on_past=True carelessly |
Blocks entire pipeline if one run fails |
Not setting sla on critical tasks |
Add SLA miss callbacks for alerting |
dbt-patterns.md
dbt Patterns
Model layering
Structure every dbt project in three layers. This is not optional - it is the standard that prevents spaghetti SQL and enables maintainability.
Staging layer (models/staging/)
- One model per source table (1:1 mapping)
- Only light transformations: renaming, casting, timezone conversion
- Materialized as
view(cheap, always fresh) - Naming:
stg_<source>__<table>.sql(double underscore separates source from table)
-- models/staging/stripe/stg_stripe__payments.sql
with source as (
select * from {{ source('stripe', 'payments') }}
),
renamed as (
select
id as payment_id,
customer_id,
cast(amount as decimal(10,2)) / 100 as amount_dollars,
currency,
status,
cast(created as timestamp) as created_at
from source
)
select * from renamedIntermediate layer (models/intermediate/)
- Business logic: joins, deduplication, window functions, enrichment
- Materialized as
vieworephemeral(no warehouse cost unless needed) - Naming:
int_<entity>_<verb>.sql(e.g.int_orders_enriched.sql)
-- models/intermediate/int_orders_enriched.sql
with orders as (
select * from {{ ref('stg_raw__orders') }}
),
customers as (
select * from {{ ref('stg_raw__customers') }}
),
enriched as (
select
o.order_id,
o.ordered_at,
o.order_total,
c.customer_name,
c.customer_segment,
row_number() over (
partition by o.customer_id order by o.ordered_at
) as order_sequence_number
from orders o
left join customers c on o.customer_id = c.customer_id
)
select * from enrichedMarts layer (models/marts/)
- Consumer-facing tables (dashboards, APIs, ML features)
- Materialized as
tableorincremental - Split into fact tables (
fct_) and dimension tables (dim_) - Naming:
fct_<metric>.sqlordim_<entity>.sql
-- models/marts/fct_daily_revenue.sql
{{
config(
materialized='incremental',
unique_key='revenue_date',
on_schema_change='sync_all_columns'
)
}}
with enriched_orders as (
select * from {{ ref('int_orders_enriched') }}
{% if is_incremental() %}
where ordered_at > (select max(revenue_date) from {{ this }})
{% endif %}
)
select
date_trunc('day', ordered_at) as revenue_date,
customer_segment,
count(*) as order_count,
sum(order_total) as total_revenue,
avg(order_total) as avg_order_value
from enriched_orders
group by 1, 2Incremental strategies
| Strategy | When to use | Config |
|---|---|---|
append |
Insert-only tables (events, logs) | incremental_strategy='append' |
merge |
Tables with updates (orders, users) | incremental_strategy='merge', unique_key='id' |
delete+insert |
Partition-level replacement | incremental_strategy='delete+insert', unique_key='date' |
insert_overwrite |
Large partitioned tables (Spark/Hive) | incremental_strategy='insert_overwrite' |
Always set
on_schema_changefor incremental models. Options:ignore(default, dangerous),append_new_columns,sync_all_columns,fail.
Testing
Schema tests (in schema.yml)
models:
- name: fct_daily_revenue
description: Daily revenue aggregated by customer segment
columns:
- name: revenue_date
tests:
- not_null
- unique
- name: total_revenue
tests:
- not_null
- dbt_utils.accepted_range:
min_value: 0
- name: customer_segment
tests:
- not_null
- accepted_values:
values: ['enterprise', 'mid_market', 'smb', 'consumer']Custom data tests (in tests/)
-- tests/assert_revenue_not_negative.sql
select revenue_date, total_revenue
from {{ ref('fct_daily_revenue') }}
where total_revenue < 0If this query returns any rows, the test fails.
Source freshness
sources:
- name: raw
freshness:
warn_after: { count: 12, period: hour }
error_after: { count: 24, period: hour }
loaded_at_field: _loaded_at
tables:
- name: orders
- name: customersRun with dbt source freshness to check whether upstream data is stale.
Useful macros
Generate surrogate keys
-- uses dbt_utils package
select
{{ dbt_utils.generate_surrogate_key(['order_id', 'line_item_id']) }} as order_line_key,
*
from {{ ref('stg_raw__order_lines') }}Pivot columns
select
customer_id,
{{ dbt_utils.pivot(
'order_status',
dbt_utils.get_column_values(ref('stg_raw__orders'), 'order_status')
) }}
from {{ ref('stg_raw__orders') }}
group by 1Essential packages
| Package | What it provides |
|---|---|
dbt-utils |
Surrogate keys, pivots, date spine, accepted_range tests |
dbt-expectations |
Great Expectations-style tests in dbt |
dbt-audit-helper |
Compare model results between dev and prod |
dbt-codegen |
Auto-generate staging models and schema YAML |
Install in packages.yml:
packages:
- package: dbt-labs/dbt_utils
version: [">=1.0.0", "<2.0.0"]
- package: calogica/dbt_expectations
version: [">=0.10.0", "<0.11.0"]CI/CD for dbt
Slim CI (only test changed models)
# In CI pipeline after PR is opened
dbt run --select state:modified+ --state ./prod-manifest/
dbt test --select state:modified+ --state ./prod-manifest/The state:modified+ selector runs only models that changed and their downstream
dependents. The --state flag points to the production manifest.json for comparison.
Pre-merge checklist
dbt build --select state:modified+passes- Source freshness checks pass
- No new
<!-- VERIFY -->comments without justification - Documentation updated for new models (
descriptionin schema.yml) - No direct references to source tables outside staging layer
spark-tuning.md
Spark Tuning
Memory architecture
Spark executors divide memory into three regions:
Total Executor Memory
|
+-- Execution Memory (shuffle, joins, sorts, aggregations)
| Default: 50% of (heap - 300MB reserved)
|
+-- Storage Memory (cached DataFrames, broadcast variables)
| Default: 50% of (heap - 300MB reserved)
|
+-- Reserved (300MB, not configurable)Execution and storage share a unified region. If execution needs more memory and storage is not using its share, execution can borrow. The reverse is also true.
Key memory configs
| Config | Default | Guidance |
|---|---|---|
spark.executor.memory |
1g | Set to 4-8g for most workloads, up to 16g for large joins |
spark.executor.memoryOverhead |
max(384MB, 10% of executor memory) | Increase for PySpark or jobs with many UDFs |
spark.memory.fraction |
0.6 | Fraction of heap for execution + storage |
spark.memory.storageFraction |
0.5 | Initial storage share within the unified region |
spark.driver.memory |
1g | Increase if collecting large results to driver |
Out of Memory errors? First check if a single partition is too large (skew). Increasing executor memory is a band-aid if the root cause is data skew.
Shuffle optimization
Shuffles are the #1 performance bottleneck in Spark. A shuffle occurs whenever data must move between partitions (joins, groupBy, repartition, distinct).
Reducing shuffle volume
- Filter early - push WHERE clauses before joins to reduce data volume
- Select only needed columns - less data per row = less shuffle bytes
- Broadcast small tables - eliminates shuffle entirely for one side of join
from pyspark.sql.functions import broadcast
# Broadcast tables under ~100MB to avoid shuffle
result = large_df.join(broadcast(small_df), "join_key")Partition tuning
| Config | Default | Guidance |
|---|---|---|
spark.sql.shuffle.partitions |
200 | Set to 2-4x the number of cores for your cluster |
spark.sql.files.maxPartitionBytes |
128MB | Controls input partition size for file reads |
Rule of thumb: Target 100-200MB per partition after shuffle. Too many small partitions create scheduling overhead. Too few large partitions cause OOM and skew.
# Check partition sizes after a transformation
df.rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect()
# Repartition to target size
target_partitions = total_data_size_mb // 150 # ~150MB per partition
df = df.repartition(target_partitions)Adaptive Query Execution (AQE)
Enable AQE (default in Spark 3.2+) to let Spark auto-tune at runtime:
spark.conf.set("spark.sql.adaptive.enabled", True)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", True)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", True)AQE handles:
- Coalescing small partitions after shuffle
- Splitting skewed partitions automatically
- Switching join strategies at runtime based on actual data sizes
Handling data skew
Data skew occurs when one or a few partition keys contain disproportionately more data than others. Symptoms: one task takes 10-100x longer than peers.
Diagnosis
# Check key distribution
df.groupBy("join_key").count().orderBy(col("count").desc()).show(20)Salting technique
Add a random salt to the skewed key, join on the salted key, then aggregate:
from pyspark.sql.functions import lit, rand, floor, concat, col
SALT_BUCKETS = 10
# Salt the large table
large_salted = large_df.withColumn(
"salt", floor(rand() * SALT_BUCKETS).cast("int")
).withColumn(
"salted_key", concat(col("join_key"), lit("_"), col("salt"))
)
# Explode the small table to match all salt values
from pyspark.sql.functions import explode, array
small_exploded = small_df.withColumn(
"salt", explode(array([lit(i) for i in range(SALT_BUCKETS)]))
).withColumn(
"salted_key", concat(col("join_key"), lit("_"), col("salt"))
)
# Join on salted key (distributes skewed key across SALT_BUCKETS partitions)
result = large_salted.join(small_exploded, "salted_key")Partitioning output files
Avoid small files problem
# Bad: thousands of tiny files
df.write.partitionBy("date").parquet("output/")
# Good: control file count per partition
df.repartition("date").write.partitionBy("date").parquet("output/")
# Better: explicit file count
df.repartition(100, "date").write.partitionBy("date").parquet("output/")Coalesce vs repartition
| Method | Shuffles? | Use when |
|---|---|---|
coalesce(N) |
No (narrows partitions) | Reducing partitions after filter (fewer files) |
repartition(N) |
Yes (full shuffle) | Even distribution needed, or changing partition key |
repartition(N, "col") |
Yes | Writing partitioned output with controlled file count |
Use
coalesceafter filtering to reduce partition count without a shuffle. Userepartitionbefore writing to ensure even file sizes.
Caching strategy
Cache DataFrames that are reused across multiple actions:
# Cache when a DataFrame is used in 2+ downstream actions
enriched_df = large_join_result.cache()
enriched_df.count() # triggers caching
# Use after multiple operations
summary = enriched_df.groupBy("segment").agg(...)
detail = enriched_df.filter(col("amount") > 1000)
# Unpersist when done
enriched_df.unpersist()When NOT to cache:
- DataFrame is used only once
- Dataset is larger than available storage memory
- The computation is cheap (simple filters on Parquet with predicate pushdown)
Common Spark anti-patterns
| Anti-pattern | Problem | Fix |
|---|---|---|
collect() on large DataFrame |
OOM on driver | Use take(N), show(), or write to storage |
| UDFs in PySpark | Serialization overhead, no Catalyst optimization | Use built-in functions or pandas_udf |
count() to check emptiness |
Scans entire dataset | Use head(1) or isEmpty() (Spark 3.3+) |
| No predicate pushdown | Reads entire Parquet file | Filter on partition columns, use where before select |
| Caching everything | Wastes memory, evicts useful caches | Cache only reused DataFrames, unpersist when done |
Ignoring explain() |
Missing optimization opportunities | Check physical plan for scans, shuffles, and broadcast decisions |
streaming-architecture.md
Streaming Architecture
When to use streaming
Streaming adds significant complexity. Use it only when you have a proven requirement for low-latency data processing:
| Requirement | Architecture |
|---|---|
| Dashboard refreshed every few seconds | Streaming |
| Real-time fraud detection or alerting | Streaming |
| Event-driven microservice reactions | Streaming |
| Hourly/daily analytics and reporting | Batch |
| Historical trend analysis | Batch |
| ML model training on large datasets | Batch |
If unsure, start with batch. You can always add streaming later for the specific use cases that need it.
Core streaming concepts
Event time vs processing time
- Event time: when the event actually occurred (embedded in the event payload)
- Processing time: when the system processes the event
Always use event time for aggregations. Processing time is unreliable because network delays, consumer restarts, and backpressure cause events to arrive out of order.
Delivery guarantees
| Guarantee | Meaning | Use when |
|---|---|---|
| At-most-once | Events may be lost, never duplicated | Metrics where small loss is acceptable |
| At-least-once | No loss, but duplicates possible | Most use cases (with idempotent consumers) |
| Exactly-once | No loss, no duplicates | Financial transactions, billing |
Exactly-once is expensive. It requires coordination between source, processor, and sink (typically using transactions or idempotent writes). Prefer at-least-once with idempotent consumers for most workloads.
Windowing
Windows group streaming events into finite chunks for aggregation:
| Window type | Description | Use case |
|---|---|---|
| Tumbling | Fixed-size, non-overlapping | Hourly counts, daily totals |
| Sliding | Fixed-size, overlapping | Moving averages (5-min window, 1-min slide) |
| Session | Gap-based, variable size | User session activity (close window after 30min idle) |
Kafka fundamentals
Topics and partitions
A Kafka topic is split into partitions. Each partition is an ordered, immutable log. Partitions enable parallelism - consumers in a group each read from different partitions.
Partition key matters: Events with the same key always go to the same partition (preserving order for that key). Choose a key that distributes evenly and groups related events:
Good key: user_id (even distribution, related events together)
Bad key: country (skew - US partition gets 50% of traffic)
Bad key: null (round-robin, loses ordering guarantees)Consumer groups
Each consumer group gets a full copy of the data. Within a group, each partition is assigned to exactly one consumer. Scale consumers up to the number of partitions (more consumers than partitions means idle consumers).
Retention and compaction
| Policy | Behavior | Use when |
|---|---|---|
| Time-based retention | Delete events older than N days | Event streams, logs |
| Log compaction | Keep only latest value per key | Changelog streams, state snapshots |
Exactly-once processing
Kafka transactions (Kafka Streams / Flink)
The pattern: read from input topic, process, write to output topic - all in one atomic transaction.
1. Consumer reads batch of events
2. Process events, produce output events
3. Commit consumer offsets AND producer writes atomically
4. If any step fails, the entire transaction rolls backIn Flink:
// Flink Kafka sink with exactly-once
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers("kafka:9092")
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-job-1")
.setRecordSerializer(...)
.build();Idempotent consumers (alternative)
If the sink supports idempotent writes (upsert by key), you can achieve effectively-exactly-once with at-least-once delivery:
# Idempotent write to database - duplicate events just overwrite
def process_event(event):
db.execute("""
INSERT INTO orders (order_id, status, updated_at)
VALUES (%s, %s, %s)
ON CONFLICT (order_id)
DO UPDATE SET status = EXCLUDED.status, updated_at = EXCLUDED.updated_at
""", (event.order_id, event.status, event.timestamp))Handling late-arriving data
Events arrive late due to network delays, offline devices, or batch uploads.
Watermarks
A watermark is a threshold that says: "I believe all events with event time before this watermark have arrived." Events arriving after the watermark are considered late.
# Spark Structured Streaming with watermark
from pyspark.sql.functions import window
events = spark.readStream.format("kafka").load()
windowed_counts = events \
.withWatermark("event_time", "2 hours") \
.groupBy(
window("event_time", "1 hour"),
"event_type"
) \
.count()The watermark of "2 hours" means: events arriving more than 2 hours late are dropped. Events within the 2-hour grace period update the window result.
Late data strategies
| Strategy | Trade-off |
|---|---|
| Drop late events | Simple, but loses data |
| Allow late updates within grace period | Good balance, most common |
| Side-output late events for batch reprocessing | No data loss, adds complexity |
Spark Structured Streaming
Trigger modes
| Mode | Behavior | Use when |
|---|---|---|
processingTime="10 seconds" |
Micro-batch every 10s | Most use cases |
availableNow=True |
Process all available, then stop | Incremental batch in streaming framework |
continuous (experimental) |
True continuous, ~1ms latency | Ultra-low latency needs |
query = windowed_counts.writeStream \
.outputMode("update") \
.format("delta") \
.option("checkpointLocation", "s3://checkpoints/hourly_counts") \
.trigger(processingTime="30 seconds") \
.start()Output modes
| Mode | Behavior | Use with |
|---|---|---|
append |
Only new rows | Non-aggregate queries, watermarked aggregates |
update |
Changed rows only | Aggregates (most common) |
complete |
Full result table every trigger | Small result sets, unbounded aggregates |
Checkpointing
Every streaming query must have a checkpoint location. Checkpoints store:
- Current offsets (where in Kafka/files the query has read to)
- State for aggregations (window contents, running counts)
- Metadata for recovery
Never delete checkpoints of a running query. To reset, stop the query, delete the checkpoint, and restart. Changing the query logic may require a new checkpoint if the state schema changes.
Architecture patterns
Lambda architecture
Run batch and streaming in parallel. Batch produces the "correct" result (recomputed from raw data). Streaming produces the "fast" result (approximate, real-time). A serving layer merges both views.
Source -> Kafka -> Streaming layer -> Speed view
-> Batch layer -> Batch view
|
Serving layer (merge)Downside: Two codebases doing the same computation, eventual consistency between views, operational complexity.
Kappa architecture
Single streaming layer processes everything. Historical reprocessing is done by replaying the event log from the beginning.
Source -> Kafka (long retention) -> Stream processor -> Serving layerDownside: Replay can be slow for large histories. Not all computations are naturally expressible as streaming.
Practical recommendation
Most teams should use batch as the default with streaming for specific hot paths (fraud, alerting, live dashboards). This avoids the operational burden of full streaming while getting real-time where it matters.
Common streaming pitfalls
| Pitfall | Fix |
|---|---|
| No dead letter queue | Route failed events to a DLQ for manual review |
| Unbounded state in aggregations | Use watermarks to expire old state |
| Consumer lag growing unbounded | Scale consumers, optimize processing, or increase partitions |
| No backpressure handling | Configure max records per poll, enable consumer flow control |
| Schema changes break consumers | Use a schema registry (Confluent, AWS Glue) with compatibility checks |
| Testing only happy path | Test late events, out-of-order events, duplicate events, and poison pills |
Frequently Asked Questions
What is data-pipelines?
Use this skill when building data pipelines, ETL/ELT workflows, or data transformation layers. Triggers on Airflow DAG design, dbt model creation, Spark job optimization, streaming vs batch architecture decisions, data ingestion, data quality checks, pipeline orchestration, incremental loads, CDC (change data capture), schema evolution, and data warehouse modeling. Acts as a senior data engineer advisor for building reliable, scalable data infrastructure.
How do I install data-pipelines?
Run npx skills add AbsolutelySkilled/AbsolutelySkilled --skill data-pipelines in your terminal. The skill will be immediately available in your AI coding agent.
What AI agents support data-pipelines?
data-pipelines works with claude-code, gemini-cli, openai-codex, mcp. Install it once and use it across any supported AI coding agent.