In this tutorial, we explore how federated learning behaves when the traditional centralized aggregation server is removed and replaced with a fully decentralized, peer-to-peer gossip mechanism. We implement both centralized FedAvg and decentralized gossip federated learning from scratch, and we introduce client-side differential privacy by injecting calibrated noise into local model updates. By running controlled experiments on non-IID MNIST data, we investigate how the privacy strength, as measured by different epsilon values, directly affects convergence speed, stability, and final model accuracy. Along the way, we study the practical trade-off between privacy guarantees and learning efficiency in decentralized learning systems. Check out the full code here.
import os, math, random, time
from dataclasses import dataclass
from typing import Dict, List, Tuple
import subprocess, sys
def pip_install(pkgs):
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", *pkgs])

pip_install(["torch", "torchvision", "numpy", "matplotlib", "networkx", "tqdm"])
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import networkx as nx
from tqdm import trange
SEED = 7
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = False
torch.backends.cudnn.benchmark = True
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
transform = transforms.Compose([transforms.ToTensor()])
train_ds = datasets.MNIST(root="/content/data", train=True, download=True, transform=transform)
test_ds = datasets.MNIST(root="/content/data", train=False, download=True, transform=transform)
We set up the execution environment and install all the necessary dependencies. We initialize random seeds and the device setting to maintain reproducibility across experiments, and we load the MNIST dataset, which serves as a lightweight but effective benchmark for federated learning.
def make_noniid_clients(dataset, num_clients=20, shards_per_client=2, seed=SEED):
    rng = np.random.default_rng(seed)
    y = np.array([dataset[i][1] for i in range(len(dataset))])
    idx = np.arange(len(dataset))
    idx_sorted = idx[np.argsort(y)]
    num_shards = num_clients * shards_per_client
    shard_size = len(dataset) // num_shards
    shards = [idx_sorted[i*shard_size:(i+1)*shard_size] for i in range(num_shards)]
    rng.shuffle(shards)
    client_indices = []
    for c in range(num_clients):
        take = shards[c*shards_per_client:(c+1)*shards_per_client]
        client_indices.append(np.concatenate(take))
    return client_indices
NUM_CLIENTS = 20
client_indices = make_noniid_clients(train_ds, num_clients=NUM_CLIENTS, shards_per_client=2)
test_loader = DataLoader(test_ds, batch_size=1024, shuffle=False, num_workers=2, pin_memory=True)
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(28*28, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 10)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
We construct a non-IID data distribution by dividing the training set into label-sorted shards and assigning a few shards to each client. We also define a compact neural network that balances expressivity and computational cost. This lets us realistically simulate data heterogeneity, a central challenge in federated learning systems.
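To see why this sharding scheme produces heterogeneity, the same logic can be run on toy labels. The sketch below assumes 10 classes with 100 samples each (stand-ins for MNIST targets) and 5 hypothetical clients; because shards are cut from label-sorted indices, each client ends up seeing only a couple of labels.

```python
import numpy as np

rng = np.random.default_rng(0)
y = np.repeat(np.arange(10), 100)          # toy labels: 10 classes, 100 samples each
idx_sorted = np.argsort(y, kind="stable")  # sort indices by label, as above
num_clients, shards_per_client = 5, 2
shard_size = len(y) // (num_clients * shards_per_client)
shards = [idx_sorted[i * shard_size:(i + 1) * shard_size]
          for i in range(num_clients * shards_per_client)]
order = rng.permutation(len(shards))       # shuffle shard order before assignment
shards = [shards[i] for i in order]
client_labels = {}
for c in range(num_clients):
    take = np.concatenate(shards[c * shards_per_client:(c + 1) * shards_per_client])
    client_labels[c] = sorted(set(y[take].tolist()))
print(client_labels)  # each client sees at most 2 of the 10 labels
```

With shards aligned to class boundaries, every client holds at most `shards_per_client` distinct labels, which is exactly the non-IID regime the tutorial studies.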
def get_model_params(model):
    return {k: v.detach().clone() for k, v in model.state_dict().items()}

def set_model_params(model, params):
    model.load_state_dict(params, strict=True)

def add_params(a, b):
    return {k: a[k] + b[k] for k in a.keys()}

def sub_params(a, b):
    return {k: a[k] - b[k] for k in a.keys()}

def scale_params(a, s):
    return {k: a[k] * s for k in a.keys()}

def mean_params(params_list):
    out = {k: torch.zeros_like(params_list[0][k]) for k in params_list[0].keys()}
    for p in params_list:
        for k in out.keys():
            out[k] += p[k]
    for k in out.keys():
        out[k] /= len(params_list)
    return out

def l2_norm_params(delta):
    sq = 0.0
    for v in delta.values():
        sq += float(torch.sum(v.float() * v.float()).item())
    return math.sqrt(sq)

def dp_sanitize_update(delta, clip_norm, epsilon, delta_dp, rng):
    norm = l2_norm_params(delta)
    scale = min(1.0, clip_norm / (norm + 1e-12))
    clipped = scale_params(delta, scale)
    if epsilon is None or math.isinf(epsilon) or epsilon <= 0:
        return clipped
    sigma = clip_norm * math.sqrt(2.0 * math.log(1.25 / delta_dp)) / epsilon
    noised = {}
    for k, v in clipped.items():
        noise = torch.randn(v.shape, generator=rng, device=v.device, dtype=v.dtype) * sigma
        noised[k] = v + noise
    return noised
We implement parameter-manipulation utilities for adding, subtracting, scaling, and averaging model weights across clients. We introduce differential privacy by clipping each local update and injecting Gaussian noise, with the noise scale determined by the clipping norm and the chosen privacy budget. This serves as the core privacy mechanism for studying the privacy-utility trade-off in both centralized and decentralized settings.
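The noise scale in `dp_sanitize_update` follows the classic Gaussian-mechanism calibration, sigma = C * sqrt(2 ln(1.25/delta)) / epsilon, where C is the clipping norm. A quick sketch shows how steeply sigma grows as epsilon tightens, using the tutorial's defaults (C = 2.0, delta = 1e-5):

```python
import math

def gaussian_sigma(clip_norm, epsilon, delta_dp):
    # Gaussian-mechanism noise scale: sigma = C * sqrt(2 * ln(1.25 / delta)) / epsilon
    return clip_norm * math.sqrt(2.0 * math.log(1.25 / delta_dp)) / epsilon

for eps in [8.0, 4.0, 2.0, 1.0]:
    print(f"eps={eps}: sigma={gaussian_sigma(2.0, eps, 1e-5):.2f}")
```

Halving epsilon doubles sigma, so the epsilon sweep later in the tutorial is effectively a sweep over per-coordinate noise magnitude.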
def local_train_one_client(base_params, client_id, epochs, lr, batch_size, weight_decay=0.0):
    model = MLP().to(device)
    set_model_params(model, base_params)
    model.train()
    idxs = client_indices[client_id]
    loader = DataLoader(
        Subset(train_ds, idxs.tolist() if hasattr(idxs, "tolist") else idxs),
        batch_size=batch_size,
        shuffle=True,
        num_workers=2,
        pin_memory=True,
    )
    opt = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay)
    for _ in range(epochs):
        for xb, yb in loader:
            xb, yb = xb.to(device), yb.to(device)
            opt.zero_grad(set_to_none=True)
            logits = model(xb)
            loss = F.cross_entropy(logits, yb)
            loss.backward()
            opt.step()
    return get_model_params(model)

@torch.no_grad()
def evaluate(params):
    model = MLP().to(device)
    set_model_params(model, params)
    model.eval()
    total, correct = 0, 0
    loss_sum = 0.0
    for xb, yb in test_loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        loss = F.cross_entropy(logits, yb, reduction="sum")
        loss_sum += float(loss.item())
        pred = torch.argmax(logits, dim=1)
        correct += int((pred == yb).sum().item())
        total += int(yb.numel())
    return loss_sum / total, correct / total
We define the local training loop that each client executes independently on its own private data, along with an evaluation routine that measures test loss and accuracy for any set of model parameters. Together, these functions simulate realistic federated behavior in which training never leaves the data owner.
@dataclass
class FedAvgConfig:
    rounds: int = 25
    clients_per_round: int = 10
    local_epochs: int = 1
    lr: float = 0.06
    batch_size: int = 64
    clip_norm: float = 2.0
    epsilon: float = math.inf
    delta_dp: float = 1e-5

def run_fedavg(cfg):
    global_params = get_model_params(MLP().to(device))
    history = {"test_loss": [], "test_acc": []}
    for r in trange(cfg.rounds):
        chosen = random.sample(range(NUM_CLIENTS), k=cfg.clients_per_round)
        start_params = global_params
        updates = []
        for cid in chosen:
            local_params = local_train_one_client(start_params, cid, cfg.local_epochs, cfg.lr, cfg.batch_size)
            delta = sub_params(local_params, start_params)
            rng = torch.Generator(device=device)
            rng.manual_seed(SEED * 10000 + r * 100 + cid)
            delta_dp = dp_sanitize_update(delta, cfg.clip_norm, cfg.epsilon, cfg.delta_dp, rng)
            updates.append(delta_dp)
        avg_update = mean_params(updates)
        global_params = add_params(start_params, avg_update)
        tl, ta = evaluate(global_params)
        history["test_loss"].append(tl)
        history["test_acc"].append(ta)
    return history, global_params
We implement the centralized FedAvg algorithm, in which a sampled subset of clients trains locally and sends sanitized updates to a central aggregator. We track model performance across communication rounds to observe convergence under different privacy budgets. This serves as the baseline against which decentralized gossip learning is compared.
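One reason the central aggregator tolerates DP noise well is that averaging K independent noisy updates shrinks the noise standard deviation by roughly sqrt(K). A minimal numpy sketch, with assumed values (K = 10 clients per round, a 1000-dimensional update, sigma ≈ 9.7 as for epsilon = 1 above, and a zero true update so only noise remains):

```python
import numpy as np

rng = np.random.default_rng(0)
K, d, sigma = 10, 1000, 9.7                  # assumed: clients/round, dims, DP noise std
noisy_updates = [rng.normal(0.0, sigma, d) for _ in range(K)]  # true update = 0
avg_update = np.mean(noisy_updates, axis=0)  # server-side FedAvg aggregation
print(f"per-client noise std ~ {np.std(noisy_updates[0]):.2f}, "
      f"averaged noise std ~ {np.std(avg_update):.2f}")  # shrinks ~ 1/sqrt(K)
```

In the gossip setting, no single step averages over all K participants at once, which is one intuition for why noise hurts it differently.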
@dataclass
class GossipConfig:
    rounds: int = 25
    local_epochs: int = 1
    lr: float = 0.06
    batch_size: int = 64
    clip_norm: float = 2.0
    epsilon: float = math.inf
    delta_dp: float = 1e-5
    topology: str = "ring"
    p: float = 0.2
    gossip_pairs_per_round: int = 10

def build_topology(cfg):
    if cfg.topology == "ring":
        G = nx.cycle_graph(NUM_CLIENTS)
    elif cfg.topology == "erdos_renyi":
        G = nx.erdos_renyi_graph(NUM_CLIENTS, cfg.p, seed=SEED)
        if not nx.is_connected(G):
            comps = list(nx.connected_components(G))
            for i in range(len(comps) - 1):
                a = next(iter(comps[i]))
                b = next(iter(comps[i + 1]))
                G.add_edge(a, b)
    else:
        raise ValueError(f"unknown topology: {cfg.topology}")
    return G
def run_gossip(cfg):
    node_params = [get_model_params(MLP().to(device)) for _ in range(NUM_CLIENTS)]
    G = build_topology(cfg)
    history = {"avg_test_loss": [], "avg_test_acc": []}
    for r in trange(cfg.rounds):
        new_params = []
        for cid in range(NUM_CLIENTS):
            p0 = node_params[cid]
            p_local = local_train_one_client(p0, cid, cfg.local_epochs, cfg.lr, cfg.batch_size)
            delta = sub_params(p_local, p0)
            rng = torch.Generator(device=device)
            rng.manual_seed(SEED * 10000 + r * 100 + cid)
            delta_dp = dp_sanitize_update(delta, cfg.clip_norm, cfg.epsilon, cfg.delta_dp, rng)
            p_local_dp = add_params(p0, delta_dp)
            new_params.append(p_local_dp)
        node_params = new_params
        edges = list(G.edges())
        for _ in range(cfg.gossip_pairs_per_round):
            i, j = random.choice(edges)
            avg = mean_params([node_params[i], node_params[j]])
            node_params[i] = avg
            node_params[j] = avg
        losses, accs = [], []
        for cid in range(NUM_CLIENTS):
            tl, ta = evaluate(node_params[cid])
            losses.append(tl)
            accs.append(ta)
        history["avg_test_loss"].append(float(np.mean(losses)))
        history["avg_test_acc"].append(float(np.mean(accs)))
    return history, node_params
We implement decentralized gossip federated learning, in which peers exchange and average parameters over a predefined network topology. We repeatedly alternate local training with pairwise parameter averaging, without any central server. This lets us analyze how privacy noise propagates through decentralized communication patterns and affects convergence.
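The pairwise averaging step drives the nodes toward consensus while preserving the network-wide mean. A minimal sketch of that dynamic, using one scalar "model" per node and the same ring topology and random-edge gossip as `run_gossip`:

```python
import numpy as np

rng = np.random.default_rng(0)
N = 20
x = rng.normal(size=N)                        # one scalar "model" per node
edges = [(i, (i + 1) % N) for i in range(N)]  # ring topology
mean0, spread0 = x.mean(), x.max() - x.min()
for _ in range(400):                          # random pairwise gossip averaging
    i, j = edges[rng.integers(len(edges))]
    x[i] = x[j] = 0.5 * (x[i] + x[j])
print(f"spread {spread0:.3f} -> {x.max() - x.min():.3f}; "
      f"mean preserved: {np.isclose(x.mean(), mean0)}")
```

Each exchange replaces two values with their average, so the sum is invariant and the spread can only shrink; on a ring the shrinking is slow, which is the "delayed information mixing" the conclusion refers to.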
eps_sweep = [math.inf, 8.0, 4.0, 2.0, 1.0]
ROUNDS = 20
fedavg_results = {}
gossip_results = {}
common_local_epochs = 1
common_lr = 0.06
common_bs = 64
common_clip = 2.0
common_delta = 1e-5
for eps in eps_sweep:
    fcfg = FedAvgConfig(
        rounds=ROUNDS,
        clients_per_round=10,
        local_epochs=common_local_epochs,
        lr=common_lr,
        batch_size=common_bs,
        clip_norm=common_clip,
        epsilon=eps,
        delta_dp=common_delta
    )
    hist_f, _ = run_fedavg(fcfg)
    fedavg_results[eps] = hist_f
    gcfg = GossipConfig(
        rounds=ROUNDS,
        local_epochs=common_local_epochs,
        lr=common_lr,
        batch_size=common_bs,
        clip_norm=common_clip,
        epsilon=eps,
        delta_dp=common_delta,
        topology="ring",
        gossip_pairs_per_round=10
    )
    hist_g, _ = run_gossip(gcfg)
    gossip_results[eps] = hist_g
plt.figure(figsize=(10, 5))
for eps in eps_sweep:
    plt.plot(fedavg_results[eps]["test_acc"], label=f"FedAvg eps={eps}")
plt.xlabel("Round")
plt.ylabel("Accuracy")
plt.legend()
plt.grid(True)
plt.show()
plt.figure(figsize=(10, 5))
for eps in eps_sweep:
    plt.plot(gossip_results[eps]["avg_test_acc"], label=f"Gossip eps={eps}")
plt.xlabel("Round")
plt.ylabel("Avg Accuracy")
plt.legend()
plt.grid(True)
plt.show()
final_fed = [fedavg_results[eps]["test_acc"][-1] for eps in eps_sweep]
final_gos = [gossip_results[eps]["avg_test_acc"][-1] for eps in eps_sweep]
x = [100.0 if math.isinf(eps) else eps for eps in eps_sweep]
plt.figure(figsize=(8, 5))
plt.plot(x, final_fed, marker="o", label="FedAvg")
plt.plot(x, final_gos, marker="o", label="Gossip")
plt.xlabel("Epsilon")
plt.ylabel("Final Accuracy")
plt.legend()
plt.grid(True)
plt.show()
def rounds_to_threshold(acc_curve, threshold):
    for i, a in enumerate(acc_curve):
        if a >= threshold:
            return i + 1
    return None

best_f = fedavg_results[math.inf]["test_acc"][-1]
best_g = gossip_results[math.inf]["avg_test_acc"][-1]
th_f = 0.9 * best_f
th_g = 0.9 * best_g
for eps in eps_sweep:
    rf = rounds_to_threshold(fedavg_results[eps]["test_acc"], th_f)
    rg = rounds_to_threshold(gossip_results[eps]["avg_test_acc"], th_g)
    print(eps, rf, rg)
We run controlled experiments at multiple privacy levels and collect results for both centralized and decentralized training. We visualize convergence trends and final accuracy to highlight the privacy-utility trade-off, and we compute a rounds-to-threshold metric to quantitatively compare how the two aggregation schemes react to tightening privacy constraints.
In conclusion, we demonstrated that decentralization fundamentally changes how differential-privacy noise propagates through a federated system. While centralized FedAvg generally converges faster under weak privacy constraints, gossip-based federated learning is more robust to noisy updates at the cost of slower convergence. Our experiments show that strong privacy guarantees significantly slow learning in both settings, and that the effect is amplified in decentralized topologies because information mixes more slowly. Overall, designing privacy-preserving federated systems requires reasoning jointly about aggregation topology, communication patterns, and privacy budget rather than treating them as independent choices.
