Schemas and Composable DataFrame Contracts

In this tutorial, we demonstrate how to build a robust, production-grade data validation pipeline with Pandera's typed DataFrame models. We begin by simulating realistic, incomplete transaction data and progressively enforce strict schema constraints, column-level rules, and cross-column business logic using declarative checks. We show how lazy validation surfaces multiple data quality issues at once, how invalid records can be isolated without breaking pipelines, and how schema enforcement can be applied directly at function boundaries to guarantee correctness as data flows through transformations.
!pip -q install "pandera>=0.18" pandas numpy polars pyarrow hypothesis
import json
import numpy as np
import pandas as pd
import pandera as pa
from pandera.errors import SchemaError, SchemaErrors
from pandera.typing import Series, DataFrame
print("pandera version:", pa.__version__)
print("pandas version:", pd.__version__)
We set up the execution environment by installing Pandera and its dependencies and importing the required libraries. We verify library versions to ensure reproducibility and compatibility. This establishes a clean base for implementing typed data validation throughout the tutorial.
rng = np.random.default_rng(42)
def make_raw_orders(n=250):
    countries = np.array(["CA", "US", "MX"])
    channels = np.array(["web", "mobile", "partner"])
    raw = pd.DataFrame(
        {
            "order_id": rng.integers(1, 120, size=n),
            "customer_id": rng.integers(1, 90, size=n),
            "email": rng.choice(
                ["a@example.com", "b@example.org", "bad_email", None],
                size=n,
                p=[0.45, 0.45, 0.07, 0.03],
            ),
            "country": rng.choice(countries, size=n, p=[0.5, 0.45, 0.05]),
            "channel": rng.choice(channels, size=n, p=[0.55, 0.35, 0.10]),
            "items": rng.integers(0, 8, size=n),
            "unit_price": rng.normal(loc=35, scale=20, size=n),
            "discount": rng.choice([0.0, 0.05, 0.10, 0.20, 0.50], size=n, p=[0.55, 0.15, 0.15, 0.12, 0.03]),
            "ordered_at": pd.to_datetime("2025-01-01") + pd.to_timedelta(rng.integers(0, 120, size=n), unit="D"),
        }
    )
    # Inject data quality issues: negative prices, zero items, excessive discounts,
    # invalid country/channel codes, and stringified prices
    raw.loc[rng.choice(n, size=8, replace=False), "unit_price"] = -abs(raw["unit_price"].iloc[0])
    raw.loc[rng.choice(n, size=6, replace=False), "items"] = 0
    raw.loc[rng.choice(n, size=5, replace=False), "discount"] = 0.9
    raw.loc[rng.choice(n, size=4, replace=False), "country"] = "ZZ"
    raw.loc[rng.choice(n, size=3, replace=False), "channel"] = "unknown"
    raw.loc[rng.choice(n, size=6, replace=False), "unit_price"] = raw["unit_price"].iloc[:6].round(2).astype(str).values
    return raw
raw_orders = make_raw_orders(250)
display(raw_orders.head(10))
We create a realistic transactional dataset that intentionally includes common data quality issues. We simulate invalid values, inconsistent types, and unexpected ranges to reflect real-world ingestion scenarios. This allows us to meaningfully test and demonstrate the effectiveness of schema-based validation.
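Before bringing in Pandera, a quick pandas profile can confirm that defects of each kind are present. A minimal sketch, using a tiny hypothetical frame with the same defect types that `make_raw_orders` injects:

```python
import pandas as pd

# Tiny illustrative frame (hypothetical values) with the same defect types
# that make_raw_orders injects into the real dataset
df = pd.DataFrame({
    "items": [0, 3, 2],
    "unit_price": [-12.5, "19.99", 30.0],  # a negative price and a stringified price
    "country": ["ZZ", "US", "CA"],         # "ZZ" is not an allowed code
})

numeric_price = pd.to_numeric(df["unit_price"], errors="coerce")
issues = {
    "zero_items": int((df["items"] == 0).sum()),
    "negative_price": int((numeric_price < 0).sum()),
    "string_price": int(df["unit_price"].map(lambda v: isinstance(v, str)).sum()),
    "bad_country": int((~df["country"].isin(["CA", "US", "MX"])).sum()),
}
print(issues)
```

Each count should be non-zero here, mirroring the seeded issues in the full dataset.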
EMAIL_RE = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"
class Orders(pa.DataFrameModel):
    order_id: Series[int] = pa.Field(ge=1)
    customer_id: Series[int] = pa.Field(ge=1)
    email: Series[object] = pa.Field(nullable=True)
    country: Series[str] = pa.Field(isin=["CA", "US", "MX"])
    channel: Series[str] = pa.Field(isin=["web", "mobile", "partner"])
    items: Series[int] = pa.Field(ge=1, le=50)
    unit_price: Series[float] = pa.Field(gt=0)
    discount: Series[float] = pa.Field(ge=0.0, le=0.8)
    ordered_at: Series[pd.Timestamp]

    class Config:
        coerce = True
        strict = True
        ordered = False

    @pa.check("email")
    def email_valid(cls, s: pd.Series) -> pd.Series:
        return s.isna() | s.astype(str).str.match(EMAIL_RE)

    @pa.dataframe_check
    def total_value_reasonable(cls, df: pd.DataFrame) -> pd.Series:
        total = df["items"] * df["unit_price"] * (1.0 - df["discount"])
        return total.between(0.01, 5000.0)

    @pa.dataframe_check
    def channel_country_rule(cls, df: pd.DataFrame) -> pd.Series:
        return ~((df["channel"] == "partner") & (df["country"] == "MX"))
We define a strict Pandera DataFrameModel that captures structural and business-level constraints. We apply column-level rules, regex-based validation, and dataframe-wide checks to declaratively encode domain logic.
try:
    validated = Orders.validate(raw_orders, lazy=True)
    print(validated.dtypes)
except SchemaErrors as exc:
    display(exc.failure_cases.head(25))
    err_json = exc.failure_cases.to_dict(orient="records")
    print(json.dumps(err_json[:5], indent=2, default=str))
We validate the raw dataset using lazy evaluation to uncover multiple violations in a single pass. We inspect structured failure cases to understand where and why the data breaks schema rules. This helps us address data quality issues without disrupting the entire pipeline.
def split_clean_quarantine(df: pd.DataFrame):
    try:
        clean = Orders.validate(df, lazy=False)
        return clean, df.iloc[0:0].copy()
    except SchemaError:
        pass
    try:
        Orders.validate(df, lazy=True)
        return df.copy(), df.iloc[0:0].copy()
    except SchemaErrors as exc:
        bad_idx = sorted(set(exc.failure_cases["index"].dropna().astype(int).tolist()))
        quarantine = df.loc[bad_idx].copy()
        clean = df.drop(index=bad_idx).copy()
        return Orders.validate(clean, lazy=False), quarantine
clean_orders, quarantine_orders = split_clean_quarantine(raw_orders)
display(quarantine_orders.head(10))
display(clean_orders.head(10))
@pa.check_types
def enrich_orders(df: DataFrame[Orders]) -> DataFrame[Orders]:
    out = df.copy()
    out["unit_price"] = out["unit_price"].round(2)
    out["discount"] = out["discount"].round(2)
    return out
enriched = enrich_orders(clean_orders)
display(enriched.head(5))
We separate valid records from invalid ones by quarantining the rows that fail schema checks. We then enforce schema guarantees at function boundaries so that only trusted data is transformed. This pattern enables safe data enrichment while preventing silent corruption.
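The quarantine split itself does not depend on Pandera; conceptually it is a boolean mask over rule violations. A minimal pure-pandas sketch of the same pattern, with hypothetical rules and data:

```python
import pandas as pd

df = pd.DataFrame({
    "items": [2, 0, 3],
    "unit_price": [10.0, 5.0, -1.0],
})

# Rows violating any rule go to quarantine; the rest are clean
bad_mask = (df["items"] < 1) | (df["unit_price"] <= 0)
quarantine = df[bad_mask].copy()
clean = df[~bad_mask].copy()
print(len(clean), "clean /", len(quarantine), "quarantined")
```

What Pandera adds on top of this sketch is that the rules live in one declarative schema, and the row indices of violations come from `failure_cases` rather than hand-written masks.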
class EnrichedOrders(Orders):
    total_value: Series[float] = pa.Field(gt=0)

    class Config:
        coerce = True
        strict = True

    @pa.dataframe_check
    def totals_consistent(cls, df: pd.DataFrame) -> pd.Series:
        total = df["items"] * df["unit_price"] * (1.0 - df["discount"])
        return (df["total_value"] - total).abs() <= 1e-6

@pa.check_types
def add_totals(df: DataFrame[Orders]) -> DataFrame[EnrichedOrders]:
    out = df.copy()
    out["total_value"] = out["items"] * out["unit_price"] * (1.0 - out["discount"])
    return EnrichedOrders.validate(out, lazy=False)
enriched2 = add_totals(clean_orders)
display(enriched2.head(5))
We extend the base schema with a derived column and validate cross-column consistency using a composable schema. We verify that the calculated values obey a strict numerical invariant after transformation. This demonstrates how Pandera supports safe feature engineering with enforceable guarantees.
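Conceptually, the `totals_consistent` check reduces to recomputing the derived column and comparing within a tolerance. A minimal pure-pandas sketch of that invariant, on hypothetical rows:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "items": [2, 4],
    "unit_price": [10.0, 2.5],
    "discount": [0.0, 0.1],
})
df["total_value"] = df["items"] * df["unit_price"] * (1.0 - df["discount"])

# The invariant the schema enforces: recomputed totals match within 1e-6
recomputed = df["items"] * df["unit_price"] * (1.0 - df["discount"])
assert np.allclose(df["total_value"], recomputed, atol=1e-6)
print(df["total_value"].tolist())
```

Using a tolerance rather than exact equality keeps the check robust to floating-point rounding when the derived column is computed in a different step or library.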
In conclusion, we have established a disciplined approach to data validation that treats schemas as first-class contracts rather than optional safeguards. We demonstrated how schema composition lets us safely extend datasets with derived features while preserving invariants, and how Pandera integrates seamlessly into real analytical and data-engineering workflows. Through this tutorial, we ensured that each transformation operated on reliable data, enabling us to build pipelines that are transparent, debuggable, and maintainable in real-world environments.
