In this tutorial, we implement an agentic chain-of-thought pruning framework that generates multiple reasoning paths in parallel and dynamically prunes them using consensus signals and early stopping. We focus on improving reasoning efficiency by reducing redundant token usage while preserving answer correctness, demonstrating that self-consistency and lightweight graph-based agreement can serve as effective proxies for reasoning quality. We design the entire pipeline around a compact instruction-tuned model and progressive sampling to simulate how an agent can decide when it has done “enough” reasoning.
!pip -q install -U transformers accelerate bitsandbytes networkx scikit-learn
import re, time, random, math
import numpy as np
import torch
import networkx as nx
from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
SEED = 7
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
MODEL_NAME = "Qwen/Qwen2.5-0.5B-Instruct"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)
model = AutoModelForCausalLM.from_pretrained(
    MODEL_NAME,
    device_map="auto",
    torch_dtype=torch.float16,
    load_in_4bit=True
)
model.eval()
SYSTEM = "You are a careful problem solver. Keep reasoning brief and output a final numeric answer."
FINAL_RE = re.compile(r"Final:\s*(-?\d+(?:\.\d+)?)")
We set up the Colab environment and load all the libraries needed for efficient agentic reasoning. We initialize a lightweight instruction-tuned language model with 4-bit quantization to ensure stable performance on limited GPU resources. We also define the global configuration, randomness controls, and the core prompting pattern used throughout the tutorial.
def make_prompt(q):
    return (
        f"{SYSTEM}\n\n"
        f"Problem: {q}\n"
        f"Reasoning: (brief)\n"
        f"Final: "
    )
def parse_final_number(text):
    # Prefer an explicit "Final: <number>" marker; otherwise fall back
    # to the last number that appears anywhere in the completion.
    m = FINAL_RE.search(text)
    if m:
        return m.group(1).strip()
    nums = re.findall(r"-?\d+(?:\.\d+)?", text)
    return nums[-1] if nums else None

def is_correct(pred, gold):
    if pred is None:
        return 0
    try:
        return int(abs(float(pred) - float(gold)) < 1e-9)
    except (TypeError, ValueError):
        return int(str(pred).strip() == str(gold).strip())

def tok_len(text):
    return len(tokenizer.encode(text))
We define helper functions that build prompts, extract the final numeric answer, and evaluate correctness against the ground truth. We standardize how answers are parsed so that different reasoning paths can be compared consistently. We also introduce a token-counting utility that lets us measure reasoning efficiency later.
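As a standalone sketch (no model or tokenizer required), the parsing logic behaves like this:

```python
import re

# Prefer an explicit "Final: <number>" marker; otherwise fall back to
# the last number that appears anywhere in the text.
FINAL_RE = re.compile(r"Final:\s*(-?\d+(?:\.\d+)?)")

def parse_final_number(text):
    m = FINAL_RE.search(text)
    if m:
        return m.group(1).strip()
    nums = re.findall(r"-?\d+(?:\.\d+)?", text)
    return nums[-1] if nums else None

print(parse_final_number("Reasoning: 3 * 4 = 12.\nFinal: 12"))  # -> 12
print(parse_final_number("The result is about 7."))             # -> 7
print(parse_final_number("no numbers here"))                    # -> None
```

Returning the answer as a string (rather than converting to float immediately) lets `is_correct` handle both numeric and exact-string comparison later.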
@torch.no_grad()
def generate_paths(question, n, max_new_tokens=64, temperature=0.7, top_p=0.9):
    prompt = make_prompt(question)
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    gen_cfg = GenerationConfig(
        do_sample=True,
        temperature=temperature,
        top_p=top_p,
        max_new_tokens=max_new_tokens,
        pad_token_id=tokenizer.eos_token_id,
        eos_token_id=tokenizer.eos_token_id,
        num_return_sequences=n
    )
    out = model.generate(**inputs, generation_config=gen_cfg)
    prompt_tok = inputs["input_ids"].shape[1]
    paths = []
    for i in range(out.shape[0]):
        seq = out[i]
        gen_ids = seq[prompt_tok:]
        completion = tokenizer.decode(gen_ids, skip_special_tokens=True)
        paths.append({
            "prompt_tokens": int(prompt_tok),
            "gen_tokens": int(gen_ids.shape[0]),
            "completion": completion
        })
    return paths
We implement fast multi-sample generation that produces multiple reasoning paths in a single model call. We slice off the prompt tokens so that each path keeps only its generated continuation. We store token usage and completions in a structured format to support downstream pruning decisions.
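The prompt/continuation split can be illustrated with plain token-id lists as stand-ins for the real tensors (the ids below are made up for illustration):

```python
# model.generate echoes the prompt at the start of every returned
# sequence, so we drop the first prompt_tok ids to isolate each
# continuation and count only the newly generated tokens.
prompt_ids = [101, 2054, 2003, 2459]        # pretend-tokenized prompt
sequences = [
    prompt_ids + [2459, 102],               # path 1: short continuation
    prompt_ids + [1041, 2459, 102],         # path 2: longer continuation
]
prompt_tok = len(prompt_ids)
paths = []
for seq in sequences:
    gen_ids = seq[prompt_tok:]              # drop the echoed prompt tokens
    paths.append({"prompt_tokens": prompt_tok, "gen_tokens": len(gen_ids)})
print(paths)
```

This is why `tokens_total` later charges the prompt once per sampled path plus the generated tokens.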
def consensus_strength(completions, sim_threshold=0.22):
    if len(completions) <= 1:
        return [0.0] * len(completions)
    vec = TfidfVectorizer(ngram_range=(1, 2), max_features=2500)
    X = vec.fit_transform(completions)
    S = cosine_similarity(X)
    G = nx.Graph()
    n = len(completions)
    G.add_nodes_from(range(n))
    for i in range(n):
        for j in range(i + 1, n):
            w = float(S[i, j])
            if w >= sim_threshold:
                G.add_edge(i, j, weight=w)
    strength = [0.0] * n
    for u, v, d in G.edges(data=True):
        w = float(d.get("weight", 0.0))
        strength[u] += w
        strength[v] += w
    return strength
We build a lightweight consensus mechanism by constructing a similarity graph over the generated reasoning paths. We compute pairwise TF-IDF cosine similarities and convert them into a graph-based strength signal for each path. This gives us an approximate measure of agreement between reasoning trajectories without expensive extra model calls.
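To see the idea in isolation, here is a dependency-free toy version that swaps TF-IDF cosine similarity for word-set Jaccard similarity (a simplified stand-in, not the tutorial's actual function), with the same threshold-and-accumulate structure:

```python
# Toy consensus: accumulate similarity weights over pairs that clear a
# threshold, so paths agreeing with many others get high strength.
def jaccard(a, b):
    sa, sb = set(a.split()), set(b.split())
    return len(sa & sb) / len(sa | sb) if (sa | sb) else 0.0

def toy_consensus_strength(completions, sim_threshold=0.2):
    n = len(completions)
    strength = [0.0] * n
    for i in range(n):
        for j in range(i + 1, n):
            w = jaccard(completions[i], completions[j])
            if w >= sim_threshold:      # prune weak edges, as in the graph
                strength[i] += w
                strength[j] += w
    return strength

paths = [
    "compute 3 times 4 final 12",
    "3 times 4 is 12 final 12",
    "the answer is 7",
]
print(toy_consensus_strength(paths))    # two agreeing paths score high, outlier scores 0
```

The two paths that reason toward 12 reinforce each other, while the outlier accumulates no strength.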
def pick_final_answer(paths):
    answers = [parse_final_number(p["completion"]) for p in paths]
    strengths = consensus_strength([p["completion"] for p in paths])
    groups = {}
    for i, a in enumerate(answers):
        if a is None:
            continue
        groups.setdefault(a, {"idx": [], "strength": 0.0, "tokens": 0})
        groups[a]["idx"].append(i)
        groups[a]["strength"] += strengths[i]
        groups[a]["tokens"] += paths[i]["gen_tokens"]
    if not groups:
        return None, {"answers": answers, "strengths": strengths}
    ranked = sorted(
        groups.items(),
        key=lambda kv: (len(kv[1]["idx"]), kv[1]["strength"], -kv[1]["tokens"]),
        reverse=True
    )
    best_answer = ranked[0][0]
    best_indices = ranked[0][1]["idx"]
    best_i = sorted(best_indices, key=lambda i: (paths[i]["gen_tokens"], -strengths[i]))[0]
    return best_answer, {"answers": answers, "strengths": strengths, "best_i": best_i}
def pruned_agent_answer(
    question,
    batch_size=2,
    k_max=10,
    max_new_tokens=64,
    temperature=0.7,
    top_p=0.9,
    stop_min_samples=4,
    stop_ratio=0.67,
    stop_margin=2
):
    paths = []
    prompt_tokens_once = tok_len(make_prompt(question))
    total_gen_tokens = 0
    while len(paths) < k_max:
        n = min(batch_size, k_max - len(paths))
        new_paths = generate_paths(
            question,
            n=n,
            max_new_tokens=max_new_tokens,
            temperature=temperature,
            top_p=top_p
        )
        paths.extend(new_paths)
        total_gen_tokens += sum(p["gen_tokens"] for p in new_paths)
        if len(paths) >= stop_min_samples:
            answers = [parse_final_number(p["completion"]) for p in paths]
            counts = {}
            for a in answers:
                if a is None:
                    continue
                counts[a] = counts.get(a, 0) + 1
            if counts:
                sorted_counts = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
                top_a, top_c = sorted_counts[0]
                second_c = sorted_counts[1][1] if len(sorted_counts) > 1 else 0
                if top_c >= math.ceil(stop_ratio * len(paths)) and (top_c - second_c) >= stop_margin:
                    final, dbg = pick_final_answer(paths)
                    return {
                        "final": final,
                        "paths": paths,
                        "early_stopped_at": len(paths),
                        "tokens_total": int(prompt_tokens_once * len(paths) + total_gen_tokens),
                        "debug": dbg
                    }
    final, dbg = pick_final_answer(paths)
    return {
        "final": final,
        "paths": paths,
        "early_stopped_at": None,
        "tokens_total": int(prompt_tokens_once * len(paths) + total_gen_tokens),
        "debug": dbg
    }
We implement the core agentic pruning logic, which groups reasoning paths by their final answers and ranks them using consensus and efficiency signals. We sample progressively, with an early-stopping rule that terminates generation once sufficient consensus emerges. We then choose a final answer that balances agreement strength against minimal token usage.
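The early-stopping rule can be checked in isolation on precomputed answer lists (no model needed): stop once the top answer holds at least `stop_ratio` of the samples and leads the runner-up by at least `stop_margin` votes.

```python
import math
from collections import Counter

# Standalone sketch of the stopping rule used inside pruned_agent_answer.
def should_stop(answers, stop_ratio=0.67, stop_margin=2, stop_min_samples=4):
    if len(answers) < stop_min_samples:
        return False
    counts = Counter(a for a in answers if a is not None)
    if not counts:
        return False
    ranked = counts.most_common()
    top_c = ranked[0][1]
    second_c = ranked[1][1] if len(ranked) > 1 else 0
    # Require both a supermajority and a clear lead over the runner-up.
    return top_c >= math.ceil(stop_ratio * len(answers)) and (top_c - second_c) >= stop_margin

print(should_stop(["12", "12", "12", "7"]))  # 3 >= ceil(0.67*4)=3, margin 2 -> True
print(should_stop(["12", "7", "12", "7"]))   # tied, margin 0 -> False
```

Requiring both conditions prevents stopping on a narrow plurality when answers are split.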
def baseline_answer(question, k=10, max_new_tokens=64):
    paths = generate_paths(question, n=k, max_new_tokens=max_new_tokens)
    prompt_tokens_once = tok_len(make_prompt(question))
    total_gen_tokens = sum(p["gen_tokens"] for p in paths)
    answers = [parse_final_number(p["completion"]) for p in paths]
    counts = {}
    for a in answers:
        if a is None:
            continue
        counts[a] = counts.get(a, 0) + 1
    final = max(counts.items(), key=lambda kv: kv[1])[0] if counts else None
    return {
        "final": final,
        "paths": paths,
        "tokens_total": int(prompt_tokens_once * k + total_gen_tokens)
    }
DATA = [
    {"q": "If a store sells 3 notebooks for $12, how much does 1 notebook cost?", "a": "4"},
    {"q": "What is 17*6?", "a": "102"},
    {"q": "A rectangle has length 9 and width 4. What is its area?", "a": "36"},
    {"q": "If you buy 5 apples at $2 each, how much do you pay?", "a": "10"},
    {"q": "What is 144 divided by 12?", "a": "12"},
    {"q": "If x=8, what is 3x+5?", "a": "29"},
    {"q": "A jar has 30 candies. You eat 7. How many remain?", "a": "23"},
    {"q": "If a train travels 60 km in 1.5 hours, what is its average speed (km/h)?", "a": "40"},
    {"q": "Compute: (25 - 9) * 3", "a": "48"},
    {"q": "What is the next number in the pattern: 2, 4, 8, 16, ?", "a": "32"},
]
base_acc, base_tok = [], []
prun_acc, prun_tok = [], []
for item in DATA:
    b = baseline_answer(item["q"], k=8, max_new_tokens=56)
    base_acc.append(is_correct(b["final"], item["a"]))
    base_tok.append(b["tokens_total"])
    p = pruned_agent_answer(item["q"], max_new_tokens=56)
    prun_acc.append(is_correct(p["final"], item["a"]))
    prun_tok.append(p["tokens_total"])
print("Baseline accuracy:", float(np.mean(base_acc)))
print("Baseline avg tokens:", float(np.mean(base_tok)))
print("Pruned accuracy:", float(np.mean(prun_acc)))
print("Pruned avg tokens:", float(np.mean(prun_tok)))
We compare the pruned agentic approach against a fixed-budget self-consistency baseline. We evaluate both methods on accuracy and token consumption to quantify the efficiency gains from pruning, and we report overall metrics showing how dynamic pruning preserves correctness while reducing reasoning cost.
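The efficiency gain can be summarized as relative token savings; the numbers below are illustrative placeholders, not measured results:

```python
# Relative token savings of the pruned run versus the fixed-k baseline:
# 1 - (pruned tokens / baseline tokens), aggregated over the dataset.
def token_savings(baseline_tokens, pruned_tokens):
    return 1.0 - sum(pruned_tokens) / sum(baseline_tokens)

baseline_tokens = [900, 880, 910]   # hypothetical per-question totals
pruned_tokens = [520, 600, 480]     # hypothetical pruned totals
print(f"token savings: {token_savings(baseline_tokens, pruned_tokens):.1%}")
```

A positive value means pruning spent fewer tokens overall; accuracy must be compared separately, since savings alone say nothing about correctness.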
In conclusion, we demonstrated that agentic pruning can significantly reduce token consumption without sacrificing accuracy by stopping generation once sufficient consensus has emerged. We showed that the combination of self-consistency, similarity-based consensus graphs, and early-stopping heuristics provides a practical, scalable approach to reasoning efficiency in agentic systems. This framework serves as a foundation for more advanced agentic behaviors, such as mid-generation pruning, budget-aware reasoning, and adaptive control over reasoning depth in real-world AI agents.
