In this tutorial, we build a production-ready agentic workflow that prioritizes reliability over best-effort generation by enforcing strict, typed outputs at every step. We use PydanticAI to define explicit response schemas, wire dependencies into tools through dependency injection, and ensure that the agent can safely interact with external systems such as databases without breaking execution. By running everything in a notebook-friendly, async-first setup, we demonstrate how to move beyond the fragile chatbot pattern toward robust agentic systems suitable for real enterprise workflows.
!pip -q install "pydantic-ai-slim[openai]" pydantic
import os, json, sqlite3
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Literal, Optional, List
from pydantic import BaseModel, Field, field_validator
from pydantic_ai import Agent, RunContext, ModelRetry
# Try Colab's secret store first; fall back to an interactive prompt.
if not os.environ.get("OPENAI_API_KEY"):
    try:
        from google.colab import userdata
        os.environ["OPENAI_API_KEY"] = (userdata.get("OPENAI_API_KEY") or "").strip()
    except Exception:
        pass
if not os.environ.get("OPENAI_API_KEY"):
    import getpass
    os.environ["OPENAI_API_KEY"] = getpass.getpass("Paste your OPENAI_API_KEY: ").strip()
assert os.environ.get("OPENAI_API_KEY"), "OPENAI_API_KEY is required."
We set up the execution environment and ensure that all necessary libraries are available to run the agent correctly. We load the OpenAI API key in a Colab-friendly way, reading it from Colab's secret store when available and otherwise prompting for it without echoing the value, so that the tutorial works without manual configuration changes. We also import all the core dependencies that will be shared across schemas, tools, and agent logic.
Priority = Literal["low", "medium", "high", "critical"]
ActionType = Literal["create_ticket", "update_ticket", "query_ticket", "list_open_tickets", "no_action"]
Confidence = Literal["low", "medium", "high"]

class TicketDraft(BaseModel):
    title: str = Field(..., min_length=8, max_length=120)
    customer: str = Field(..., min_length=2, max_length=60)
    priority: Priority
    category: Literal["billing", "bug", "feature_request", "security", "account", "other"]
    description: str = Field(..., min_length=20, max_length=1000)
    expected_outcome: str = Field(..., min_length=10, max_length=250)

class AgentDecision(BaseModel):
    action: ActionType
    reason: str = Field(..., min_length=20, max_length=400)
    confidence: Confidence
    ticket: Optional[TicketDraft] = None
    ticket_id: Optional[int] = None
    # At most five follow-up questions per decision.
    follow_up_questions: List[str] = Field(default_factory=list, max_length=5)

    @field_validator("follow_up_questions")
    @classmethod
    def short_questions(cls, v):
        for q in v:
            if len(q) > 140:
                raise ValueError("Each follow-up question must be <= 140 characters.")
        return v
We define strict data models that act as contracts between the agent and the rest of the system. We use typed fields and validation rules to guarantee that each agent response follows a predictable structure. By enforcing these schemas, we prevent malformed output from spreading silently through the workflow.
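To make the idea of a schema acting as a contract concrete, here is a minimal sketch of what happens when a payload violates it. It assumes Pydantic v2 is installed; `MiniDecision` and `try_parse` are hypothetical names introduced only for this illustration and are a trimmed-down stand-in for `AgentDecision`, not part of the tutorial's code.

```python
from typing import Literal, Optional

from pydantic import BaseModel, Field, ValidationError


# Hypothetical, trimmed-down stand-in for AgentDecision (illustration only).
class MiniDecision(BaseModel):
    action: Literal["create_ticket", "no_action"]
    reason: str = Field(..., min_length=20, max_length=400)
    ticket_id: Optional[int] = None


def try_parse(payload: dict) -> tuple[Optional[MiniDecision], list[str]]:
    """Return (model, []) on success or (None, failing_field_names) on failure."""
    try:
        return MiniDecision.model_validate(payload), []
    except ValidationError as exc:
        # Each error carries a `loc` tuple naming the offending field.
        return None, sorted({str(err["loc"][0]) for err in exc.errors()})


good, good_errors = try_parse(
    {"action": "no_action", "reason": "The user only said hello, nothing to do."}
)
# Unknown action and a reason shorter than 20 characters: both are rejected.
bad, bad_errors = try_parse({"action": "delete_everything", "reason": "too short"})

print(good)
print(bad_errors)
```

The malformed payload never becomes a model instance, so downstream code only ever sees data that already satisfies the schema.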
@dataclass
class SupportDeps:
    db: sqlite3.Connection
    tenant: str
    policy: dict

def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

def init_db() -> sqlite3.Connection:
    # In-memory database; check_same_thread=False lets other threads reuse the connection.
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.execute("""
        CREATE TABLE tickets (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            tenant TEXT NOT NULL,
            title TEXT NOT NULL,
            customer TEXT NOT NULL,
            priority TEXT NOT NULL,
            category TEXT NOT NULL,
            description TEXT NOT NULL,
            expected_outcome TEXT NOT NULL,
            status TEXT NOT NULL,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL
        );
    """)
    conn.commit()
    return conn

def seed_ticket(db: sqlite3.Connection, tenant: str, ticket: TicketDraft, status: str = "open") -> int:
    now = utc_now_iso()
    # Parameterized insert; returns the new row's id.
    cur = db.execute(
        """
        INSERT INTO tickets
        (tenant, title, customer, priority, category, description, expected_outcome, status, created_at, updated_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            tenant,
            ticket.title,
            ticket.customer,
            ticket.priority,
            ticket.category,
            ticket.description,
            ticket.expected_outcome,
            status,
            now,
            now,
        ),
    )
    db.commit()
    return int(cur.lastrowid)
We build a dependency layer and initialize a lightweight SQLite database for persistence. We model real-world runtime dependencies, such as database connections and tenant policies, and make them injectable into the agent. We also define helper functions that safely insert and manage ticket data during execution.
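One practical benefit of injecting the connection and tenant, rather than reaching for globals, is that the same function can be pointed at a different tenant or a throwaway test database. The sketch below illustrates this with the standard library only; `MiniDeps`, `open_ticket_titles`, the reduced table layout, and the `globex` tenant are hypothetical and exist only for this example.

```python
import sqlite3
from dataclasses import dataclass


# Hypothetical, reduced version of the dependency container (illustration only).
@dataclass
class MiniDeps:
    db: sqlite3.Connection
    tenant: str


def open_ticket_titles(deps: MiniDeps) -> list[str]:
    """List open ticket titles, always scoped to the injected tenant."""
    rows = deps.db.execute(
        "SELECT title FROM tickets WHERE tenant=? AND status='open' ORDER BY id",
        (deps.tenant,),
    ).fetchall()
    return [row[0] for row in rows]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tickets (id INTEGER PRIMARY KEY, tenant TEXT, title TEXT, status TEXT)"
)
conn.executemany(
    "INSERT INTO tickets (tenant, title, status) VALUES (?, ?, ?)",
    [
        ("acme_corp", "Double-charged on invoice", "open"),
        ("acme_corp", "Old login bug", "closed"),
        ("globex", "Export to CSV request", "open"),
    ],
)
conn.commit()

# Same function, different injected tenant: each caller sees only its own rows.
acme_titles = open_ticket_titles(MiniDeps(db=conn, tenant="acme_corp"))
globex_titles = open_ticket_titles(MiniDeps(db=conn, tenant="globex"))
print(acme_titles, globex_titles)
```

Because the tenant filter lives in the query and the tenant comes from the dependency object, a caller cannot accidentally read another tenant's tickets by passing the wrong argument to an individual function.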
def build_agent(model_name: str) -> Agent[SupportDeps, AgentDecision]:
    agent = Agent(
        f"openai:{model_name}",
        output_type=AgentDecision,
        output_retries=2,
        instructions=(
            "You are a production support triage agent.\n"
            "Return an output that matches the AgentDecision schema.\n"
            "Use tools when you need DB state.\n"
            "Never invent ticket IDs.\n"
            "If the user intent is unclear, ask concise follow-up questions.\n"
        ),
    )

    @agent.tool
    def create_ticket(ctx: RunContext[SupportDeps], ticket: TicketDraft) -> int:
        deps = ctx.deps
        # Policy check: high/critical security tickets must mention an "incident".
        if ticket.priority in ("critical", "high") and deps.policy.get("require_security_phrase_for_critical", False):
            if ticket.category == "security" and "incident" not in ticket.description.lower():
                raise ModelRetry("For security high/critical, include the word 'incident' in description and retry.")
        return seed_ticket(deps.db, deps.tenant, ticket, status="open")

    @agent.tool
    def update_ticket_status(
        ctx: RunContext[SupportDeps],
        ticket_id: int,
        status: Literal["open", "in_progress", "resolved", "closed"],
    ) -> dict:
        deps = ctx.deps
        now = utc_now_iso()
        # Confirm the ticket exists for this tenant before updating it.
        cur = deps.db.execute("SELECT id FROM tickets WHERE tenant=? AND id=?", (deps.tenant, ticket_id))
        if not cur.fetchone():
            raise ModelRetry(f"Ticket {ticket_id} not found for this tenant. Ask for the correct ticket_id.")
        deps.db.execute(
            "UPDATE tickets SET status=?, updated_at=? WHERE tenant=? AND id=?",
            (status, now, deps.tenant, ticket_id),
        )
        deps.db.commit()
        return {"ticket_id": ticket_id, "status": status, "updated_at": now}

    @agent.tool
    def query_ticket(ctx: RunContext[SupportDeps], ticket_id: int) -> dict:
        deps = ctx.deps
        cur = deps.db.execute(
            """
            SELECT id, title, customer, priority, category, status, created_at, updated_at
            FROM tickets WHERE tenant=? AND id=?
            """,
            (deps.tenant, ticket_id),
        )
        row = cur.fetchone()
        if not row:
            raise ModelRetry(f"Ticket {ticket_id} not found. Ask the user for a valid ticket_id.")
        keys = ("id", "title", "customer", "priority", "category", "status", "created_at", "updated_at")
        return dict(zip(keys, row))

    @agent.tool
    def list_open_tickets(ctx: RunContext[SupportDeps], limit: int = 5) -> list:
        deps = ctx.deps
        # Clamp the requested limit to the range 1..20.
        limit = max(1, min(int(limit), 20))
        cur = deps.db.execute(
            """
            SELECT id, title, priority, category, status, updated_at
            FROM tickets
            WHERE tenant=? AND status IN ('open','in_progress')
            ORDER BY updated_at DESC
            LIMIT ?
            """,
            (deps.tenant, limit),
        )
        rows = cur.fetchall()
        return [
            {"id": r[0], "title": r[1], "priority": r[2], "category": r[3], "status": r[4], "updated_at": r[5]}
            for r in rows
        ]

    @agent.output_validator
    def validate_decision(ctx: RunContext[SupportDeps], out: AgentDecision) -> AgentDecision:
        deps = ctx.deps
        # Cross-field business rules that the schema alone cannot express.
        if out.action == "create_ticket" and out.ticket is None:
            raise ModelRetry("You chose create_ticket but did not provide ticket. Provide ticket fields and retry.")
        if out.action in ("update_ticket", "query_ticket") and out.ticket_id is None:
            raise ModelRetry("You chose update/query but did not provide ticket_id. Ask for ticket_id and retry.")
        if out.ticket and out.ticket.priority == "critical" and not deps.policy.get("allow_critical", True):
            raise ModelRetry("This tenant does not allow 'critical'. Downgrade to 'high' and retry.")
        return out

    return agent
We implement the core agent logic in a factory function that assembles a model-agnostic PydanticAI agent. We register typed tools for creating, querying, updating, and listing tickets, allowing the agent to interact with external state in a controlled manner. We also implement output validation so that the agent can self-correct whenever its decisions violate business rules: raising ModelRetry sends the error message back to the model and asks it to try again.
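The self-correction pattern described above can be sketched in plain Python. This is a conceptual illustration under stated assumptions, not a reproduction of PydanticAI's internals: `RetryRequest`, `Decision`, `run_with_retries`, and `fake_model` are hypothetical names, and the "model" is a scripted function rather than an LLM.

```python
from dataclasses import dataclass
from typing import Callable, Optional


class RetryRequest(Exception):
    """Hypothetical stand-in for a retry signal such as ModelRetry."""


@dataclass
class Decision:
    action: str
    ticket_id: Optional[int] = None


def validate(decision: Decision) -> Decision:
    # Business rule mirrored from the tutorial: query needs a ticket_id.
    if decision.action == "query_ticket" and decision.ticket_id is None:
        raise RetryRequest("query_ticket requires ticket_id; provide it and retry.")
    return decision


def run_with_retries(
    model: Callable[[list[str]], Decision], max_retries: int = 2
) -> tuple[Decision, list[str]]:
    """Call `model`, feeding validator feedback back in until it passes."""
    feedback: list[str] = []
    for _ in range(max_retries + 1):
        candidate = model(feedback)
        try:
            return validate(candidate), feedback
        except RetryRequest as exc:
            feedback.append(str(exc))
    raise RuntimeError(f"Gave up after {max_retries} retries: {feedback}")


def fake_model(feedback: list[str]) -> Decision:
    # Scripted stand-in for an LLM: wrong first, corrected once it sees feedback.
    if not feedback:
        return Decision(action="query_ticket")
    return Decision(action="query_ticket", ticket_id=1)


final, seen_feedback = run_with_retries(fake_model)
print(final, seen_feedback)
```

The retry budget matters: a model that never corrects itself exhausts the budget and the loop raises instead of returning an invalid decision, which is the behavior we want from a validation layer.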
db = init_db()
deps = SupportDeps(
    db=db,
    tenant="acme_corp",
    policy={"allow_critical": True, "require_security_phrase_for_critical": True},
)

# Seed two existing tickets so the query and list tools have data to return.
seed_ticket(
    db,
    deps.tenant,
    TicketDraft(
        title="Double-charged on invoice 8831",
        customer="Riya",
        priority="high",
        category="billing",
        description="Customer reports they were billed twice for invoice 8831 and wants a refund and confirmation email.",
        expected_outcome="Issue a refund and confirm resolution to customer.",
    ),
)
seed_ticket(
    db,
    deps.tenant,
    TicketDraft(
        title="App crashes on login after update",
        customer="Sam",
        priority="high",
        category="bug",
        description="After latest update, the app crashes immediately on login. Reproducible on two devices; needs investigation.",
        expected_outcome="Provide a fix or workaround and restore successful logins.",
    ),
)

agent = build_agent("gpt-4o-mini")

async def run_case(prompt: str):
    res = await agent.run(prompt, deps=deps)
    out = res.output
    print(json.dumps(out.model_dump(), indent=2))
    return out

# Top-level await works in notebooks, which already run an event loop.
case_a = await run_case(
    "We suspect account takeover: multiple password reset emails and unauthorized logins. "
    "Customer=Leila. Priority=critical. Open a security ticket."
)
case_b = await run_case("List our open tickets and summarize what to tackle first.")
case_c = await run_case("What is the status of ticket 1? If it's open, move it to in_progress.")

# Same tools and schema, different underlying model.
agent_alt = build_agent("gpt-4o")
alt_res = await agent_alt.run(
    "Create a feature request ticket: customer=Noah wants 'export to CSV' in analytics dashboard; priority=medium.",
    deps=deps,
)
print(json.dumps(alt_res.output.model_dump(), indent=2))
We tie everything together by seeding the initial data and running the agent asynchronously in a notebook-safe manner. We execute several real-world scenarios to show how the agent reasons, calls tools, and returns schema-validated output. We also demonstrate how easily we can swap out the underlying models while maintaining the same workflow and guarantees.
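The top-level `await` calls above rely on the event loop that notebooks already run. In a plain Python script there is no running loop, so the same flow needs an explicit entry point. Here is a minimal sketch of that pattern; `fake_agent_run` is a hypothetical stand-in for `agent.run(...)` so the example makes no network call and needs no API key.

```python
import asyncio


async def fake_agent_run(prompt: str) -> dict:
    # Hypothetical stand-in for `agent.run(...)`; no network call is made.
    await asyncio.sleep(0)
    return {"prompt": prompt, "action": "no_action"}


async def main() -> list[dict]:
    prompts = ["List open tickets.", "What is the status of ticket 1?"]
    results = []
    # Run cases one after another so earlier side effects are visible to later ones.
    for prompt in prompts:
        results.append(await fake_agent_run(prompt))
    return results


# Outside a notebook there is no running event loop, so start one explicitly.
results = asyncio.run(main())
print(results)
```

Running the cases sequentially mirrors the tutorial, where a later case may depend on a ticket created or updated by an earlier one. Note that `asyncio.run` raises an error when called from inside an already running loop, which is why the notebook version uses `await` directly.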
In conclusion, we showed how a type-safe agent can reason, call tools, validate its own outputs, and recover from errors without manual intervention. We kept the logic model-agnostic, allowing us to swap out the underlying LLM while preserving the same schema and tools, which is important for long-term maintainability. Overall, we demonstrated how the combination of strict schema enforcement, dependency injection, and async execution helps close the reliability gap in agentic AI and provides a solid foundation for building trustworthy production systems.

