data-quality
Use this skill when implementing data validation, data quality monitoring, data lineage tracking, data contracts, or Great Expectations test suites. Triggers on schema validation, data profiling, freshness checks, row-count anomalies, column drift, expectation suites, contract testing between producers and consumers, lineage graphs, data observability, and any task requiring data integrity enforcement across pipelines.
data-quality is a production-ready AI agent skill for claude-code, gemini-cli, and openai-codex. It covers implementing data validation, data quality monitoring, data lineage tracking, data contracts, and Great Expectations test suites.
Quick Facts
| Field | Value |
|---|---|
| Category | data |
| Version | 0.1.0 |
| Platforms | claude-code, gemini-cli, openai-codex |
| License | MIT |
How to Install
- Make sure you have Node.js installed on your machine.
- Run the following command in your terminal:
npx skills add AbsolutelySkilled/AbsolutelySkilled --skill data-quality
- The data-quality skill is now available in your AI coding agent (Claude Code, Gemini CLI, OpenAI Codex, etc.).
Overview
Data quality is the practice of ensuring that data is accurate, complete, consistent, timely, and trustworthy as it flows through pipelines and systems. Without explicit quality gates, bad data propagates silently - corrupting dashboards, training flawed models, and breaking downstream consumers. This skill covers the five pillars: schema validation at ingress, expectation-based testing with Great Expectations, data contracts between producers and consumers, lineage tracking for impact analysis, and continuous monitoring for anomaly detection.
Tags
data-quality validation lineage great-expectations contracts monitoring
Platforms
- claude-code
- gemini-cli
- openai-codex
Frequently Asked Questions
What is data-quality?
Use this skill when implementing data validation, data quality monitoring, data lineage tracking, data contracts, or Great Expectations test suites. Triggers on schema validation, data profiling, freshness checks, row-count anomalies, column drift, expectation suites, contract testing between producers and consumers, lineage graphs, data observability, and any task requiring data integrity enforcement across pipelines.
How do I install data-quality?
Run npx skills add AbsolutelySkilled/AbsolutelySkilled --skill data-quality in your terminal. The skill will be immediately available in your AI coding agent.
What AI agents support data-quality?
This skill works with claude-code, gemini-cli, and openai-codex. Install it once and use it across any supported AI coding agent.
Maintainers
Generated from AbsolutelySkilled
SKILL.md
Data Quality
Data quality is the practice of ensuring that data is accurate, complete, consistent, timely, and trustworthy as it flows through pipelines and systems. Without explicit quality gates, bad data propagates silently - corrupting dashboards, training flawed models, and breaking downstream consumers. This skill covers the five pillars: schema validation at ingress, expectation-based testing with Great Expectations, data contracts between producers and consumers, lineage tracking for impact analysis, and continuous monitoring for anomaly detection.
When to use this skill
Trigger this skill when the user:
- Adds data validation or schema enforcement to a pipeline (ingestion, transformation, or serving)
- Writes Great Expectations expectation suites or checkpoints
- Defines data contracts between a producer team and consumer teams
- Implements data lineage tracking or impact analysis
- Sets up data quality monitoring dashboards or freshness/volume alerts
- Investigates data quality incidents (missing columns, null spikes, schema drift)
- Profiles a new dataset to understand distributions and anomalies
- Builds row-count, freshness, or distribution-based quality checks
Do NOT trigger this skill for:
- General ETL/ELT pipeline orchestration (use an Airflow/dbt skill instead)
- Data modeling or warehouse design decisions without a quality focus
Key principles
Validate at boundaries, not in the middle - Enforce quality at ingestion (before data enters your warehouse) and at serving (before consumers read it). Validating mid-pipeline catches problems too late to prevent downstream damage and too early to catch transformation bugs.
Contracts are APIs for data - A data contract is a formal agreement between a producer and consumer on schema, semantics, SLAs, and ownership. Treat it like a versioned API - breaking changes require migration paths, not surprise emails.
Test data like you test code - Every table should have expectations that run on every pipeline execution. Column nullability, uniqueness constraints, value ranges, referential integrity, and freshness are not optional - they are the unit tests of data engineering.
Lineage enables impact analysis - You cannot assess the blast radius of a schema change without knowing what reads from what. Instrument lineage at the query level (not just table level) so you can trace column-level dependencies.
Monitor trends, not just thresholds - A row count of 1M is fine today but means nothing without historical context. Use statistical anomaly detection (z-score, moving averages) to catch gradual drift that static thresholds miss.
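To make the last principle concrete, here is a minimal sketch of a z-score volume check in plain Python. The function names, the sample counts, and the threshold of 2.0 are illustrative assumptions, not part of any library.

```python
from statistics import mean, stdev
from typing import Optional, Sequence


def volume_z_score(history: Sequence[int], today: int) -> Optional[float]:
    """Z-score of today's row count against a baseline of past daily counts.

    Returns None when the baseline is too short or has zero variance,
    because a z-score is undefined in those cases.
    """
    if len(history) < 2:
        return None
    sigma = stdev(history)
    if sigma == 0:
        return None
    return (today - mean(history)) / sigma


def is_volume_anomaly(history: Sequence[int], today: int, threshold: float = 2.0) -> bool:
    """Flag counts more than `threshold` standard deviations from the baseline."""
    z = volume_z_score(history, today)
    return z is not None and abs(z) > threshold


# Hypothetical 7-day baseline of daily row counts
baseline = [1000, 1020, 980, 1010, 990, 1005, 995]
print(is_volume_anomaly(baseline, 1000))  # a typical day
print(is_volume_anomaly(baseline, 500))   # a sharp drop
```

A flat baseline returns no score here rather than dividing by zero; a production check would probably want an explicit fallback rule for that case.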
Core concepts
The five dimensions of data quality
| Dimension | Question answered | How to measure |
|---|---|---|
| Accuracy | Does the data reflect reality? | Cross-reference with source of truth, spot-check samples |
| Completeness | Are all expected records and fields present? | Null rate per column, row count vs expected count |
| Consistency | Do related datasets agree? | Cross-table referential integrity checks, duplicate detection |
| Timeliness | Is the data fresh enough for its use case? | Freshness SLA: time since last successful load |
| Uniqueness | Are there unwanted duplicates? | Primary key uniqueness checks, deduplication audits |
Data contracts
A data contract defines: the schema (column names, types, constraints), semantic meaning (what "revenue" means - gross or net), SLAs (freshness, volume bounds), and ownership (who to page when it breaks). Contracts are versioned artifacts stored alongside code - not wiki pages that rot. The producer owns the contract and is responsible for not shipping breaking changes without a version bump.
Data lineage
Lineage is a directed acyclic graph (DAG) where nodes are datasets (tables, views, files) and edges are transformations (SQL queries, Spark jobs, dbt models). Column-level lineage tracks which output columns derive from which input columns. Tools like OpenLineage, DataHub, and dbt's built-in lineage provide this automatically when integrated into your orchestrator.
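Impact analysis on such a graph is a plain traversal. As a rough sketch, assuming lineage is available as a mapping from each dataset to the datasets that read from it (the dataset names below are made up), the blast radius of a change is everything reachable downstream:

```python
from collections import deque


def downstream(edges, start):
    """Return every dataset reachable from `start` by following lineage edges."""
    seen = set()
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for child in edges.get(node, ()):
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return seen


# Hypothetical table-level lineage: dataset -> datasets derived from it
lineage_edges = {
    "raw.orders": ["curated.orders"],
    "curated.orders": ["marts.revenue", "ml.features"],
    "marts.revenue": ["dashboards.exec"],
}
print(sorted(downstream(lineage_edges, "raw.orders")))
```

The same traversal works for column-level lineage if nodes are (dataset, column) pairs instead of dataset names.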
Great Expectations
Great Expectations (GX) is a widely used open-source framework for data testing. The core abstractions are: Data Source (connection to your data), Expectation Suite (a collection of assertions about a dataset), Validator (runs expectations against data), and Checkpoint (an orchestratable unit that validates data and triggers actions on pass/fail). Expectations are declarative, for example expect_column_values_to_not_be_null, and produce rich, human-readable validation results.
Common tasks
Write a Great Expectations suite
Define expectations for a table covering nullability, types, ranges, and uniqueness.
import great_expectations as gx
context = gx.get_context()
# Connect to data source
datasource = context.data_sources.add_postgres(
    name="warehouse",
    connection_string="postgresql+psycopg2://user:pass@host:5432/db",
)
data_asset = datasource.add_table_asset(name="orders", table_name="orders")
batch_definition = data_asset.add_batch_definition_whole_table("full_table")

# Create expectation suite
suite = context.suites.add(
    gx.ExpectationSuite(name="orders_quality")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeUnique(column="order_id")
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeBetween(
        column="total_amount", min_value=0, max_value=1_000_000
    )
)
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToBeInSet(
        column="status", value_set=["pending", "completed", "cancelled", "refunded"]
    )
)
suite.add_expectation(
    gx.expectations.ExpectTableRowCountToBeBetween(min_value=1000, max_value=10_000_000)
)
Always start with not-null and uniqueness expectations on primary keys before adding business-logic expectations.
Run a checkpoint in a pipeline
Wire a Great Expectations checkpoint into your orchestrator so validation runs on every load.
import great_expectations as gx

context = gx.get_context()

# Save the validation definition to the context before referencing it from a
# checkpoint (some GX 1.x releases reject unsaved validation definitions)
validation_definition = context.validation_definitions.add(
    gx.ValidationDefinition(
        name="orders_validation",
        data=context.data_sources.get("warehouse")
        .get_asset("orders")
        .get_batch_definition("full_table"),
        suite=context.suites.get("orders_quality"),
    )
)

# Define a checkpoint that validates the orders suite
checkpoint = context.checkpoints.add(
    gx.Checkpoint(
        name="orders_checkpoint",
        validation_definitions=[validation_definition],
        actions=[
            gx.checkpoint.UpdateDataDocsAction(name="update_docs"),
        ],
    )
)

# Run in Airflow task / dbt post-hook / standalone script
result = checkpoint.run()
if not result.success:
    failing = [r for r in result.run_results.values() if not r.success]
    raise RuntimeError(f"Data quality check failed: {len(failing)} validations failed")
Define a data contract
Create a YAML contract between a producer and consumer team.
# contracts/orders-v2.yaml
apiVersion: datacontract/v1.0
kind: DataContract
metadata:
  name: orders
  version: 2.0.0
  owner: payments-team
  consumers:
    - analytics-team
    - ml-team
schema:
  type: table
  database: warehouse
  table: public.orders
  columns:
    - name: order_id
      type: string
      constraints: [not_null, unique]
      description: UUID primary key
    - name: customer_id
      type: string
      constraints: [not_null]
      description: FK to customers.customer_id
    - name: total_amount
      type: decimal(10,2)
      constraints: [not_null, gte_0]
      description: Gross order total in USD
    - name: status
      type: string
      constraints: [not_null]
      allowed_values: [pending, completed, cancelled, refunded]
    - name: created_at
      type: timestamp
      constraints: [not_null]
sla:
  freshness: 1h  # data must be no older than 1 hour
  volume:
    min_rows_per_day: 1000
    max_rows_per_day: 500000
  availability: 99.9%
breaking_changes:
  policy: notify_consumers_7_days_before
  channel: "#data-contracts-changes"
Version bump the contract on any schema change. Additive changes (new nullable columns) are non-breaking. Removing or renaming columns, changing types, or tightening constraints are breaking.
Implement freshness and volume monitoring
Build SQL-based checks that run on a schedule and alert when data is stale or volume is anomalous.
-- Freshness check: alert if orders table has no data in the last 2 hours
SELECT
CASE
WHEN MAX(created_at) < NOW() - INTERVAL '2 hours'
THEN 'STALE'
ELSE 'FRESH'
END AS freshness_status,
MAX(created_at) AS last_record_at,
NOW() - MAX(created_at) AS staleness_duration
FROM orders;
-- Volume anomaly check: compare today's count to 7-day rolling average
WITH daily_counts AS (
SELECT
DATE(created_at) AS dt,
COUNT(*) AS row_count
FROM orders
WHERE created_at >= CURRENT_DATE - INTERVAL '7 days'
GROUP BY DATE(created_at)
),
stats AS (
SELECT
AVG(row_count) AS avg_count,
STDDEV(row_count) AS stddev_count
FROM daily_counts
WHERE dt < CURRENT_DATE
)
SELECT
dc.row_count AS today_count,
s.avg_count,
(dc.row_count - s.avg_count) / NULLIF(s.stddev_count, 0) AS z_score
FROM daily_counts dc, stats s
WHERE dc.dt = CURRENT_DATE;
-- Alert if z_score < -2 (significantly fewer rows than normal)
Track data lineage with OpenLineage
Emit lineage events from your pipeline so downstream consumers can trace dependencies.
import uuid
from datetime import datetime, timezone

from openlineage.client import OpenLineageClient
# facet_v2 facets pair with the event_v2 event classes
from openlineage.client.event_v2 import (
    InputDataset,
    Job,
    OutputDataset,
    Run,
    RunEvent,
    RunState,
)
from openlineage.client.facet_v2 import schema_dataset_facet

client = OpenLineageClient(url="http://lineage-server:5000")
run_id = str(uuid.uuid4())
job = Job(namespace="warehouse", name="transform_orders")

# Emit START event
client.emit(RunEvent(
    eventType=RunState.START,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=run_id),
    job=job,
    inputs=[
        InputDataset(
            namespace="warehouse",
            name="raw.orders",
            facets={
                "schema": schema_dataset_facet.SchemaDatasetFacet(
                    fields=[
                        schema_dataset_facet.SchemaDatasetFacetFields(
                            name="order_id", type="STRING"
                        ),
                        schema_dataset_facet.SchemaDatasetFacetFields(
                            name="amount", type="DECIMAL"
                        ),
                    ]
                )
            },
        )
    ],
    outputs=[
        OutputDataset(namespace="warehouse", name="curated.orders")
    ],
))

# ... run transformation ...

# Emit COMPLETE event with the same run ID so both events describe one run
client.emit(RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=run_id),
    job=job,
    inputs=[InputDataset(namespace="warehouse", name="raw.orders")],
    outputs=[OutputDataset(namespace="warehouse", name="curated.orders")],
))
OpenLineage integrates natively with Airflow, Spark, and dbt. Prefer built-in integration over manual event emission when available.
Profile a new dataset
Use Great Expectations profiling to understand a dataset before writing expectations.
import great_expectations as gx
context = gx.get_context()
datasource = context.data_sources.get("warehouse")
asset = datasource.get_asset("new_table")
batch = asset.get_batch_definition("full_table").get_batch()
# Run a profiler to auto-generate expectations based on data.
# NOTE: context.assistants is the Data Assistant API from pre-1.0 GX releases and
# is not available in GX 1.x; check this call against your installed version.
profiler_result = context.assistants.onboarding.run(
    batch_request=batch.batch_request,
)

# Review generated expectations before promoting to a suite
for expectation in profiler_result.expectation_suite.expectations:
    print(f"{expectation.expectation_type}: {expectation.kwargs}")
Profiling is a starting point, not an end state. Always review and tighten auto-generated expectations based on domain knowledge.
Anti-patterns / common mistakes
| Mistake | Why it's wrong | What to do instead |
|---|---|---|
| Validating only in the warehouse | Bad data already propagated to consumers before checks run | Validate at ingestion boundaries before data lands |
| Static thresholds for volume checks | Row counts change over time; fixed thresholds cause alert fatigue | Use z-score or rolling-average anomaly detection |
| No ownership on data contracts | Contracts without an owner rot and stop reflecting reality | Every contract must name a producing team and a Slack channel |
| Testing only column types, not semantics | Type checks pass but "revenue" contains negative values or wrong currency | Add business-logic expectations (ranges, allowed values, referential integrity) |
| Skipping lineage for "simple" pipelines | Simple pipelines grow complex; retrofitting lineage is 10x harder | Instrument lineage from day one via OpenLineage or dbt |
| Running Great Expectations only in CI | Production data differs from test data; CI-only checks miss production drift | Run checkpoints on every production pipeline execution |
Gotchas
Static volume thresholds cause alert fatigue - Setting a fixed threshold like "alert if row count < 900,000" breaks as soon as business seasonality kicks in (weekends, holidays, seasonal products). Static thresholds generate false positive alerts that teams learn to ignore. Use z-score anomaly detection against a rolling 7-14 day baseline instead.
Great Expectations profiler expectations promoted without review - The onboarding profiler auto-generates expectations based on observed data distributions. If the data you profile on already contains quality issues (outliers, null spikes), those bad patterns get baked into the expectation suite as acceptable. Always review and tighten profiler-generated expectations with domain knowledge before promoting to production checkpoints.
Data contracts without enforcement - A YAML data contract in a repository that no pipeline actually reads is documentation, not a contract. Contracts only provide value when a CI check or pipeline gate validates that the producer's output conforms to the contract schema and SLA before it lands in the consumer's dataset.
Lineage at table level misses column-level blast radius - Table-level lineage tells you "Table A feeds Table B," but if you rename a column in Table A, you need column-level lineage to know which specific downstream columns and models break. Instrument column-level lineage from the start via dbt's built-in lineage or OpenLineage column facets.
Running checkpoints only in CI, not production - CI validates a sample of test data. Production data has different volumes, distributions, and edge cases that CI fixtures never capture. A checkpoint that passes in CI and never runs in production provides a false sense of security. Run checkpoints on every production pipeline execution, not just on PRs.
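As an illustration of the "contracts without enforcement" gotcha, a pipeline gate can read the freshness SLA straight from the contract and refuse to publish stale data. This is a minimal sketch: the duration format ("15m", "1h") follows the contract examples in this skill, while the parser, its supported units, and the function names are assumptions.

```python
import re
from datetime import datetime, timedelta, timezone

# Assumed unit suffixes for SLA durations such as "15m" or "1h"
_UNITS = {"s": "seconds", "m": "minutes", "h": "hours", "d": "days"}


def parse_duration(text: str) -> timedelta:
    """Parse a duration like '15m' or '1h' into a timedelta."""
    match = re.fullmatch(r"(\d+)([smhd])", text.strip())
    if match is None:
        raise ValueError(f"Unsupported duration: {text!r}")
    value, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(value)})


def meets_freshness_sla(last_loaded_at: datetime, sla: str, now: datetime) -> bool:
    """True when the newest load is no older than the contract's freshness SLA."""
    return now - last_loaded_at <= parse_duration(sla)


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(meets_freshness_sla(now - timedelta(minutes=30), "1h", now))
print(meets_freshness_sla(now - timedelta(hours=2), "1h", now))
```

In a pipeline, the gate would raise or fail the task when the check returns False, so the contract blocks the load instead of merely documenting the expectation.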
References
For detailed content on specific sub-domains, read the relevant file
from the references/ folder:
- references/great-expectations-advanced.md - Advanced GX patterns: custom expectations, data docs hosting, store backends, and multi-batch validation
- references/data-contracts-spec.md - Full data contract specification, versioning strategies, and enforcement patterns
- references/lineage-tools.md - Comparison of lineage tools (OpenLineage, DataHub, Atlan, dbt lineage) and integration guides
Only load a references file if the current task requires deep detail on that sub-domain. The skill above covers the most common validation, monitoring, and lineage tasks.
References
data-contracts-spec.md
Data Contracts Specification
What is a data contract?
A data contract is a formal, versioned agreement between a data producer and its consumers. It defines what data will look like (schema), what it means (semantics), how fresh it will be (SLAs), and who is responsible (ownership). Contracts shift data quality left - the producer guarantees quality at the source instead of consumers building defensive checks downstream.
Contract structure
Every data contract should include these sections:
Metadata
apiVersion: datacontract/v1.0
kind: DataContract
metadata:
  name: orders              # Unique identifier
  version: 2.1.0            # Semantic versioning
  owner: payments-team      # Producing team
  consumers:                # Who reads this data
    - analytics-team
    - ml-team
    - marketing-team
  tags: [payments, orders, transactional]
  description: >
    All customer orders from the checkout flow. Updated in near-real-time
    via CDC from the orders microservice PostgreSQL database.
Schema definition
schema:
  type: table
  database: warehouse
  table: curated.orders
  columns:
    - name: order_id
      type: string
      constraints: [not_null, unique]
      description: UUID v4 primary key
      pii: false
    - name: customer_id
      type: string
      constraints: [not_null]
      description: FK to customers.customer_id
      pii: true
    - name: total_amount
      type: decimal(10,2)
      constraints: [not_null]
      description: Gross order total in USD (before discounts)
      semantic: gross_revenue
      unit: USD
    - name: discount_amount
      type: decimal(10,2)
      constraints: [not_null]
      description: Total discount applied
      default: 0.00
    - name: status
      type: string
      constraints: [not_null]
      allowed_values: [pending, processing, completed, cancelled, refunded]
      description: Current order lifecycle status
    - name: created_at
      type: timestamp_tz
      constraints: [not_null]
      description: Order creation time in UTC
    - name: updated_at
      type: timestamp_tz
      constraints: [not_null]
      description: Last modification time in UTC
Semantic definitions
Avoid ambiguity by defining what business terms mean:
semantics:
  gross_revenue: >
    Total order amount before any discounts, taxes, or shipping costs.
    Includes all line items at their list price.
  net_revenue: >
    gross_revenue minus discount_amount. Does NOT include tax or shipping.
  active_order: >
    An order with status in [pending, processing]. Completed, cancelled,
    and refunded orders are not active.
SLA definitions
sla:
  freshness: 15m                  # Data must be no older than 15 minutes
  volume:
    min_rows_per_hour: 100        # Alert if fewer than 100 orders/hour
    max_rows_per_hour: 50000      # Alert if spike exceeds 50k/hour
  availability: 99.9%             # Table must be queryable 99.9% of the time
  quality:
    null_rate_threshold: 0.01     # No column should exceed 1% null rate
    duplicate_rate_threshold: 0   # Zero tolerance for duplicate order_ids
Contact and escalation
support:
  slack_channel: "#payments-data"
  oncall_rotation: payments-data-oncall
  # Literal block (|) keeps each numbered step on its own line
  escalation_policy: |
    1. Post in #payments-data with details
    2. If no response in 30min, page payments-data-oncall
    3. If P1 impact, page payments-eng-oncall directly
Versioning strategy
Follow semantic versioning for data contracts:
| Change type | Version bump | Examples |
|---|---|---|
| Patch (0.0.x) | Bug fix in description, tightening an SLA | Fix typo in column description |
| Minor (0.x.0) | Additive, non-breaking change | New nullable column, new allowed_value added |
| Major (x.0.0) | Breaking change | Column removed, type changed, column renamed, constraint tightened |
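The table can be turned into an automated check. The sketch below compares two schema snapshots and returns the bump the table implies. The snapshot shape (column name mapped to type and nullability) and the function name are assumptions, and treating a new non-nullable column as major is a conservative choice the table itself does not spell out.

```python
def required_bump(old: dict, new: dict) -> str:
    """Return 'major', 'minor', or 'patch' for a schema change.

    `old` and `new` map column name -> {"type": str, "nullable": bool}.
    """
    for name, spec in old.items():
        if name not in new:
            return "major"  # column removed or renamed
        if new[name]["type"] != spec["type"]:
            return "major"  # type changed
        if spec["nullable"] and not new[name]["nullable"]:
            return "major"  # constraint tightened
    added = [name for name in new if name not in old]
    if any(not new[name]["nullable"] for name in added):
        return "major"  # assumption: new required columns are treated as breaking
    if added:
        return "minor"  # additive, nullable columns only
    return "patch"  # no schema-level change (e.g. description fixes)


v1 = {
    "order_id": {"type": "string", "nullable": False},
    "status": {"type": "string", "nullable": False},
}
v2 = dict(v1, coupon_code={"type": "string", "nullable": True})
print(required_bump(v1, v2))
```

Running a check like this in CI against the previous contract version gives reviewers a concrete signal when a pull request needs a major bump.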
Breaking change protocol
- Producer announces breaking change in the contract's Slack channel
- Minimum 7-day notice period for consumer teams to adapt
- Producer publishes new contract version (major bump)
- Old version remains supported for a deprecation window (typically 30 days)
- Producer removes old version after deprecation window
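The protocol's dates follow mechanically from the announcement date. A small sketch, assuming the 7-day notice and 30-day deprecation window mentioned above (the function name and dictionary keys are illustrative):

```python
from datetime import date, timedelta


def breaking_change_timeline(announced: date, notice_days: int = 7, deprecation_days: int = 30) -> dict:
    """Compute the key dates for rolling out a breaking contract change."""
    earliest_publish = announced + timedelta(days=notice_days)
    old_version_removal = earliest_publish + timedelta(days=deprecation_days)
    return {
        "announced": announced,
        "earliest_publish": earliest_publish,
        "old_version_removal": old_version_removal,
    }


timeline = breaking_change_timeline(date(2024, 3, 1))
for step, day in timeline.items():
    print(f"{step}: {day.isoformat()}")
```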
Enforcement patterns
Schema enforcement at ingestion
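import jsonschema
import yaml


def _map_type(contract_type: str) -> str:
    """Map a contract column type to a JSON Schema type (minimal, illustrative mapping)."""
    base = contract_type.split("(")[0].lower()
    if base in ("decimal", "float", "double"):
        return "number"
    if base in ("int", "integer", "bigint"):
        return "integer"
    if base == "boolean":
        return "boolean"
    return "string"  # string, timestamp, timestamp_tz, and anything unrecognized


def validate_against_contract(record: dict, contract_path: str) -> bool:
    """Validate a single record against its data contract schema.

    Raises jsonschema.ValidationError if the record does not conform.
    """
    with open(contract_path) as f:
        contract = yaml.safe_load(f)

    # Convert contract columns to JSON Schema
    properties = {}
    required = []
    for col in contract["schema"]["columns"]:
        prop = {"type": _map_type(col["type"])}
        if "allowed_values" in col:
            prop["enum"] = col["allowed_values"]
        properties[col["name"]] = prop
        if "not_null" in col.get("constraints", []):
            required.append(col["name"])

    schema = {
        "type": "object",
        "properties": properties,
        "required": required,
        "additionalProperties": False,
    }
    jsonschema.validate(record, schema)
    return True
Contract testing in CI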
Run contract tests on every PR that modifies a producer's schema:
# .github/workflows/contract-test.yml
name: Data Contract Tests
on:
  pull_request:
    paths:
      - 'contracts/**'
      - 'migrations/**'
      - 'models/**'
jobs:
  contract-test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Validate contract schema
        run: |
          pip install datacontract-cli
          datacontract test contracts/orders-v2.yaml
      - name: Check for breaking changes
        run: |
          datacontract breaking contracts/orders-v2.yaml --against main
Runtime enforcement with Great Expectations
Generate GX expectations automatically from a data contract:
def contract_to_expectations(contract_path: str) -> list:
    """Convert a data contract YAML to Great Expectations expectations."""
    import yaml
    import great_expectations as gx

    with open(contract_path) as f:
        contract = yaml.safe_load(f)

    expectations = []
    for col in contract["schema"]["columns"]:
        if "not_null" in col.get("constraints", []):
            expectations.append(
                gx.expectations.ExpectColumnValuesToNotBeNull(column=col["name"])
            )
        if "unique" in col.get("constraints", []):
            expectations.append(
                gx.expectations.ExpectColumnValuesToBeUnique(column=col["name"])
            )
        if "allowed_values" in col:
            expectations.append(
                gx.expectations.ExpectColumnValuesToBeInSet(
                    column=col["name"],
                    value_set=col["allowed_values"],
                )
            )
    return expectations
great-expectations-advanced.md
Great Expectations - Advanced Patterns
Custom expectations
When built-in expectations don't cover your business logic, write a custom expectation
by subclassing Expectation.
from great_expectations.expectations import Expectation
from great_expectations.core import ExpectationValidationResult


class ExpectColumnValuesToMatchCorporateEmail(Expectation):
    """Expect column values to be valid corporate email addresses."""

    expectation_type = "expect_column_values_to_match_corporate_email"
    domain = "column"
    domain_param = "column"
    success_keys = ("column", "domain_suffix")
    default_kwarg_values = {
        "domain_suffix": "@company.com",
    }

    def _validate(self, metrics, runtime_configuration=None, execution_engine=None):
        column_values = metrics.get("column_values.nonnull")
        domain_suffix = self.kwargs.get("domain_suffix", "@company.com")
        # Collect every value that does not end with the expected domain
        unexpected = [
            v for v in column_values
            if not str(v).endswith(domain_suffix)
        ]
        return ExpectationValidationResult(
            success=len(unexpected) == 0,
            result={
                "unexpected_count": len(unexpected),
                "unexpected_values": unexpected[:20],
                "element_count": len(column_values),
            },
        )
Register custom expectations by placing them in a great_expectations/plugins/expectations/ directory in your project. GX auto-discovers them on context initialization.
Store backends
By default GX stores expectations and validation results on the local filesystem. For team use, configure a cloud store backend.
# great_expectations/great_expectations.yml
stores:
  expectations_store:
    class_name: ExpectationsStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: my-gx-store
      prefix: expectations/
  validation_results_store:
    class_name: ValidationResultsStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: my-gx-store
      prefix: validations/
  checkpoint_store:
    class_name: CheckpointStore
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: my-gx-store
      prefix: checkpoints/
Supported backends: local filesystem, S3, GCS, Azure Blob, PostgreSQL.
Data Docs hosting
Data Docs are static HTML reports generated from validation results. Host them for team visibility.
# great_expectations/great_expectations.yml
data_docs_sites:
  s3_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: my-data-docs
      prefix: docs/
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
For internal hosting, use S3 static website hosting with CloudFront, or serve from a simple Nginx container.
Multi-batch validation
Validate expectations across multiple batches (e.g., compare today's data to yesterday's).
import datetime

import great_expectations as gx

context = gx.get_context()
asset = context.data_sources.get("warehouse").get_asset("orders")

# One daily batch definition; the day to validate is selected at run time.
# (Two identically configured daily definitions would both resolve to the same batch.)
daily = asset.add_batch_definition_daily("daily", column="created_at")
suite = context.suites.get("orders_quality")
validation = context.validation_definitions.add(
    gx.ValidationDefinition(name="orders_daily", data=daily, suite=suite)
)

# Run the same suite against today's and yesterday's partitions
today = datetime.date.today()
yesterday = today - datetime.timedelta(days=1)
for label, day in [("today", today), ("yesterday", yesterday)]:
    result = validation.run(
        batch_parameters={"year": day.year, "month": day.month, "day": day.day}
    )
    print(f"{label}: {'PASS' if result.success else 'FAIL'}")
Conditional expectations
Apply expectations only to subsets of data using row conditions.
import great_expectations as gx

context = gx.get_context()
suite = context.suites.get("orders_quality")

# Only check refund_amount for cancelled orders
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(
        column="refund_amount",
        row_condition='status == "cancelled"',
        condition_parser="pandas",
    )
)

# Only check shipping_address for non-digital orders
suite.add_expectation(
    gx.expectations.ExpectColumnValuesToNotBeNull(
        column="shipping_address",
        row_condition='product_type != "digital"',
        condition_parser="pandas",
    )
)
Integration with Airflow
Use the GreatExpectationsOperator to run checkpoints as Airflow tasks.
from airflow import DAG
from airflow.utils.dates import days_ago
from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

with DAG(
    dag_id="orders_quality_check",
    start_date=days_ago(1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    validate_orders = GreatExpectationsOperator(
        task_id="validate_orders",
        data_context_root_dir="/opt/airflow/great_expectations",
        checkpoint_name="orders_checkpoint",
        fail_task_on_validation_failure=True,
    )
The Airflow provider is published as airflow-provider-great-expectations. Install it separately: pip install airflow-provider-great-expectations.
lineage-tools.md
Lineage Tools Comparison and Integration
Tool comparison
| Tool | Type | Column-level lineage | Hosted option | Best for |
|---|---|---|---|---|
| OpenLineage | Open standard/protocol | Yes (via facets) | No (protocol only) | Teams wanting vendor-neutral lineage events |
| DataHub | Metadata platform | Yes | Acryl Cloud | Full metadata management + lineage |
| dbt lineage | Built into dbt | Yes (via manifest) | dbt Cloud | Teams already using dbt |
| Atlan | Data catalog | Yes | SaaS only | Enterprise data governance |
| Apache Atlas | Metadata platform | Limited | No | Hadoop/Hive-centric environments |
| Marquez | Lineage server | Yes (via OpenLineage) | No | OpenLineage backend/UI |
OpenLineage
OpenLineage is an open standard for lineage event collection. It defines a JSON schema for run events that capture inputs, outputs, and job metadata. It is not a platform - it is a protocol that platforms consume.
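As a rough illustration of what such a run event looks like on the wire, the sketch below builds one as a plain dictionary and serializes it to JSON. It covers only the core fields described in this section; real events carry additional fields such as producer and schemaURL, so an OpenLineage client library is the better choice in practice. The function name and dataset names are hypothetical.

```python
import json
import uuid
from datetime import datetime, timezone

EVENT_TYPES = {"START", "RUNNING", "COMPLETE", "FAIL", "ABORT"}


def build_run_event(event_type, run_id, job, inputs, outputs):
    """Build a minimal run event dict.

    `job` is a (namespace, name) pair; `inputs` and `outputs` are lists of
    (namespace, name) pairs identifying datasets.
    """
    if event_type not in EVENT_TYPES:
        raise ValueError(f"Unknown event type: {event_type}")
    return {
        "eventType": event_type,
        "eventTime": datetime.now(timezone.utc).isoformat(),
        "run": {"runId": run_id},
        "job": {"namespace": job[0], "name": job[1]},
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
    }


event = build_run_event(
    "START",
    str(uuid.uuid4()),
    job=("warehouse", "transform_orders"),
    inputs=[("warehouse", "raw.orders")],
    outputs=[("warehouse", "curated.orders")],
)
print(json.dumps(event, indent=2))
```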
Core event model
RunEvent {
  eventType: START | RUNNING | COMPLETE | FAIL | ABORT
  eventTime: ISO 8601 timestamp
  run: { runId: UUID }
  job: { namespace: string, name: string }
  inputs: [ InputDataset { namespace, name, facets } ]
  outputs: [ OutputDataset { namespace, name, facets } ]
}
Native integrations
These tools emit OpenLineage events automatically when configured:
| Tool | Configuration |
|---|---|
| Apache Airflow | pip install apache-airflow-providers-openlineage (Airflow 2.7+) or openlineage-airflow (older releases), set OPENLINEAGE_URL env var |
| Apache Spark | Add openlineage-spark JAR to Spark conf, set spark.openlineage.transport.url |
| dbt | pip install openlineage-dbt, then run dbt through the dbt-ol wrapper with OPENLINEAGE_URL set |
| Flink | openlineage-flink plugin, configure via flink-conf.yaml |
| Great Expectations | OpenLineage action in checkpoint configuration |
Setting up Marquez as an OpenLineage backend
Marquez is the reference implementation for collecting and serving OpenLineage events.
# Docker Compose quickstart
git clone https://github.com/MarquezProject/marquez.git
cd marquez
docker compose up -d
# Marquez API is at http://localhost:5000
# Marquez UI is at http://localhost:3000
# Point your OpenLineage clients at Marquez
from openlineage.client import OpenLineageClient
client = OpenLineageClient(url="http://localhost:5000")
Column-level lineage with facets
from openlineage.client.facet_v2 import (
    column_lineage_dataset_facet,
)

# In facet_v2 the per-column entry is the Fields class; verify the class names
# against your installed openlineage-python version.
output_facets = {
    "columnLineage": column_lineage_dataset_facet.ColumnLineageDatasetFacet(
        fields={
            "total_revenue": column_lineage_dataset_facet.Fields(
                inputFields=[
                    column_lineage_dataset_facet.InputField(
                        namespace="warehouse",
                        name="raw.orders",
                        field="amount",
                    ),
                    column_lineage_dataset_facet.InputField(
                        namespace="warehouse",
                        name="raw.orders",
                        field="discount",
                    ),
                ],
                transformationDescription="SUM(amount - discount)",
                transformationType="AGGREGATION",
            )
        }
    )
}
dbt lineage
dbt generates lineage automatically from ref() and source() calls in models.
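To illustrate the idea, the sketch below pulls ref() and source() calls out of a model's raw SQL with regular expressions. dbt itself resolves these through its Jinja parser, so this is only an approximation: the patterns handle the single-argument ref('model') and two-argument source('schema', 'table') forms, and the function name is hypothetical.

```python
import re

# Matches {{ ref('model') }} with single or double quotes
_REF = re.compile(r"\{\{\s*ref\(\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}")
# Matches {{ source('source_name', 'table_name') }}
_SOURCE = re.compile(
    r"\{\{\s*source\(\s*['\"]([^'\"]+)['\"]\s*,\s*['\"]([^'\"]+)['\"]\s*\)\s*\}\}"
)


def model_dependencies(sql: str) -> dict:
    """Return the upstream models and sources referenced by a dbt model's SQL."""
    refs = sorted(set(_REF.findall(sql)))
    sources = sorted({f"{name}.{table}" for name, table in _SOURCE.findall(sql)})
    return {"refs": refs, "sources": sources}


model_sql = """
SELECT o.order_id, c.email
FROM {{ ref('orders') }} o
JOIN {{ ref("order_items") }} oi ON o.order_id = oi.order_id
JOIN {{ source('raw', 'customers') }} c ON o.customer_id = c.customer_id
"""
print(model_dependencies(model_sql))
```

For anything beyond quick inspection, the compiled manifest is the more reliable source because it reflects what dbt actually resolved.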
Accessing lineage from the manifest
import json

with open("target/manifest.json") as f:
    manifest = json.load(f)

# Get all parents (upstream dependencies) of a model
model_key = "model.my_project.orders_summary"
node = manifest["nodes"][model_key]
parents = node["depends_on"]["nodes"]
print(f"Upstream dependencies: {parents}")

# Get all children (downstream dependents) of a model
children = manifest["child_map"].get(model_key, [])
print(f"Downstream dependents: {children}")
Column-level lineage in dbt
dbt 1.6+ supports column-level lineage in dbt Cloud. For open-source dbt, use the
dbt-column-lineage package or parse SQL with sqlglot:
import sqlglot
from sqlglot.lineage import lineage
# Parse a dbt model's compiled SQL to extract column lineage
sql = """
SELECT
o.order_id,
o.customer_id,
SUM(oi.quantity * oi.unit_price) AS total_amount
FROM {{ ref('orders') }} o
JOIN {{ ref('order_items') }} oi ON o.order_id = oi.order_id
GROUP BY o.order_id, o.customer_id
"""
# After compiling (replacing refs with actual table names):
compiled_sql = sql.replace("{{ ref('orders') }}", "curated.orders")
compiled_sql = compiled_sql.replace("{{ ref('order_items') }}", "curated.order_items")
result = lineage("total_amount", compiled_sql, dialect="postgres")
for node in result.walk():
    print(f" {node.name} <- {node.source.sql()}")
DataHub
DataHub is a full metadata platform that includes lineage, data discovery, governance, and observability.
Emitting lineage to DataHub
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

emitter = DatahubRestEmitter(gms_server="http://datahub-gms:8080")

upstream_lineage = UpstreamLineageClass(
    upstreams=[
        UpstreamClass(
            dataset="urn:li:dataset:(urn:li:dataPlatform:postgres,raw.orders,PROD)",
            type=DatasetLineageTypeClass.TRANSFORMED,
        ),
        UpstreamClass(
            dataset="urn:li:dataset:(urn:li:dataPlatform:postgres,raw.customers,PROD)",
            type=DatasetLineageTypeClass.TRANSFORMED,
        ),
    ]
)

# Attach the lineage aspect to the downstream dataset and emit it.
# emit_mce expects a MetadataChangeEvent object rather than a raw dict, so the
# aspect is wrapped in a MetadataChangeProposalWrapper instead.
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn="urn:li:dataset:(urn:li:dataPlatform:postgres,curated.order_summary,PROD)",
        aspect=upstream_lineage,
    )
)
DataHub + Airflow integration
pip install acryl-datahub-airflow-plugin
# In airflow.cfg or env vars:
export AIRFLOW__DATAHUB__DATAHUB_REST_CONN_ID=datahub_rest_default
DataHub's Airflow plugin automatically captures lineage from PostgresOperator, BigQueryOperator, and other SQL-based operators by parsing the SQL.
Choosing a lineage tool
| If you... | Use |
|---|---|
| Already use dbt | Start with dbt's built-in lineage, add OpenLineage for non-dbt jobs |
| Need a full metadata platform | DataHub (open source) or Atlan (SaaS) |
| Want vendor-neutral events | OpenLineage + Marquez |
| Need column-level lineage across tools | DataHub or Atlan (most complete) |
| Have a Hadoop/Hive stack | Apache Atlas |
Start with the simplest option that covers your current tools. Lineage is most valuable when it is complete - a partial lineage graph is worse than no lineage because it gives false confidence about impact analysis.