How to build production-grade agent workflows with Graphbit using deterministic tools, validated execution graphs, and optional LLM orchestration


In this tutorial, we build an end-to-end, production-style agentic workflow using GraphBit. It demonstrates how graph-structured execution, tool calling, and optional LLM-driven agents can coexist in the same system. We begin by initializing and inspecting the GraphBit runtime, then define a realistic customer-support ticket domain with typed data structures and deterministic, offline-executable tools. We show how these tools can be composed into a reliable, rule-based pipeline for classification, routing, and response drafting, and how the same logic can then be promoted into a validated GraphBit workflow in which agent nodes orchestrate tool usage through a directed graph. Throughout the tutorial, we keep the system running in offline mode while allowing a switch to online execution simply by providing an LLM configuration, demonstrating how GraphBit supports the gradual adoption of agentic intelligence without sacrificing reproducibility or operational control.

!pip -q install graphbit rich pydantic numpy


import os
import time
import json
import random
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
import numpy as np
from rich import print as rprint
from rich.panel import Panel
from rich.table import Table

We start by installing the required dependencies and importing the core Python, numerical, and visualization libraries used in the tutorial. We set up the runtime environment so that the notebook remains self-contained and reproducible on Google Colab.

from graphbit import init, shutdown, configure_runtime, get_system_info, health_check, version
from graphbit import Workflow, Node, Executor, LlmConfig
from graphbit import tool, ToolExecutor, ExecutorConfig
from graphbit import get_tool_registry, clear_tools


configure_runtime(worker_threads=4, max_blocking_threads=8, thread_stack_size_mb=2)
init(log_level="warn", enable_tracing=False, debug=False)


info = get_system_info()
health = health_check()


sys_table = Table(title="System Info / Health")
sys_table.add_column("Key", style="bold")
sys_table.add_column("Value")
for k in ("version", "python_binding_version", "cpu_count", "runtime_worker_threads", "runtime_initialized", "build_target", "build_profile"):
   sys_table.add_row(k, str(info.get(k)))
sys_table.add_row("graphbit_version()", str(version()))
sys_table.add_row("overall_healthy", str(health.get("overall_healthy")))
rprint(sys_table)

We initialize the GraphBit runtime and explicitly configure its execution parameters to control threading and resource usage. We then query system metadata and perform health checks to verify that the runtime has been initialized correctly.

@dataclass
class Ticket:
   ticket_id: str
   user_id: str
   text: str
   created_at: float


def make_tickets(n: int = 10) -> List[Ticket]:
   # Seed texts covering the main support categories
   seeds = [
       "My card payment failed twice, what should I do?",
       "I want to cancel my subscription immediately.",
       "Your app crashes when I open the dashboard.",
       "Please update my email address on the account.",
       "Refund not received after 7 days.",
       "My delivery is delayed and tracking is stuck.",
       "I suspect fraudulent activity on my account.",
       "How can I change my billing cycle date?",
       "The website is very slow and times out.",
       "I forgot my password and cannot login.",
       "Chargeback process details please.",
       "Need invoice for last month’s payment.",
   ]
   random.shuffle(seeds)  # must be a list: tuples cannot be shuffled in place
   out = []
   for i in range(n):
       out.append(
           Ticket(
               ticket_id=f"T-{1000+i}",
               user_id=f"U-{random.randint(100,999)}",
               text=seeds[i % len(seeds)],
               # Spread creation times over the last seven days
               created_at=time.time() - random.randint(0, 7 * 24 * 3600),
           )
       )
   return out


tickets = make_tickets(10)
rprint(Panel.fit("\n".join([f"- {t.ticket_id}: {t.text}" for t in tickets]), title="Sample Tickets"))

We define a typed data model for support tickets and generate a synthetic dataset that simulates realistic customer problems. We create tickets with timestamps and identifiers to reflect production inputs. This dataset serves as a shared input to both the offline and the agent-driven pipelines.
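The generator above draws from the global `random` state, so each run produces a different dataset. If run-to-run reproducibility matters, one option is a dedicated seeded generator. The sketch below is a standalone illustration rather than part of the original notebook: `make_seeded_tickets`, the `seed` parameter, and the fixed `now` timestamp are hypothetical names introduced here.

```python
import random
from dataclasses import dataclass
from typing import List


@dataclass
class Ticket:
    ticket_id: str
    user_id: str
    text: str
    created_at: float


def make_seeded_tickets(
    texts: List[str], n: int, seed: int = 0, now: float = 1_700_000_000.0
) -> List[Ticket]:
    """Hypothetical helper: the same seed and inputs give the same tickets."""
    rng = random.Random(seed)  # private generator, global random state untouched
    pool = list(texts)         # copy so the caller's list is not shuffled
    rng.shuffle(pool)
    return [
        Ticket(
            ticket_id=f"T-{1000 + i}",
            user_id=f"U-{rng.randint(100, 999)}",
            text=pool[i % len(pool)],
            created_at=now - rng.randint(0, 7 * 24 * 3600),
        )
        for i in range(n)
    ]


texts = ["Refund not received after 7 days.", "I forgot my password and cannot login."]
run_a = make_seeded_tickets(texts, n=4, seed=42)
run_b = make_seeded_tickets(texts, n=4, seed=42)
print(run_a == run_b)  # dataclasses compare field by field
```

Passing a fixed `now` instead of calling `time.time()` keeps the timestamps stable as well, which makes downstream metrics comparable between runs.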

clear_tools()


@tool(_description="Classify a support ticket into a coarse category.")
def classify_ticket(text: str) -> Dict[str, Any]:
   t = text.lower()
   if "fraud" in t or "fraudulent" in t:
       return {"category": "fraud", "priority": "p0"}
   if "cancel" in t:
       return {"category": "cancellation", "priority": "p1"}
   if "refund" in t or "chargeback" in t:
       return {"category": "refunds", "priority": "p1"}
   if "password" in t or "login" in t:
       return {"category": "account_access", "priority": "p2"}
   if "crash" in t or "slow" in t or "timeout" in t:
       return {"category": "bug", "priority": "p2"}
   if "payment" in t or "billing" in t or "invoice" in t:
       return {"category": "billing", "priority": "p2"}
   if "delivery" in t or "tracking" in t:
       return {"category": "delivery", "priority": "p3"}
   return {"category": "general", "priority": "p3"}


@tool(_description="Route a ticket to a queue (returns queue id and SLA hours).")
def route_ticket(category: str, priority: str) -> Dict[str, Any]:
   queue_map = {
       "fraud": ("risk_ops", 2),
       "cancellation": ("retention", 8),
       "refunds": ("payments_ops", 12),
       "account_access": ("identity", 12),
       "bug": ("engineering_support", 24),
       "billing": ("billing_support", 24),
       "delivery": ("logistics_support", 48),
       "general": ("support_general", 48),
   }
   q, sla = queue_map.get(category, ("support_general", 48))
   if priority == "p0":
       sla = min(sla, 2)
   elif priority == "p1":
       sla = min(sla, 8)
   return {"queue": q, "sla_hours": sla}


@tool(_description="Generate a playbook response based on category + priority.")
def draft_response(category: str, priority: str, ticket_text: str) -> Dict[str, Any]:
   # One canned playbook message per category
   templates = {
       "fraud": "We’ve temporarily secured your account. Please confirm last 3 transactions and reset credentials.",
       "cancellation": "We can help cancel your subscription. Please confirm your plan and the effective date you want.",
       "refunds": "We’re checking the refund status. Please share the order/payment reference and date.",
       "account_access": "Let’s get you back in. Please use the password reset link; if blocked, we’ll verify identity.",
       "bug": "Thanks for reporting. Please share device/browser + a screenshot; we’ll attempt reproduction.",
       "billing": "We can help with billing. Please confirm the last 4 digits and the invoice period you need.",
       "delivery": "We’re checking shipment status. Please share your tracking ID and delivery address PIN/ZIP.",
       "general": "Thanks for reaching out."
   }
   base = templates.get(category, templates["general"])
   tone = "urgent" if priority == "p0" else ("fast" if priority == "p1" else "standard")
   return {
       "tone": tone,
       "message": f"{base}\n\nContext we received: '{ticket_text}'",
       "next_steps": ["request_missing_info", "log_case", "route_to_queue"]
   }


registry = get_tool_registry()
tools_list = registry.list_tools() if hasattr(registry, "list_tools") else []
rprint(Panel.fit(f"Registered tools: {tools_list}", title="Tool Registry"))

We register deterministic business tools for ticket classification, routing, and response drafting using GraphBit’s tool interface. We encode the domain logic directly into these tools so that they can be executed without any LLM dependency. This establishes a reliable, testable foundation for subsequent agent orchestration.
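Because the tools are plain keyword rules, their behavior can be pinned down with ordinary table-driven checks before any agent is involved. The sketch below is a standalone re-expression of the classifier with no GraphBit dependency: the `RULES` table layout, the `classify` function, and the `CASES` list are assumptions made for illustration, while the keywords, categories, and priorities follow the tutorial's `classify_ticket`.

```python
from typing import Dict, List, Tuple

# Ordered rules: the first rule with a keyword found in the text wins,
# mirroring the if-chain order in the tutorial's classify_ticket.
RULES: List[Tuple[Tuple[str, ...], str, str]] = [
    (("fraud",), "fraud", "p0"),
    (("cancel",), "cancellation", "p1"),
    (("refund", "chargeback"), "refunds", "p1"),
    (("password", "login"), "account_access", "p2"),
    (("crash", "slow", "timeout"), "bug", "p2"),
    (("payment", "billing", "invoice"), "billing", "p2"),
    (("delivery", "tracking"), "delivery", "p3"),
]


def classify(text: str) -> Dict[str, str]:
    lowered = text.lower()
    for keywords, category, priority in RULES:
        if any(keyword in lowered for keyword in keywords):
            return {"category": category, "priority": priority}
    return {"category": "general", "priority": "p3"}


# Hypothetical regression cases: (ticket text, expected category, expected priority)
CASES = [
    ("I suspect fraudulent activity on my account.", "fraud", "p0"),
    ("Refund not received after 7 days.", "refunds", "p1"),
    ("My card payment failed twice, what should I do?", "billing", "p2"),
    ("Hello there", "general", "p3"),
]

for text, category, priority in CASES:
    assert classify(text) == {"category": category, "priority": priority}, text
print("all classification cases passed")
```

Rule order matters here: a ticket that mentions both a refund and billing lands in `refunds`, because that rule is checked first. Making the order explicit in a table keeps such precedence decisions easy to review.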

tool_exec_cfg = ExecutorConfig(
   max_execution_time_ms=10_000,
   max_tool_calls=50,
   continue_on_error=False,
   store_results=True,
   enable_logging=False
)
tool_executor = ToolExecutor(config=tool_exec_cfg) if "config" in ToolExecutor.__init__.__code__.co_varnames else ToolExecutor()


def offline_triage(ticket: Ticket) -> Dict[str, Any]:
   # Run the three deterministic tools in sequence, with no LLM involved
   c = classify_ticket(ticket.text)
   rt = route_ticket(c["category"], c["priority"])
   dr = draft_response(c["category"], c["priority"], ticket.text)
   return {
       "ticket_id": ticket.ticket_id,
       "user_id": ticket.user_id,
       "category": c["category"],
       "priority": c["priority"],
       "queue": rt["queue"],
       "sla_hours": rt["sla_hours"],
       "draft": dr["message"],
       "tone": dr["tone"],
       # Keep each intermediate tool result for auditing
       "steps": [
           ("classify_ticket", c),
           ("route_ticket", rt),
           ("draft_response", dr),
       ]
   }


# Apply the offline pipeline to every synthetic ticket
offline_results = [offline_triage(t) for t in tickets]


res_table = Table(title="Offline Pipeline Results")
res_table.add_column("Ticket", style="bold")
res_table.add_column("Category")
res_table.add_column("Priority")
res_table.add_column("Queue")
res_table.add_column("SLA (h)")
for r in offline_results:
   res_table.add_row(r["ticket_id"], r["category"], r["priority"], r["queue"], str(r["sla_hours"]))
rprint(res_table)


prio_counts: Dict[str, int] = {}
sla_vals: List[int] = []
for r in offline_results:
   # Count tickets per priority and collect SLA hours for summary statistics
   prio_counts[r["priority"]] = prio_counts.get(r["priority"], 0) + 1
   sla_vals.append(int(r["sla_hours"]))


metrics = {
   "offline_mode": True,
   "tickets": len(offline_results),
   "priority_distribution": prio_counts,
   "sla_mean": float(np.mean(sla_vals)) if sla_vals else None,
   "sla_p95": float(np.percentile(sla_vals, 95)) if sla_vals else None,
}


rprint(Panel.fit(json.dumps(metrics, indent=2), title="Offline Metrics"))

We combine the registered tools into an offline execution pipeline and apply it to all tickets to generate structured triage results. We aggregate the output into a table and compute priority and SLA metrics to evaluate system behavior. This demonstrates how the business logic behind a GraphBit workflow can be validated deterministically before any agent is introduced.
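The aggregation above relies on NumPy for the mean and the 95th percentile. For readers who want to see what those numbers mean, here is a minimal standard-library sketch. The `percentile` helper and the `rows` sample data are hypothetical and introduced only for illustration; the helper uses linear interpolation between neighboring sorted values, which is intended to mirror the default behavior of `np.percentile`.

```python
from collections import Counter
from typing import Any, Dict, List, Sequence


def percentile(values: Sequence[float], pct: float) -> float:
    """Linear-interpolation percentile over the sorted values (pct in 0..100)."""
    if not values:
        raise ValueError("percentile of an empty sequence is undefined")
    ordered = sorted(values)
    rank = (len(ordered) - 1) * pct / 100.0  # fractional index into ordered
    lo = int(rank)
    hi = min(lo + 1, len(ordered) - 1)
    return ordered[lo] + (ordered[hi] - ordered[lo]) * (rank - lo)


# Hypothetical triage rows, shaped like the offline pipeline output.
rows: List[Dict[str, Any]] = [
    {"priority": "p0", "sla_hours": 2},
    {"priority": "p1", "sla_hours": 8},
    {"priority": "p2", "sla_hours": 24},
    {"priority": "p2", "sla_hours": 24},
    {"priority": "p3", "sla_hours": 48},
]

priority_counts = Counter(r["priority"] for r in rows)
sla_values = [int(r["sla_hours"]) for r in rows]
sla_mean = sum(sla_values) / len(sla_values)
sla_p95 = percentile(sla_values, 95)

print(dict(priority_counts))
print(round(sla_mean, 2), round(sla_p95, 2))
```

With only a handful of tickets, the 95th percentile sits close to the maximum SLA, so it is mostly useful as a regression signal: if a rule change shifts it, the routing table has changed behavior.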

SYSTEM_POLICY = "You are a reliable support ops agent. Return STRICT JSON only."


workflow = Workflow("Ticket Triage Workflow (GraphBit)")


summarizer = Node.agent(
   name="Summarizer",
   agent_id="summarizer",
   system_prompt=SYSTEM_POLICY,
   prompt="Summarize this ticket in 1-2 lines. Return JSON: {\"summary\":\"...\"}\nTicket: {input}",
   temperature=0.2,
   max_tokens=200
)


router_agent = Node.agent(
   name="RouterAgent",
   agent_id="router",
   system_prompt=SYSTEM_POLICY,
   prompt=(
       "You MUST use tools.\n"
       "Call classify_ticket(text), route_ticket(category, priority), draft_response(category, priority, ticket_text).\n"
       "Return JSON with fields: category, priority, queue, sla_hours, message.\n"
       "Ticket: {input}"
   ),
   tools=[classify_ticket, route_ticket, draft_response],
   temperature=0.1,
   max_tokens=700
)


formatter = Node.agent(
   name="FinalFormatter",
   agent_id="final_formatter",
   system_prompt=SYSTEM_POLICY,
   prompt=(
       "Validate the JSON and output STRICT JSON only:\n"
       "{\"ticket_id\":\"...\",\"category\":\"...\",\"priority\":\"...\",\"queue\":\"...\",\"sla_hours\":0,\"customer_message\":\"...\"}\n"
       "Input: {input}"
   ),
   temperature=0.0,
   max_tokens=500
)


sid = workflow.add_node(summarizer)
rid = workflow.add_node(router_agent)
fid = workflow.add_node(formatter)


workflow.connect(sid, rid)
workflow.connect(rid, fid)
workflow.validate()


rprint(Panel.fit("Workflow validated: Summarizer -> RouterAgent -> FinalFormatter", title="Workflow Graph"))

We create a directed GraphBit workflow composed of multiple agent nodes with clearly defined responsibilities and strict JSON contracts. We connect these nodes into a validated execution graph that mirrors the earlier offline logic at the agent level.
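The source does not show what `workflow.validate()` checks internally. As a generic illustration of the checks a directed-workflow validator commonly performs (every edge points at a known node, and the graph has no cycles), here is a minimal sketch based on Kahn's topological sort. `topological_order` is a hypothetical helper written for this example, not a GraphBit API.

```python
from collections import deque
from typing import Dict, List, Tuple


def topological_order(nodes: List[str], edges: List[Tuple[str, str]]) -> List[str]:
    """Return a valid execution order, or raise ValueError if the graph is invalid."""
    known = set(nodes)
    indegree: Dict[str, int] = {n: 0 for n in nodes}
    children: Dict[str, List[str]] = {n: [] for n in nodes}
    for src, dst in edges:
        if src not in known or dst not in known:
            raise ValueError(f"edge references unknown node: {src} -> {dst}")
        children[src].append(dst)
        indegree[dst] += 1

    # Start from nodes with no prerequisites and release children as we go.
    ready = deque(n for n in nodes if indegree[n] == 0)
    order: List[str] = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for child in children[node]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    # Nodes left unscheduled are part of, or downstream of, a cycle.
    if len(order) != len(nodes):
        raise ValueError("cycle detected: not every node can be scheduled")
    return order


nodes = ["Summarizer", "RouterAgent", "FinalFormatter"]
edges = [("Summarizer", "RouterAgent"), ("RouterAgent", "FinalFormatter")]
print(topological_order(nodes, edges))  # ['Summarizer', 'RouterAgent', 'FinalFormatter']
```

Adding an edge from `FinalFormatter` back to `Summarizer` would make this sketch raise `ValueError`, which is the kind of mistake worth catching before any LLM call is made.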

def pick_llm_config() -> Optional[Any]:
   if os.getenv("OPENAI_API_KEY"):
       return LlmConfig.openai(os.getenv("OPENAI_API_KEY"), "gpt-4o-mini")
   if os.getenv("ANTHROPIC_API_KEY"):
       return LlmConfig.anthropic(os.getenv("ANTHROPIC_API_KEY"), "claude-sonnet-4-20250514")
   if os.getenv("DEEPSEEK_API_KEY"):
       return LlmConfig.deepseek(os.getenv("DEEPSEEK_API_KEY"), "deepseek-chat")
   if os.getenv("MISTRALAI_API_KEY"):
       return LlmConfig.mistralai(os.getenv("MISTRALAI_API_KEY"), "mistral-large-latest")
   return None


def run_agent_flow_once(ticket_text: str) -> Dict[str, Any]:
   llm_cfg = pick_llm_config()
   if llm_cfg is None:
       return {
           "mode": "offline",
           "note": "Set OPENAI_API_KEY / ANTHROPIC_API_KEY / DEEPSEEK_API_KEY / MISTRALAI_API_KEY to enable execution.",
           "input": ticket_text
       }
   executor = Executor(llm_cfg, lightweight_mode=True, timeout_seconds=90, debug=False) if "lightweight_mode" in Executor.__init__.__code__.co_varnames else Executor(llm_cfg)
   if hasattr(executor, "configure"):
       executor.configure(timeout_seconds=90, max_retries=2, enable_metrics=True, debug=False)
   wf = Workflow("Single Ticket Run")
   s = Node.agent(
       name="Summarizer",
       agent_id="summarizer",
       system_prompt=SYSTEM_POLICY,
       prompt=f"Summarize this ticket in 1-2 lines. Return JSON: {{\"summary\":\"...\"}}\nTicket: {ticket_text}",
       temperature=0.2,
       max_tokens=200
   )
   r = Node.agent(
       name="RouterAgent",
       agent_id="router",
       system_prompt=SYSTEM_POLICY,
       prompt=(
           "You MUST use tools.\n"
           "Call classify_ticket(text), route_ticket(category, priority), draft_response(category, priority, ticket_text).\n"
           "Return JSON with fields: category, priority, queue, sla_hours, message.\n"
           f"Ticket: {ticket_text}"
       ),
       tools=[classify_ticket, route_ticket, draft_response],
       temperature=0.1,
       max_tokens=700
   )
   f = Node.agent(
       name="FinalFormatter",
       agent_id="final_formatter",
       system_prompt=SYSTEM_POLICY,
       prompt=(
           "Validate the JSON and output STRICT JSON only:\n"
           "{\"ticket_id\":\"...\",\"category\":\"...\",\"priority\":\"...\",\"queue\":\"...\",\"sla_hours\":0,\"customer_message\":\"...\"}\n"
           "Input: {input}"
       ),
       temperature=0.0,
       max_tokens=500
   )
   sid = wf.add_node(s)
   rid = wf.add_node(r)
   fid = wf.add_node(f)
   wf.connect(sid, rid)
   wf.connect(rid, fid)
   wf.validate()
   t0 = time.time()
   result = executor.execute(wf)
   dt_ms = int((time.time() - t0) * 1000)
   out = {"mode": "online", "execution_time_ms": dt_ms, "success": bool(result.is_success()) if hasattr(result, "is_success") else None}
   if hasattr(result, "get_all_variables"):
       out["variables"] = result.get_all_variables()
   else:
       out["raw"] = str(result)[:3000]
   return out


sample = tickets[0]
agent_run = run_agent_flow_once(sample.text)
rprint(Panel.fit(json.dumps(agent_run, indent=2)[:3000], title="Agent Workflow Run"))


rprint(Panel.fit("Done", title="Complete"))

We add optional LLM configuration and execution logic that enables the same workflow to run autonomously when the provider key is available. We execute the workflow on a single ticket and capture the execution status and output. This final step demonstrates how the system seamlessly transitions from offline determinism to fully agentic execution.
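The provider selection above is a first-match scan over environment variables with an offline fallback. That pattern can be exercised without any API key or GraphBit install by passing the environment in as a plain mapping. In the sketch below, `PROVIDERS`, `pick_provider`, and `run_mode` are hypothetical names, and the returned provider labels are plain strings standing in for the `LlmConfig` objects the tutorial builds; the environment variable names and model names are taken from `pick_llm_config`.

```python
import os
from typing import List, Mapping, Optional, Tuple

# (environment variable, provider label, model name) in priority order.
PROVIDERS: List[Tuple[str, str, str]] = [
    ("OPENAI_API_KEY", "openai", "gpt-4o-mini"),
    ("ANTHROPIC_API_KEY", "anthropic", "claude-sonnet-4-20250514"),
    ("DEEPSEEK_API_KEY", "deepseek", "deepseek-chat"),
    ("MISTRALAI_API_KEY", "mistralai", "mistral-large-latest"),
]


def pick_provider(env: Mapping[str, str] = os.environ) -> Optional[Tuple[str, str]]:
    """Return (provider, model) for the first configured key, else None."""
    for var, provider, model in PROVIDERS:
        if env.get(var):  # an empty string counts as "not configured"
            return provider, model
    return None


def run_mode(env: Mapping[str, str] = os.environ) -> str:
    """Report which path a run would take: 'online' with a key, 'offline' without."""
    return "online" if pick_provider(env) else "offline"


print(run_mode({}))  # offline
print(pick_provider({"ANTHROPIC_API_KEY": "sk-test", "DEEPSEEK_API_KEY": "x"}))
# ('anthropic', 'claude-sonnet-4-20250514')
```

Injecting the mapping keeps the selection logic testable in CI, where real keys are usually absent, and makes the precedence between providers explicit.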

In conclusion, we implemented a complete GraphBit workflow spanning runtime configuration, tool registration, offline deterministic execution, metric aggregation, and optional agent-based orchestration with external LLM providers. We demonstrated how the same business logic can be executed both manually through direct tool calls and automatically through connected agent nodes in a validated graph, highlighting GraphBit's strength as an execution substrate rather than just an LLM wrapper. We also showed that complex agentic systems can be designed to fail gracefully, run without external dependencies, and scale to fully autonomous workflows once an LLM is enabled.




Asif Razzaq is the CEO of Marktechpost Media Inc. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of Marktechpost, an Artificial Intelligence media platform known for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understood by a wide audience. The platform has over 2 million monthly views.
