In this tutorial, we implement an end-to-end Practical Byzantine Fault Tolerance (PBFT) simulator using asyncio. We model a realistic distributed network with asynchronous message passing, configurable delays, and Byzantine nodes that intentionally deviate from the protocol. By explicitly implementing the pre-prep, prepare, and commit steps, we explore how PBFT achieves consensus under adversarial conditions while respecting the theoretical 3f+1 bound. We also use the system to measure consensus latency and success rate as the number of malicious nodes increases, allowing us to empirically observe the limits of Byzantine fault tolerance.
import asyncio
import random
import time
import hashlib
from dataclasses import dataclass, field
from typing import Dict, Set, Tuple, Optional, List
import matplotlib.pyplot as plt
PREPREPARE = "PREPREPARE"
PREPARE = "PREPARE"
COMMIT = "COMMIT"
@dataclass(frozen=True)
class Msg:
typ: str
view: int
seq: int
digest: str
sender: int
@dataclass
class NetConfig:
min_delay_ms: int = 5
max_delay_ms: int = 40
drop_prob: float = 0.0
reorder_prob: float = 0.0
We set up the foundation of the simulator by importing the necessary libraries and defining the main PBFT message types. We formalize network messages and parameters using dataclasses to ensure structured, consistent communication. We also define constants representing the three PBFT stages used throughout the system.
class Network:
def __init__(self, cfg: NetConfig):
self.cfg = cfg
self.nodes: Dict(int, "Node") = {}
def register(self, node: "Node"):
self.nodes(node.nid) = node
async def send(self, dst: int, msg: Msg):
if random.random() < self.cfg.drop_prob:
return
d = random.uniform(self.cfg.min_delay_ms, self.cfg.max_delay_ms) / 1000.0
await asyncio.sleep(d)
if random.random() < self.cfg.reorder_prob:
await asyncio.sleep(random.uniform(0.0, 0.02))
await self.nodes(dst).inbox.put(msg)
async def broadcast(self, src: int, msg: Msg):
tasks = ()
for nid in self.nodes.keys():
tasks.append(asyncio.create_task(self.send(nid, msg)))
await asyncio.gather(*tasks)
We implement an asynchronous network layer that simulates real-world message delivery with delays, reordering, and potential degradation. We dynamically register nodes and use asyncio tasks to broadcast messages over the simulated network. We model non-deterministic communication behavior that directly affects the latency and robustness of consensus.
@dataclass
class NodeConfig:
n: int
f: int
primary_id: int = 0
view: int = 0
timeout_s: float = 2.0
class Node:
def __init__(self, nid: int, net: Network, cfg: NodeConfig, byzantine: bool = False):
self.nid = nid
self.net = net
self.cfg = cfg
self.byzantine = byzantine
self.inbox: asyncio.Queue(Msg) = asyncio.Queue()
self.preprepare_seen: Dict(int, str) = {}
self.prepare_votes: Dict(Tuple(int, str), Set(int)) = {}
self.commit_votes: Dict(Tuple(int, str), Set(int)) = {}
self.committed: Dict(int, str) = {}
self.running = True
@property
def f(self) -> int:
return self.cfg.f
def _q_prepare(self) -> int:
return 2 * self.f + 1
def _q_commit(self) -> int:
return 2 * self.f + 1
@staticmethod
def digest_of(payload: str) -> str:
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
We define the configuration and internal state of each PBFT node participating in the protocol. We introduce data structures to track the pre-emption, preparation, and commitment of votes, supporting both honest and Byzantine behavior. We also implement quorum threshold logic and deterministic digest generation for request validation.
async def propose(self, payload: str, seq: int):
if self.nid != self.cfg.primary_id:
raise ValueError("Only the primary can propose in this simplified simulator.")
if not self.byzantine:
dig = self.digest_of(payload)
msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
await self.net.broadcast(self.nid, msg)
return
for dst in self.net.nodes.keys():
variant = f"{payload}::to={dst}::salt={random.randint(0,10**9)}"
dig = self.digest_of(variant)
msg = Msg(PREPREPARE, self.cfg.view, seq, dig, self.nid)
await self.net.send(dst, msg)
async def handle_preprepare(self, msg: Msg):
seq = msg.seq
dig = msg.digest
if self.byzantine:
if random.random() < 0.5:
return
fake_dig = dig if random.random() < 0.5 else self.digest_of(dig + "::fake")
out = Msg(PREPARE, msg.view, seq, fake_dig, self.nid)
await self.net.broadcast(self.nid, out)
return
if seq not in self.preprepare_seen:
self.preprepare_seen(seq) = dig
out = Msg(PREPARE, msg.view, seq, dig, self.nid)
await self.net.broadcast(self.nid, out)
async def handle_prepare(self, msg: Msg):
seq, dig = msg.seq, msg.digest
key = (seq, dig)
voters = self.prepare_votes.setdefault(key, set())
voters.add(msg.sender)
if self.byzantine:
return
if self.preprepare_seen.get(seq) != dig:
return
if len(voters) >= self._q_prepare():
out = Msg(COMMIT, msg.view, seq, dig, self.nid)
await self.net.broadcast(self.nid, out)
async def handle_commit(self, msg: Msg):
seq, dig = msg.seq, msg.digest
key = (seq, dig)
voters = self.commit_votes.setdefault(key, set())
voters.add(msg.sender)
if self.byzantine:
return
if self.preprepare_seen.get(seq) != dig:
return
if seq in self.committed:
return
if len(voters) >= self._q_commit():
self.committed(seq) = dig
We implement the core PBFT protocol logic, including proposal management and pre-preparation and formulation stages. We explicitly model the Byzantine equation by allowing malicious nodes to send conflicting digests to different peers. Once the required preparation quorum is met we advance the protocol to the commit phase.
async def run(self):
while self.running:
msg = await self.inbox.get()
if msg.typ == PREPREPARE:
await self.handle_preprepare(msg)
elif msg.typ == PREPARE:
await self.handle_prepare(msg)
elif msg.typ == COMMIT:
await self.handle_commit(msg)
def stop(self):
self.running = False
def pbft_params(n: int) -> int:
return (n - 1) // 3
async def run_single_consensus(
n: int,
malicious: int,
net_cfg: NetConfig,
payload: str = "tx: pay Alice->Bob 5",
seq: int = 1,
timeout_s: float = 2.0,
seed: Optional(int) = None
) -> Dict(str, object):
if seed is not None:
random.seed(seed)
f_max = pbft_params(n)
f = f_max
net = Network(net_cfg)
cfg = NodeConfig(n=n, f=f, primary_id=0, view=0, timeout_s=timeout_s)
mal_set = set(random.sample(range(n), k=min(malicious, n)))
nodes: List(Node) = ()
for i in range(n):
node = Node(i, net, cfg, byzantine=(i in mal_set))
net.register(node)
nodes.append(node)
tasks = (asyncio.create_task(node.run()) for node in nodes)
t0 = time.perf_counter()
await nodes(cfg.primary_id).propose(payload, seq)
honest = (node for node in nodes if not node.byzantine)
target = max(1, len(honest))
committed_honest = 0
latency = None
async def poll_commits():
nonlocal committed_honest, latency
while True:
committed_honest = sum(1 for node in honest if seq in node.committed)
if committed_honest >= target:
latency = time.perf_counter() - t0
return
await asyncio.sleep(0.005)
try:
await asyncio.wait_for(poll_commits(), timeout=timeout_s)
success = True
except asyncio.TimeoutError:
success = False
latency = None
for node in nodes:
node.stop()
for task in tasks:
task.cancel()
await asyncio.gather(*tasks, return_exceptions=True)
digest_set = set(node.committed.get(seq) for node in honest if seq in node.committed)
agreed = (len(digest_set) == 1) if success else False
return {
"n": n,
"f": f,
"malicious": malicious,
"mal_set": mal_set,
"success": success,
"latency_s": latency,
"honest_committed": committed_honest,
"honest_total": len(honest),
"agreed_digest": agreed,
}
We complete the PBFT state machine by processing commit messages and finalizing decisions after the commit quorum is satisfied. We run a node event loop to continuously process incoming messages asynchronously. We also include lifecycle controls to safely stop nodes after running each experiment.
async def latency_sweep(
n: int = 10,
max_malicious: Optional(int) = None,
trials_per_point: int = 5,
timeout_s: float = 2.0,
net_cfg: Optional(NetConfig) = None,
seed: int = 7
):
if net_cfg is None:
net_cfg = NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05)
if max_malicious is None:
max_malicious = n
results = ()
random.seed(seed)
for m in range(0, max_malicious + 1):
latencies = ()
successes = 0
agreements = 0
for t in range(trials_per_point):
out = await run_single_consensus(
n=n,
malicious=m,
net_cfg=net_cfg,
timeout_s=timeout_s,
seed=seed + 1000*m + t
)
results.append(out)
if out("success"):
successes += 1
latencies.append(out("latency_s"))
if out("agreed_digest"):
agreements += 1
avg_lat = sum(latencies)/len(latencies) if latencies else None
print(
f"malicious={m:2d} | success={successes}/{trials_per_point} "
f"| avg_latency={avg_lat if avg_lat is not None else 'NA'} "
f"| digest_agreement={agreements}/{successes if successes else 1}"
)
return results
def plot_latency(results: List(Dict(str, object)), trials_per_point: int):
by_m = {}
for r in results:
m = r("malicious")
by_m.setdefault(m, ()).append(r)
xs, ys = (), ()
success_rate = ()
for m in sorted(by_m.keys()):
group = by_m(m)
lats = (g("latency_s") for g in group if g("latency_s") is not None)
succ = sum(1 for g in group if g("success"))
xs.append(m)
ys.append(sum(lats)/len(lats) if lats else float("nan"))
success_rate.append(succ / len(group))
plt.figure()
plt.plot(xs, ys, marker="o")
plt.xlabel("Number of malicious (Byzantine) nodes")
plt.ylabel("Consensus latency (seconds) — avg over successes")
plt.title("PBFT Simulator: Latency vs Malicious Nodes")
plt.grid(True)
plt.show()
plt.figure()
plt.plot(xs, success_rate, marker="o")
plt.xlabel("Number of malicious (Byzantine) nodes")
plt.ylabel("Success rate")
plt.title("PBFT Simulator: Success Rate vs Malicious Nodes")
plt.ylim(-0.05, 1.05)
plt.grid(True)
plt.show()
async def main():
n = 10
trials = 6
f = pbft_params(n)
print(f"n={n} => PBFT theoretical max f = floor((n-1)/3) = {f}")
print("Theory: safety/liveness typically assumed when malicious <= f and timing assumptions hold.n")
results = await latency_sweep(
n=n,
max_malicious=min(n, f + 6),
trials_per_point=trials,
timeout_s=2.0,
net_cfg=NetConfig(min_delay_ms=5, max_delay_ms=35, drop_prob=0.0, reorder_prob=0.05),
seed=11
)
plot_latency(results, trials)
await main()
We conduct large-scale experiments by traversing different numbers of malicious nodes and collecting latency statistics. We aggregate the results to analyze the consensus success rate and visualize the system behavior using plots. We run the entire experiment pipeline and see how the PBFT degrades when the number of Byzantine faults exceeds the theoretical limit.
In conclusion, we gained practical insight into how the PBFT behaves beyond textbook guarantees and how aversive pressure affects both latency and liveliness in behavior. We looked at how quorum limits enforce security, why consensus breaks when Byzantine nodes exceed tolerable limits, and how asynchronous networks amplify these effects. This implementation provides a practical foundation for experimenting with more advanced distributed-system concepts, such as scene change, leader rotation, or authenticated messaging. This helps us build intuition for the design trade-offs that underlie modern blockchains and distributed trust systems.
check it out full code here. Also, feel free to follow us Twitter And don’t forget to join us 120k+ ml subreddit and subscribe our newsletter. wait! Are you on Telegram? Now you can also connect with us on Telegram.