How to build a production-grade data validation pipeline using Pandera, typed schema, and composable dataframe contracts


In this tutorial, we demonstrate how to build a robust, production-grade data validation pipeline with Pandera and typed DataFrame models. We begin by simulating realistic, messy transaction data and progressively enforce strict schema constraints, column-level rules, and cross-column business logic using declarative checks. We show how lazy validation surfaces multiple data quality issues at once, how invalid records can be quarantined without breaking pipelines, and how schema enforcement can be applied directly at function boundaries to guarantee correctness as data flows through transformations.

!pip -q install "pandera>=0.18" pandas numpy polars pyarrow hypothesis


import json
import numpy as np
import pandas as pd
import pandera as pa
from pandera.errors import SchemaError, SchemaErrors
from pandera.typing import Series, DataFrame


print("pandera version:", pa.__version__)
print("pandas  version:", pd.__version__)

We set up the execution environment by installing Pandera and its dependencies and importing the required libraries. We print library versions to ensure reproducibility and compatibility. This establishes a clean base for implementing typed data validation throughout the tutorial.

rng = np.random.default_rng(42)


def make_raw_orders(n=250):
   countries = np.array(("CA", "US", "MX"))
   channels = np.array(("web", "mobile", "partner"))
   raw = pd.DataFrame(
       {
           "order_id": rng.integers(1, 120, size=n),
           "customer_id": rng.integers(1, 90, size=n),
           "email": rng.choice(
               ("a@example.com", "b@example.com", "bad_email", None),  # placeholder addresses: two valid, one malformed, some missing
               size=n,
               p=(0.45, 0.45, 0.07, 0.03),
           ),
           "country": rng.choice(countries, size=n, p=(0.5, 0.45, 0.05)),
           "channel": rng.choice(channels, size=n, p=(0.55, 0.35, 0.10)),
           "items": rng.integers(0, 8, size=n),
           "unit_price": rng.normal(loc=35, scale=20, size=n),
           "discount": rng.choice((0.0, 0.05, 0.10, 0.20, 0.50), size=n, p=(0.55, 0.15, 0.15, 0.12, 0.03)),
           "ordered_at": pd.to_datetime("2025-01-01") + pd.to_timedelta(rng.integers(0, 120, size=n), unit="D"),
       }
   )


   raw.loc[rng.choice(n, size=8, replace=False), "unit_price"] = -abs(raw["unit_price"].iloc[0])  # negative prices
   raw.loc[rng.choice(n, size=6, replace=False), "items"] = 0  # zero-item orders
   raw.loc[rng.choice(n, size=5, replace=False), "discount"] = 0.9  # out-of-policy discounts
   raw.loc[rng.choice(n, size=4, replace=False), "country"] = "ZZ"  # unknown country code
   raw.loc[rng.choice(n, size=3, replace=False), "channel"] = "unknown"  # invalid channel
   raw.loc[rng.choice(n, size=6, replace=False), "unit_price"] = raw["unit_price"].iloc[:6].round(2).astype(str).values  # string-typed prices


   return raw


raw_orders = make_raw_orders(250)
display(raw_orders.head(10))

We create a realistic transactional dataset that intentionally includes common data quality issues. We inject invalid values, inconsistent types, and out-of-range entries to reflect real-world ingestion scenarios. This lets us meaningfully test and demonstrate the effectiveness of schema-based validation.
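Before enforcing a schema, it can help to quantify the injected defects with plain pandas. A minimal sketch (the `sample` frame below is a small made-up stand-in, not the full `raw_orders`):

```python
import pandas as pd

# A tiny stand-in frame exhibiting the same defect patterns as raw_orders
sample = pd.DataFrame({
    "country": ["CA", "ZZ", "US", "ZZ"],
    "items": [2, 0, 5, 1],
    "unit_price": [19.99, -12.5, "24.75", 30.0],  # mixed types and a negative value
})

issues = {
    "bad_country": int((~sample["country"].isin(["CA", "US", "MX"])).sum()),
    "zero_items": int((sample["items"] == 0).sum()),
    # coerce to numeric first, then flag non-positive prices
    "nonpositive_price": int((pd.to_numeric(sample["unit_price"], errors="coerce") <= 0).sum()),
}
print(issues)  # {'bad_country': 2, 'zero_items': 1, 'nonpositive_price': 1}
```

This kind of quick profiling is only a diagnostic; the schema below encodes the same rules declaratively and enforces them.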

EMAIL_RE = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"


class Orders(pa.DataFrameModel):
   order_id: Series[int] = pa.Field(ge=1)
   customer_id: Series[int] = pa.Field(ge=1)
   email: Series[object] = pa.Field(nullable=True)
   country: Series[str] = pa.Field(isin=("CA", "US", "MX"))
   channel: Series[str] = pa.Field(isin=("web", "mobile", "partner"))
   items: Series[int] = pa.Field(ge=1, le=50)
   unit_price: Series[float] = pa.Field(gt=0)
   discount: Series[float] = pa.Field(ge=0.0, le=0.8)
   ordered_at: Series[pd.Timestamp]


   class Config:
       coerce = True
       strict = True
       ordered = False


   @pa.check("email")
   def email_valid(cls, s: pd.Series) -> pd.Series:
       return s.isna() | s.astype(str).str.match(EMAIL_RE)


   @pa.dataframe_check
   def total_value_reasonable(cls, df: pd.DataFrame) -> pd.Series:
       total = df["items"] * df["unit_price"] * (1.0 - df["discount"])
       return total.between(0.01, 5000.0)


   @pa.dataframe_check
   def channel_country_rule(cls, df: pd.DataFrame) -> pd.Series:
       ok = ~((df["channel"] == "partner") & (df["country"] == "MX"))
       return ok

We define a strict Pandera DataFrameModel that captures structural and business-level constraints. We apply column-level rules, regex-based validation, and dataframe-wide checks to declaratively encode domain logic.
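The email rule can be sanity-checked outside the schema. A quick sketch with plain pandas, mirroring the null-or-match logic of the `email_valid` check (the sample addresses are made up; the pattern restates `EMAIL_RE` for self-containment):

```python
import pandas as pd

EMAIL_RE = r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$"

emails = pd.Series(["alice@example.com", "bad_email", None, "x@y.io"])
# Same logic as the schema check: nulls are allowed, everything else must match
valid = emails.isna() | emails.astype(str).str.match(EMAIL_RE)
print(valid.tolist())  # [True, False, True, True]
```

Note that the null case must be handled explicitly: `astype(str)` turns `None` into the literal string `"None"`, which would otherwise fail the regex.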

try:
   validated = Orders.validate(raw_orders, lazy=True)
   print(validated.dtypes)
except SchemaErrors as exc:
   display(exc.failure_cases.head(25))
   err_json = exc.failure_cases.to_dict(orient="records")
   print(json.dumps(err_json[:5], indent=2, default=str))

We validate the raw dataset using lazy evaluation to uncover multiple violations in a single pass. We inspect the structured failure cases to understand where and why the data breaks schema rules. This lets us address data quality issues without disrupting the entire pipeline.
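Because `exc.failure_cases` is an ordinary DataFrame, it can be summarized like any other. A sketch over a hand-built frame with the same column layout (`column`, `check`, `failure_case`, and `index` are real failure-case columns; the rows below are invented for illustration):

```python
import pandas as pd

# Hypothetical rows mimicking the layout of Pandera's exc.failure_cases
failure_cases = pd.DataFrame({
    "column": ["unit_price", "unit_price", "country", "items"],
    "check": ["greater_than(0)", "greater_than(0)", "isin", "in_range(1, 50)"],
    "failure_case": [-12.5, -3.0, "ZZ", 0],
    "index": [4, 17, 9, 22],
})

# Count violations per column/check pair to prioritize remediation
summary = failure_cases.groupby(["column", "check"]).size().sort_values(ascending=False)
print(summary)
```

Aggregating by column and check turns a long list of row-level failures into a compact data quality report.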

def split_clean_quarantine(df: pd.DataFrame):
   try:
       clean = Orders.validate(df, lazy=False)
       return clean, df.iloc[0:0].copy()
   except SchemaError:
       pass


   try:
       Orders.validate(df, lazy=True)
       return df.copy(), df.iloc[0:0].copy()
   except SchemaErrors as exc:
       # Collect the row indices of every failing record and quarantine them
       bad_idx = sorted(set(exc.failure_cases["index"].dropna().astype(int).tolist()))
       quarantine = df.loc[bad_idx].copy()
       clean = df.drop(index=bad_idx).copy()
       return Orders.validate(clean, lazy=False), quarantine


clean_orders, quarantine_orders = split_clean_quarantine(raw_orders)
display(quarantine_orders.head(10))
display(clean_orders.head(10))


@pa.check_types
def enrich_orders(df: DataFrame[Orders]) -> DataFrame[Orders]:
   out = df.copy()
   out["unit_price"] = out["unit_price"].round(2)
   out["discount"] = out["discount"].round(2)
   return out


enriched = enrich_orders(clean_orders)
display(enriched.head(5))

We separate valid records from invalid ones by quarantining the rows that fail schema checks. We then enforce schema guarantees at function boundaries to ensure that only trusted data is transformed. This pattern enables safe data enrichment while preventing silent corruption.
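The quarantine pattern itself does not depend on Pandera: any row-level predicate yields the same split. A minimal pure-pandas sketch with a hypothetical validity rule standing in for the schema:

```python
import pandas as pd

df = pd.DataFrame({"items": [2, 0, 3], "unit_price": [10.0, 5.0, -1.0]})

# Hypothetical row-level validity predicate (items >= 1 and price > 0)
is_valid = (df["items"] >= 1) & (df["unit_price"] > 0)

clean = df[is_valid].copy()          # rows safe to process downstream
quarantine = df[~is_valid].copy()    # rows routed aside for inspection
print(len(clean), len(quarantine))   # 1 2
```

What Pandera adds on top of this mask-based split is that the predicates live in one declarative schema, and `@pa.check_types` re-verifies them automatically at every function boundary.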

class EnrichedOrders(Orders):
   total_value: Series[float] = pa.Field(gt=0)


   class Config:
       coerce = True
       strict = True


   @pa.dataframe_check
   def totals_consistent(cls, df: pd.DataFrame) -> pd.Series:
       total = df["items"] * df["unit_price"] * (1.0 - df["discount"])
       return (df["total_value"] - total).abs() <= 1e-6


@pa.check_types
def add_totals(df: DataFrame[Orders]) -> DataFrame[EnrichedOrders]:
   out = df.copy()
   out["total_value"] = out["items"] * out["unit_price"] * (1.0 - out["discount"])
   return EnrichedOrders.validate(out, lazy=False)


enriched2 = add_totals(clean_orders)
display(enriched2.head(5))

We extend the base schema with a derived column and validate cross-column consistency using a composable schema. We verify that the computed values obey strict numerical invariants after transformation. This demonstrates how Pandera supports safe feature engineering with enforceable guarantees.
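The role of the tolerance in `totals_consistent` can be seen by recomputing the derived column by hand, exactly as the schema check does. A short numeric sketch (the values are made up):

```python
import pandas as pd

df = pd.DataFrame({
    "items": [3, 2],
    "unit_price": [19.99, 35.50],
    "discount": [0.10, 0.05],
})

# Derive the column the same way add_totals does
df["total_value"] = df["items"] * df["unit_price"] * (1.0 - df["discount"])

# Recompute independently and compare within tolerance, as totals_consistent does;
# an absolute-difference bound avoids spurious failures from float rounding
recomputed = df["items"] * df["unit_price"] * (1.0 - df["discount"])
consistent = (df["total_value"] - recomputed).abs() <= 1e-6
print(bool(consistent.all()))  # True
```

Comparing with a small absolute tolerance rather than `==` is the standard way to assert floating-point invariants over derived columns.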

In conclusion, we have established a disciplined approach to data validation that treats schemas as first-class contracts rather than optional safety checks. We demonstrated how schema composition enables us to safely extend datasets with derived features while preserving invariants, and how Pandera integrates seamlessly into real analytical and data-engineering workflows. Throughout this tutorial, we ensured that each transformation operated on trusted data, enabling us to build pipelines that are transparent, debuggable, and resilient in real-world environments.



