LLM Inference: Data Parallelism, Model Parallelism, and Pipeline Parallelism


Author(s): Tushar Vatsa

Originally published on Towards AI.

Credit: www.veracity.com

In the previous post, we explored how KV cache optimization impacts inference performance. Using the Phi-2 model as an example, we observed that increasing sequence length led to an approximately linear decline in tokens-per-second throughput.

In this post, we’ll explore how data parallelism, model parallelism, and pipeline parallelism work within an inference engine, examining their effects on memory usage, throughput, and overall performance trade-offs.

Data Parallel

In data parallelism, we distribute the dataset across multiple devices while keeping a complete copy of the model on each device. This approach works well when the model fits comfortably in the memory of a single device. Because each device processes a different batch in parallel, throughput can ideally scale with the number of devices available. However, this strategy becomes impractical for very large models that cannot fit entirely on a single GPU.

Figure 1: Data parallel diagram (Credit: eraser.io)

When working with multiple devices, the dataset needs to be divided equally among them. For example, if we have 100 data points and 2 GPUs, an ideal strategy is to split the data 50/50. To ensure randomness and avoid bias, we usually shuffle the indices first and then divide them evenly across the GPUs. Let’s see how to do this.

#Dataset file : dataset.py
import torch
from random import Random
import torch.distributed as dist
from torch.utils.data import DataLoader

class Partition(): #Map-style dataset (implements __len__ and __getitem__, which is all DataLoader needs)
    def __init__(self, data, index):
        self.data = data    #Entire dataset (list)
        self.index = index  #Indices (list): different for each device

    def __len__(self):
        return len(self.index)  #A partition is one device's chunk of the data, not the entire dataset

    def __getitem__(self, index):
        data_idx = self.index[index]  #Map the local position to a global dataset index
        return self.data[data_idx]

class DataPartitioner():
    def __init__(self, data, sizes=(0.5, 0.5), seed=1234):
        self.data = data
        self.partitions = []  #One list of indices per device
        rng = Random()
        rng.seed(seed)  #Every rank uses the same seed, so every rank computes the same shuffle
        #Get the length of the entire dataset
        data_len = len(data)
        indices = list(range(data_len))
        #Shuffle the indices
        rng.shuffle(indices)
        #Partition the indices for the devices (consecutive, non-overlapping slices)
        start_idx = 0
        for size in sizes:
            part_len = int(size * data_len)
            self.partitions.append(indices[start_idx:start_idx + part_len])
            start_idx += part_len

    def use(self, partition):
        return Partition(self.data, self.partitions[partition])  #Dataset, list of indices

def partition_dataset(rank, world_size, dataset, batch_size=128, collate_fn=None):
    partitioned_batch_size = batch_size // world_size  #The global batch is split across devices
    sizes = [1 / world_size for _ in range(world_size)]  #world_size: the number of devices
    partitioner = DataPartitioner(dataset, sizes=sizes)
    partition = partitioner.use(rank)  #This device's shard
    #Wrap it in a DataLoader
    dataloader = DataLoader(
        partition,
        batch_size=partitioned_batch_size,
        collate_fn=collate_fn
    )
    return dataloader

Pretty intuitive, right?

If you are wondering about world_size and rank, here is a simple explanation:

  • world_size refers to the total number of devices (for example, GPUs) participating in the training process.
  • rank identifies the specific device among them.

For example, if you have 4 GPUs, world_size is set to 4, and rank can take values from 0 to 3, representing each GPU.
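To see the partitioning logic without any GPUs, here is a pure-Python sketch of the same shuffle-then-split idea that DataPartitioner uses. The helper name `shard_indices` is hypothetical. The point it illustrates is that every rank uses the same seed, so all ranks compute the same shuffled order and each one takes a disjoint slice of it.

```python
from random import Random
from typing import List


def shard_indices(data_len: int, world_size: int, rank: int, seed: int = 1234) -> List[int]:
    """Return the dataset indices assigned to `rank` (hypothetical helper)."""
    indices = list(range(data_len))
    # Same seed on every rank -> identical shuffle on every rank.
    Random(seed).shuffle(indices)
    part_len = data_len // world_size  # equal-sized shards; any remainder is dropped
    start = rank * part_len
    return indices[start:start + part_len]


if __name__ == "__main__":
    shards = [shard_indices(100, world_size=4, rank=r) for r in range(4)]
    print([len(s) for s in shards])    # [25, 25, 25, 25]
    print(len(set().union(*shards)))   # 100 distinct indices, so no overlap
```

Because no rank ever needs to ask another rank which samples it took, this split requires no communication at all; communication only becomes necessary once the replicas have to agree on their gradients.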

Training a model at scale requires two essential components: multiple concurrent processes (torch.multiprocessing) and efficient communication between devices (torch.distributed).

Question: Why do you think we need communication? Think about it.

Now, the distributed data loader is ready. Let’s write the code for the training process.

#imports
import os
import time
import argparse
import tqdm
import torch
import datasets  #Hugging Face datasets library
import numpy as np
import torch.nn as nn
from functools import partial
import torch.distributed as dist
from torch.utils.data import DataLoader
from torch.multiprocessing import Process
from transformers import AutoConfig, GPT2LMHeadModel
from dataset import partition_dataset  #The distributed data loader we wrote above
from utils import get_tokenizer, collate_batch
#You can write your own tokenizer, train and generate functions

def average_gradients(model):
    world_size = dist.get_world_size()
    for param in model.parameters():
        if param.grad is not None:
            dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)  #Communication overhead across GPUs
            param.grad.data /= world_size  #Sum -> mean

def train(model, optimizer, examples, batch_size, collate_fn, desc, rank=0, average_gradients_fn=None):
    model.train()
    tokens_per_sec = []
    tokens_num = []

    for i, batch in enumerate(prog_bar := tqdm.tqdm(examples, desc=f'Training ({desc})')):
        t0 = time.time()
        optimizer.zero_grad()
        logits = model(input_ids=batch['input_ids']).logits
        loss = torch.nn.functional.cross_entropy(
            input=logits.reshape((-1, logits.shape[-1])),
            target=batch['labels'].reshape(-1),
            reduction='none')
        loss = (torch.sum(loss * batch['label_token_weights'].reshape(-1)) /
                torch.sum(batch['label_token_weights']))

        loss.backward()  #Calculates the gradients
        if average_gradients_fn is not None:
            average_gradients_fn(model)  #All-reduce so every replica applies the same averaged gradients

        optimizer.step()  #Updates the weights
        batch_time = time.time() - t0
        tokens = np.prod(batch['input_ids'].shape)
        tokens_per_sec.append(tokens / batch_time)
        tokens_num.append(tokens)
        prog_bar.set_postfix(
            tokens_per_sec=tokens / batch_time,
            loss=loss.item())
    return np.mean(tokens_per_sec), tokens_num

def setup(rank, world_size, backend):  #Sets up the communication between multiple devices (GPUs)
    os.environ['MASTER_ADDR'] = 'localhost'  #PyTorch reads MASTER_ADDR (not MASTER_ADDRESS)
    os.environ['MASTER_PORT'] = '33333'

    dist.init_process_group(backend=backend, rank=rank, world_size=world_size)

#This function is run by each process concurrently; the id of the process is 'rank'
def run_dp(rank, world_size, backend, dataset_name, model_max_length, n_epochs, batch_size, learning_rate):
    setup(rank, world_size, backend)
    config = AutoConfig.from_pretrained('gpt2')
    model = GPT2LMHeadModel(config=config).to(rank)  #Each process places its own full replica on GPU 'rank'
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)  #AdamW is a common choice for LLMs
    #We will use a German (deutsch) to English translation dataset
    dataset = {
        split: datasets.load_dataset(dataset_name, split=split)['translation']
        for split in ['train', 'validation', 'test']
    }
    src_key, tgt_key = 'de', 'en'
    dataset['train'] = dataset['train'][:5000]
    dataset['validation'] = dataset['validation'][:1000]
    dataset['test'] = dataset['test'][:100]

    #tokenization
    tokenizer = get_tokenizer(examples=dataset['train'], vocab_size=config.vocab_size, src_key=src_key, tgt_key=tgt_key)

    #collate function: partial pre-fills some of the arguments
    collate_fn = partial(collate_batch, src_key=src_key, tgt_key=tgt_key, tokenizer=tokenizer, model_max_length=model_max_length, device=rank)

    train_loader = partition_dataset(rank, world_size, dataset['train'], batch_size=batch_size, collate_fn=collate_fn)
    val_loader = DataLoader(dataset['validation'], batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
    test_loader = DataLoader(dataset['test'], batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

    total_time = []
    total_tokens_per_sec = []

    for epoch_idx in range(n_epochs):
        start = time.time()
        avg_tokens_per_sec, _ = train(
            model=model,
            optimizer=optimizer,
            examples=train_loader,
            batch_size=batch_size,
            collate_fn=collate_fn,
            desc=f'rank {rank}, epoch {epoch_idx}',
            rank=rank,
            average_gradients_fn=average_gradients)

        end = time.time()
        total_time.append(end - start)
        total_tokens_per_sec.append(avg_tokens_per_sec)

    dist.destroy_process_group()

if __name__ == '__main__':
    import torch.multiprocessing as mp
    mp.set_start_method('spawn', force=True)

    parser = argparse.ArgumentParser()
    parser.add_argument('--pytest', type=bool, default=False)
    parser.add_argument('--dataset', type=str)
    parser.add_argument('--model_max_length', type=int, default=128)
    parser.add_argument('--n_epochs', type=int, default=10)
    parser.add_argument('--batch_size', type=int, default=128)
    parser.add_argument('--learning_rate', type=float, default=1e-4)
    parser.add_argument('--world_size', type=int, default=2)
    args = parser.parse_args()

    backend = 'nccl'  #For CPU, choose 'gloo'
    world_size = args.world_size
    processes = []
    for rank in range(world_size):
        p = Process(
            target=run_dp,
            args=(rank, world_size, backend, args.dataset, args.model_max_length,
                  args.n_epochs, args.batch_size, args.learning_rate)
        )
        p.start()
        processes.append(p)

    # Wait for all processes to finish
    for p in processes:
        p.join()

Before updating the model weights, the gradients from all devices are averaged (the all_reduce call in average_gradients) so that every replica applies exactly the same update. As long as the replicas start from the same weights, each GPU therefore holds an identical copy of the model at every step of training.

Figure 2: All-reduce method for averaging gradients
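Figure 2 treats all-reduce as a single collective step. One well-known way to implement it is the ring algorithm: a reduce-scatter phase followed by an all-gather phase, each taking N-1 steps in which every worker passes one chunk to its neighbour. The pure-Python simulation below is only a sketch of that data movement (nothing is actually sent over a network), `ring_allreduce_mean` is a hypothetical name, and the algorithm NCCL picks in practice may differ.

```python
from typing import List


def ring_allreduce_mean(buffers: List[List[float]]) -> List[List[float]]:
    """Simulate ring all-reduce over len(buffers) workers, then divide by N."""
    n = len(buffers)
    length = len(buffers[0])
    # Split every buffer into n contiguous chunks; chunk k covers [lo, hi).
    bounds = [(k * length // n, (k + 1) * length // n) for k in range(n)]
    data = [list(b) for b in buffers]

    # Phase 1, reduce-scatter: at step s, worker r sends chunk (r - s) % n to
    # worker r + 1, which adds it into its own copy of that chunk.
    for step in range(n - 1):
        msgs = []
        for r in range(n):
            lo, hi = bounds[(r - step) % n]
            msgs.append(((r + 1) % n, lo, data[r][lo:hi]))  # slice = snapshot
        for dst, lo, payload in msgs:
            for k, v in enumerate(payload):
                data[dst][lo + k] += v
    # Now worker r holds the fully summed chunk (r + 1) % n.

    # Phase 2, all-gather: pass the finished chunks around the ring.
    for step in range(n - 1):
        msgs = []
        for r in range(n):
            lo, hi = bounds[(r + 1 - step) % n]
            msgs.append(((r + 1) % n, lo, hi, data[r][lo:hi]))
        for dst, lo, hi, payload in msgs:
            data[dst][lo:hi] = payload

    # Dividing by n mirrors average_gradients() after dist.all_reduce(SUM).
    return [[x / n for x in buf] for buf in data]


grads = [[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400]]
print(ring_allreduce_mean(grads))  # every worker ends with [37.0, 74.0, 111.0, 148.0]
```

Each worker sends roughly 2(N-1)/N of its buffer in total regardless of N, which is why ring-style all-reduce is attractive for large gradient tensors; the price is 2(N-1) sequential steps, which is one source of the synchronization delays mentioned below.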

Let’s run a quick experiment. I have access to 2 H100 (80GB) GPUs, which is definitely overkill for a small model like GPT-2. But hey, it’s what I have. As the saying goes, you can’t kill a mosquito with a bazooka, but sometimes it’s the only weapon in your arsenal. So let’s see what training throughput and training time we can get from this setup.

Figure 3: Throughput comparison during training for single and 2 GPUs

Voila! We’re seeing about 2× throughput (tokens processed per second during training), but that speedup comes at the cost of doubling the hardware.

Figure 4: Comparison of training time for single and 2 GPUs

While we expected the average training time per epoch to be roughly halved compared to single-GPU training, repeated runs told a different story. Most epochs are faster, but one or two consistently show unusually long times. These spikes are likely caused by overheads such as data loader latency when fetching the next batch, Python’s garbage collector kicking in, or NCCL synchronization delays during inter-device communication.

Model Parallel

Model parallelism involves partitioning the model across multiple devices. For example, if we have a model with 12 layers and 2 GPUs, we can place the first 6 layers on GPU 0 and the remaining 6 layers on GPU 1. Each device handles its portion of the forward and backward passes, allowing us to train models that otherwise wouldn’t fit on a single GPU.

Figure 5: Model parallel flow diagram

In traditional (naive) model parallelism, execution is sequential, meaning that at any given time only one GPU is actively processing. So even if you have 4 GPUs and the model is divided among them, only one GPU handles the computation at a time while the others sit idle waiting for their turn. This leads to low utilization and diminishing returns as the number of devices increases.
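A tiny timeline simulation makes the utilization problem concrete. The sketch models the forward pass only, assumes every stage takes one time unit per batch, and ignores transfer time between GPUs (all simplifications); `naive_timeline` and `utilization` are hypothetical helpers.

```python
from typing import List


def naive_timeline(num_stages: int, num_batches: int) -> List[int]:
    """Return which stage (GPU) is busy at each time step under naive model parallelism."""
    timeline = []
    for _ in range(num_batches):
        # A batch must finish stage j before stage j + 1 starts, and the next
        # batch only enters once the current one has left the last stage.
        for stage in range(num_stages):
            timeline.append(stage)
    return timeline


def utilization(num_stages: int, num_batches: int) -> float:
    """Fraction of GPU time slots that do useful work."""
    timeline = naive_timeline(num_stages, num_batches)
    busy_slots = len(timeline)                # exactly one GPU busy per step
    total_slots = len(timeline) * num_stages  # every GPU exists at every step
    return busy_slots / total_slots


print(naive_timeline(4, 2))  # [0, 1, 2, 3, 0, 1, 2, 3]
print(utilization(4, 2))     # 0.25: each of the 4 GPUs is busy 1/4 of the time
```

Utilization comes out as 1/N no matter how many batches you feed in, which is exactly the diminishing return described above.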

import math
import torch

def get_device_map(n_layers, devices):
    """Returns a dictionary of layers distributed evenly across all devices."""
    layers = list(range(n_layers))
    n_blocks = int(math.ceil(n_layers / len(devices)))  #Layers per device
    layers_list = [layers[i : i + n_blocks] for i in range(0, n_layers, n_blocks)]

    return dict(zip(devices, layers_list))

#Method of a GPT-2 transformer module: self.h holds the transformer blocks
def parallelize(self, device_map=None):
    '''
    Distribute the model layers across the devices based on the device_map.
    '''

    self.device_map = (
        get_device_map(len(self.h), range(torch.cuda.device_count())) if device_map is None else device_map
    )
    self.model_parallel = True
    self.first_device = "cpu" if "cpu" in self.device_map.keys() else "cuda:" + str(min(self.device_map.keys()))
    self.last_device = "cuda:" + str(max(self.device_map.keys()))
    self.wte = self.wte.to(self.first_device)  #Token embeddings live with the first blocks
    self.wpe = self.wpe.to(self.first_device)  #Position embeddings too
    # Load onto devices
    for k, v in self.device_map.items():
        for block in v:
            cuda_device = "cuda:" + str(k)
            self.h[block] = self.h[block].to(cuda_device)
    # ln_f to last
    self.ln_f = self.ln_f.to(self.last_device)

For simplicity, let’s consider the case of GPT-2:

There are 12 layers, and we distribute them across 2 GPUs.

Table 1: Placement of GPT-2’s layers across the two GPUs

wte, wpe → "cuda:0"

Iteration 1: k=0, v=[0, 1, 2, 3, 4, 5]
└── self.h[0].to("cuda:0")
└── self.h[1].to("cuda:0")
└── self.h[2].to("cuda:0")
└── self.h[3].to("cuda:0")
└── self.h[4].to("cuda:0")
└── self.h[5].to("cuda:0")

Iteration 2: k=1, v=[6, 7, 8, 9, 10, 11]
└── self.h[6].to("cuda:1")
└── self.h[7].to("cuda:1")
└── self.h[8].to("cuda:1")
└── self.h[9].to("cuda:1")
└── self.h[10].to("cuda:1")
└── self.h[11].to("cuda:1")

ln_f, lm_head → "cuda:1"
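get_device_map depends only on math, so it can be checked on its own. The copy below reproduces the split in Table 1 and also exposes one side effect of its ceil-based chunking that is easy to miss: when the layer count does not divide evenly, some devices may receive no layers at all.

```python
import math
from typing import Dict, List, Sequence


def get_device_map(n_layers: int, devices: Sequence[int]) -> Dict[int, List[int]]:
    """Same logic as above: ceil(n_layers / n_devices) consecutive layers per device."""
    layers = list(range(n_layers))
    n_blocks = int(math.ceil(n_layers / len(devices)))
    layers_list = [layers[i:i + n_blocks] for i in range(0, n_layers, n_blocks)]
    return dict(zip(devices, layers_list))


print(get_device_map(12, [0, 1]))
# {0: [0, 1, 2, 3, 4, 5], 1: [6, 7, 8, 9, 10, 11]}  (matches Table 1)

print(get_device_map(12, [0, 1, 2, 3, 4]))
# ceil(12 / 5) = 3 layers per chunk -> only 4 chunks, so device 4 gets nothing:
# {0: [0, 1, 2], 1: [3, 4, 5], 2: [6, 7, 8], 3: [9, 10, 11]}
```

If every GPU must hold at least one block, passing an explicit device_map to parallelize sidesteps this.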

To run a more challenging experiment than GPT-2, we tested LLaMA-2-7B, a 7-billion-parameter model, using model parallelism. We ran it on both 2 and 4 V100 (16GB) GPUs under the same configuration to evaluate performance and scalability.

Table 2: LLaMA-2-7B model parallelism parameters and results

Only one GPU is active at a time! Data flows sequentially through each GPU, like water through a pipe. Adding more GPUs does not make the water flow faster; it simply divides the pipe into more sections. What it does buy us is the ability to train models far too large for a single GPU.
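A back-of-the-envelope calculation shows what that split buys in memory. The sketch counts weights only, assuming 2 bytes per parameter (fp16), a nominal 7 billion parameters, and an even split across devices; activations, gradients, and optimizer state all come on top of this, so treat the numbers as a lower bound. The function name is hypothetical.

```python
def per_device_weight_gb(num_params: float, num_devices: int, bytes_per_param: int = 2) -> float:
    """Weight memory per GPU in decimal GB when layers are split evenly across devices."""
    return num_params * bytes_per_param / num_devices / 1e9


for n in (1, 2, 4):
    print(n, "GPU(s):", per_device_weight_gb(7e9, n), "GB of weights per GPU")
# 1 GPU(s): 14.0 GB of weights per GPU  (most of a 16 GB V100 before anything else)
# 2 GPU(s): 7.0 GB of weights per GPU
# 4 GPU(s): 3.5 GB of weights per GPU
```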

Pipeline Parallel

Pipeline parallelism distributes a deep learning model across multiple GPUs by dividing the model into sequential stages and splitting each input batch into smaller micro-batches. Instead of only one GPU executing at a time, as in traditional model parallelism, each GPU handles a specific stage of the model and works concurrently with the others, each on a different micro-batch of input.

Here’s how it works in practice: Let’s say you have a model with 32 layers and 4 GPUs available. You can split the model so that each GPU can handle 8 layers. When a batch of input data comes in, it is divided into smaller pieces called micro-batches. The first micro-batch starts processing on GPU 0 (which handles layers 0-7). As soon as it completes that step, it sends its intermediate activations to GPU 1 (handling layers 8-15), while GPU 0 immediately starts processing the second micro-batch. This pattern continues: GPU 2 starts working on the first micro-batch as soon as it receives it from GPU 1, and GPU 3 picks up the output from GPU 2, creating a continuous flow of data through the pipeline.

The result is that, after a brief warm-up period (the idle time while the pipeline fills and drains is called the pipeline bubble), all GPUs are actively engaged, each working on a different micro-batch at any given time. This overlapping of execution yields far better GPU utilization than naive model parallelism, where only one GPU is typically active at a time. It is similar to how an assembly line works: each stage (or GPU) focuses on a specific task, and the system becomes efficient once the line is full.
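The size of the bubble follows directly from the schedule. With M micro-batches and N stages, a fill-run-drain pipeline needs M + N - 1 clock ticks, and each GPU is busy for only M of them. Assuming equal-cost stages and ignoring communication (the same simplifications used in GPipe-style analyses), utilization is M / (M + N - 1). A quick calculator, with hypothetical function names:

```python
def pipeline_utilization(num_microbatches: int, num_stages: int) -> float:
    """Fraction of time each GPU is busy in a fill-run-drain pipeline (forward only)."""
    total_clocks = num_microbatches + num_stages - 1  # fill + run + drain
    return num_microbatches / total_clocks


def bubble_fraction(num_microbatches: int, num_stages: int) -> float:
    """Fraction of time each GPU sits idle while the pipeline fills and drains."""
    return 1.0 - pipeline_utilization(num_microbatches, num_stages)


# 4 GPUs, as in the 32-layer example: more micro-batches shrink the bubble.
for m in (1, 4, 16):
    print(m, round(pipeline_utilization(m, 4), 3))
# 1 0.25   (a single micro-batch is just naive model parallelism: 1/N)
# 4 0.571
# 16 0.842
```

This is why pipelines are usually fed many more micro-batches than there are stages.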

Figure 6: Model parallelism vs pipeline parallelism

There is a scheduler that figures out which GPU processes which micro-batch at each time step.

from typing import Iterable, List, Tuple

def _clock_cycles(num_batches: int, num_partitions: int) -> Iterable[List[Tuple[int, int]]]:
    """
    Key insight: batch i is at partition j when: clock = i + j
    """

    total_clocks = num_batches + num_partitions - 1  # Fill + Run + Drain

    for clock in range(total_clocks):
        schedule = []
        for i in range(num_batches):
            j = clock - i  # Which partition is batch i at?
            if 0 <= j < num_partitions:  # Is it a valid partition?
                schedule.append((i, j))  # (batch_idx, partition_idx)
        yield schedule

For example, with 3 micro-batches and 3 partitions:

Clock 0: [(0, 0)] → GPU 0 processes batch 0
Clock 1: [(0, 1), (1, 0)] → GPU 1 processes batch 0, GPU 0 processes batch 1
Clock 2: [(0, 2), (1, 1), (2, 0)] → all 3 GPUs busy!
Clock 3: [(1, 2), (2, 1)] → GPU 2 processes batch 1, GPU 1 processes batch 2
Clock 4: [(2, 2)] → GPU 2 processes batch 2

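#Methods of a pipeline module. Assumed to be set up in __init__: self.partitions (one
#nn.Sequential per GPU), self.devices, and self.in_queues / self.out_queues (one pair per GPU worker thread)
def forward(self, x):
    # 1. SPLIT: Divide batch into micro-batches
    batches = list(x.split(self.split_size, dim=0))
    num_batches = len(batches)
    num_partitions = len(self.partitions)

    # 2. SCHEDULE: Generate the clock cycle schedule
    schedules = _clock_cycles(num_batches, num_partitions)

    # 3. EXECUTE: Process each clock cycle
    for schedule in schedules:
        self.compute(batches, schedule)  # ← Runs the scheduled stages on their GPUs in parallel

    # 4. COMBINE: Concatenate results (every micro-batch ends on the last device)
    output = torch.cat(batches, dim=0)
    return output.to(self.devices[-1])

def compute(self, batches, schedule):
    # PHASE 1: Submit ALL tasks for this clock cycle (non-blocking)
    for batch_idx, partition_idx in schedule:
        batch = batches[batch_idx].to(self.devices[partition_idx])
        partition = self.partitions[partition_idx]

        # Bind partition/batch as defaults so each task keeps its own values
        def compute_fn(partition=partition, batch=batch):
            return partition(batch)  # Run this stage's layers on its GPU

        task = Task(compute_fn)  # Task: small wrapper around a callable (defined elsewhere)
        self.in_queues[partition_idx].put(task)  # Send to that GPU's worker thread

    # PHASE 2: Collect ALL results before moving to the next clock cycle
    for batch_idx, partition_idx in schedule:
        success, result = self.out_queues[partition_idx].get()  # Wait for result
        if not success:
            raise RuntimeError(f"Stage {partition_idx} failed on micro-batch {batch_idx}")
        batches[batch_idx] = result  # Store result for the next stage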

If you are wondering why we use threads here instead of separate processes, as in data parallelism:

Pipeline parallelism requires tight, consistent coordination between GPUs at every clock cycle. Threads sharing in-memory queues provide this with minimal overhead, while separate processes would add inter-process communication latency at every step.
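To see the thread-and-queue pattern end to end without GPUs, here is a self-contained sketch. Each "stage" is an ordinary Python function standing in for a partition, there is one worker thread per stage, and the clock-cycle schedule drives submission and collection in the same two phases as compute() above. All names are hypothetical. With plain Python functions the GIL prevents any real speedup, so this only demonstrates the scheduling and data flow; in the GPU setting, each worker mostly launches asynchronous CUDA work on its own device, which is what makes the overlap possible there.

```python
import queue
import threading
from typing import Callable, Iterator, List, Tuple


def clock_cycles(num_batches: int, num_stages: int) -> Iterator[List[Tuple[int, int]]]:
    """Same rule as _clock_cycles: micro-batch i is at stage j when clock == i + j."""
    for clock in range(num_batches + num_stages - 1):
        yield [(i, clock - i) for i in range(num_batches) if 0 <= clock - i < num_stages]


def worker(in_q: queue.Queue, out_q: queue.Queue) -> None:
    """One thread per stage: run each submitted task; stop on a None sentinel."""
    while True:
        task = in_q.get()
        if task is None:
            return
        try:
            out_q.put((True, task()))
        except Exception as exc:  # report failures instead of hanging the main thread
            out_q.put((False, exc))


def pipeline_forward(stages: List[Callable], micro_batches: list) -> list:
    n = len(stages)
    in_qs = [queue.Queue() for _ in range(n)]
    out_qs = [queue.Queue() for _ in range(n)]
    threads = [threading.Thread(target=worker, args=(in_qs[j], out_qs[j]), daemon=True)
               for j in range(n)]
    for t in threads:
        t.start()

    batches = list(micro_batches)
    for schedule in clock_cycles(len(batches), n):
        # Phase 1: submit every (micro-batch, stage) pair scheduled for this tick.
        for i, j in schedule:
            in_qs[j].put(lambda f=stages[j], x=batches[i]: f(x))
        # Phase 2: wait for all of them; each stage gets at most one task per tick.
        for i, j in schedule:
            ok, result = out_qs[j].get()
            if not ok:
                raise result
            batches[i] = result

    for q in in_qs:
        q.put(None)  # shut the workers down
    for t in threads:
        t.join()
    return batches


stages = [lambda x: x + 1, lambda x: x * 2, lambda x: x - 3]  # stand-ins for partitions
print(pipeline_forward(stages, [0, 1, 2, 3]))  # [-1, 1, 3, 5], same as running the stages in order
```

The default arguments in the lambda matter: they capture the current stage and input at submission time, so a worker never sees a value that the main thread has since overwritten.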

Data parallelism, model parallelism, and pipeline parallelism each provide different ways to scale deep learning workloads across multiple GPUs, but their trade-offs make them suitable for different scenarios.

Table 3: Comparison summary of data vs model vs pipeline parallelism

Ultimately, there is no one-size-fits-all strategy. If your model fits in memory, data parallelism is often simplest. If you’re working with larger models, model or pipeline parallelism becomes necessary and a combination of strategies (e.g. pipeline + data parallelism) can unlock even greater scalability. The key is to understand the size of your model, the constraints of your hardware, and the trade-offs offered by each approach in terms of memory, communication, and performance.
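As a sketch of how pipeline and data parallelism combine, the helper below arranges world_size ranks into a 2D grid: each row is one pipeline (one model replica split into stages), and each column groups the ranks that hold the same stage in different replicas. The column groups are the ones that would all-reduce gradients together. The layout rank = dp_rank * pp_size + pp_rank is an assumption for illustration; real frameworks choose their own orderings.

```python
from typing import List, Tuple


def build_groups(world_size: int, pp_size: int) -> Tuple[List[List[int]], List[List[int]]]:
    """Split ranks into pipeline groups (rows) and data-parallel groups (columns)."""
    if world_size % pp_size != 0:
        raise ValueError("world_size must be divisible by pp_size")
    dp_size = world_size // pp_size
    # Row d is pipeline replica d; its ranks hold stages 0..pp_size-1 in order.
    pipeline_groups = [[d * pp_size + s for s in range(pp_size)] for d in range(dp_size)]
    # Column s collects stage s from every replica; these ranks average gradients.
    data_groups = [[d * pp_size + s for d in range(dp_size)] for s in range(pp_size)]
    return pipeline_groups, data_groups


pipes, dps = build_groups(world_size=8, pp_size=4)
print(pipes)  # [[0, 1, 2, 3], [4, 5, 6, 7]]
print(dps)    # [[0, 4], [1, 5], [2, 6], [3, 7]]
```

In PyTorch, each of these rank lists could be passed to dist.new_group(ranks=...) to get a communicator for just that subset, so gradient all-reduces stay within a column and activation transfers stay within a row.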
