In this tutorial, we explore Online Process Reward Learning (OPRL) and demonstrate how we can learn dense, step-level reward signals from trajectory preferences to solve sparse-reward reinforcement learning tasks. We walk through each component, from the maze environment and reward-model network to preference generation, training loops, and evaluation, seeing how the agent gradually improves its behavior through online preference-driven shaping. By running this end-to-end implementation, we gain a practical understanding of how OPRL enables better credit assignment, faster learning, and more stable policy adaptation in challenging environments where agents would otherwise struggle to find meaningful rewards. check it out full code notebook,
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import matplotlib.pyplot as plt
from collections import deque
import random
torch.manual_seed(42)
np.random.seed(42)
random.seed(42)
class MazeEnv:
def __init__(self, size=8):
self.size = size
self.start = (0, 0)
self.goal = (size-1, size-1)
self.obstacles = set(((i, size//2) for i in range(1, size-2)))
self.reset()
def reset(self):
self.pos = self.start
self.steps = 0
return self._get_state()
def _get_state(self):
state = np.zeros(self.size * self.size)
state(self.pos(0) * self.size + self.pos(1)) = 1
return state
def step(self, action):
moves = ((-1,0), (0,1), (1,0), (0,-1))
new_pos = (self.pos(0) + moves(action)(0),
self.pos(1) + moves(action)(1))
if (0 <= new_pos(0) < self.size and
0 <= new_pos(1) < self.size and
new_pos not in self.obstacles):
self.pos = new_pos
self.steps += 1
done = self.pos == self.goal or self.steps >= 60
reward = 10.0 if self.pos == self.goal else 0.0
return self._get_state(), reward, done
def render(self):
grid = (('.' for _ in range(self.size)) for _ in range(self.size))
for obs in self.obstacles:
grid(obs(0))(obs(1)) = 'â–ˆ'
grid(self.goal(0))(self.goal(1)) = 'G'
grid(self.pos(0))(self.pos(1)) = 'A'
return 'n'.join((''.join(row) for row in grid))
class ProcessRewardModel(nn.Module):
def __init__(self, state_dim, hidden=128):
super().__init__()
self.net = nn.Sequential(
nn.Linear(state_dim, hidden),
nn.LayerNorm(hidden),
nn.ReLU(),
nn.Linear(hidden, hidden),
nn.LayerNorm(hidden),
nn.ReLU(),
nn.Linear(hidden, 1),
nn.Tanh()
)
def forward(self, states):
return self.net(states)
def trajectory_reward(self, states):
return self.forward(states).sum()
class PolicyNetwork(nn.Module):
def __init__(self, state_dim, action_dim, hidden=128):
super().__init__()
self.backbone = nn.Sequential(
nn.Linear(state_dim, hidden),
nn.ReLU(),
nn.Linear(hidden, hidden),
nn.ReLU()
)
self.actor = nn.Linear(hidden, action_dim)
self.critic = nn.Linear(hidden, 1)
def forward(self, state):
features = self.backbone(state)
return self.actor(features), self.critic(features)
We’ve set up the entire foundation of our OPRL system by importing libraries, defining the maze environment, and building the reward and policy network. We establish how states are represented, how obstacles prevent movement, and how the sparse reward structure works. We also design core neural models that will later learn to process rewards and drive policy decisions. check it out full code notebook,
class OPRLAgent:
def __init__(self, state_dim, action_dim, lr=3e-4):
self.policy = PolicyNetwork(state_dim, action_dim)
self.reward_model = ProcessRewardModel(state_dim)
self.policy_opt = Adam(self.policy.parameters(), lr=lr)
self.reward_opt = Adam(self.reward_model.parameters(), lr=lr)
self.trajectories = deque(maxlen=200)
self.preferences = deque(maxlen=500)
self.action_dim = action_dim
def select_action(self, state, epsilon=0.1):
if random.random() < epsilon:
return random.randint(0, self.action_dim - 1)
state_t = torch.FloatTensor(state).unsqueeze(0)
with torch.no_grad():
logits, _ = self.policy(state_t)
probs = F.softmax(logits, dim=-1)
return torch.multinomial(probs, 1).item()
def collect_trajectory(self, env, epsilon=0.1):
states, actions, rewards = (), (), ()
state = env.reset()
done = False
while not done:
action = self.select_action(state, epsilon)
next_state, reward, done = env.step(action)
states.append(state)
actions.append(action)
rewards.append(reward)
state = next_state
traj = {
'states': torch.FloatTensor(np.array(states)),
'actions': torch.LongTensor(actions),
'rewards': torch.FloatTensor(rewards),
'return': float(sum(rewards))
}
self.trajectories.append(traj)
return traj
We start building the OPRL agent by implementing action selection and trajectory collection. We use an ε-greedy strategy to ensure exploration and collect sequences of states, actions, and returns. As we drive the agent through the maze, we store the entire trajectory which will later serve as preference data to shape the reward model. check it out full code notebook,
def generate_preference(self):
if len(self.trajectories) < 2:
return
t1, t2 = random.sample(list(self.trajectories), 2)
label = 1.0 if t1('return') > t2('return') else 0.0
self.preferences.append({'t1': t1, 't2': t2, 'label': label})
def train_reward_model(self, n_updates=5):
if len(self.preferences) < 32:
return 0.0
total_loss = 0.0
for _ in range(n_updates):
batch = random.sample(list(self.preferences), 32)
loss = 0.0
for item in batch:
r1 = self.reward_model.trajectory_reward(item('t1')('states'))
r2 = self.reward_model.trajectory_reward(item('t2')('states'))
logit = r1 - r2
pred_prob = torch.sigmoid(logit)
label = item('label')
loss += -(label * torch.log(pred_prob + 1e-8) +
(1-label) * torch.log(1 - pred_prob + 1e-8))
loss = loss / len(batch)
self.reward_opt.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.reward_model.parameters(), 1.0)
self.reward_opt.step()
total_loss += loss.item()
return total_loss / n_updates
We generate preference pairs from the collected trajectories and train the process reward model using the Bradley-Terry formulation. We compare trajectory-level scores, calculate probabilities, and update the reward model to reflect which behavior appears better. This allows us to learn dense, discrete, step-level rewards that guide the agent even when the environment is sparse. check it out full code notebook,
def train_policy(self, n_updates=3, gamma=0.98):
if len(self.trajectories) < 5:
return 0.0
total_loss = 0.0
for _ in range(n_updates):
traj = random.choice(list(self.trajectories))
with torch.no_grad():
process_rewards = self.reward_model(traj('states')).squeeze()
shaped_rewards = traj('rewards') + 0.1 * process_rewards
returns = ()
G = 0
for r in reversed(shaped_rewards.tolist()):
G = r + gamma * G
returns.insert(0, G)
returns = torch.FloatTensor(returns)
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
logits, values = self.policy(traj('states'))
log_probs = F.log_softmax(logits, dim=-1)
action_log_probs = log_probs.gather(1, traj('actions').unsqueeze(1))
advantages = returns - values.squeeze().detach()
policy_loss = -(action_log_probs.squeeze() * advantages).mean()
value_loss = F.mse_loss(values.squeeze(), returns)
entropy = -(F.softmax(logits, dim=-1) * log_probs).sum(-1).mean()
loss = policy_loss + 0.5 * value_loss - 0.01 * entropy
self.policy_opt.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 1.0)
self.policy_opt.step()
total_loss += loss.item()
return total_loss / n_updates
def train_oprl(episodes=500, render_interval=100):
env = MazeEnv(size=8)
agent = OPRLAgent(state_dim=64, action_dim=4, lr=3e-4)
returns, reward_losses, policy_losses = (), (), ()
success_rate = ()
for ep in range(episodes):
epsilon = max(0.05, 0.5 - ep / 1000)
traj = agent.collect_trajectory(env, epsilon)
returns.append(traj('return'))
if ep % 2 == 0 and ep > 10:
agent.generate_preference()
if ep > 20 and ep % 2 == 0:
rew_loss = agent.train_reward_model(n_updates=3)
reward_losses.append(rew_loss)
if ep > 10:
pol_loss = agent.train_policy(n_updates=2)
policy_losses.append(pol_loss)
success = 1 if traj('return') > 5 else 0
success_rate.append(success)
if ep % render_interval == 0 and ep > 0:
test_env = MazeEnv(size=8)
agent.collect_trajectory(test_env, epsilon=0)
print(test_env.render())
return returns, reward_losses, policy_losses, success_rate
We train the policy using rewards of the size produced by the learned process reward model. We calculate returns, profits, value estimates, and entropy bonuses, helping the agent improve its strategy over time. We then create a full training loop in which exploration is reduced, preferences are accumulated, and both the reward model and policy are continuously updated. check it out full code notebook,
print("Training OPRL Agent on Sparse Reward Maze...n")
returns, rew_losses, pol_losses, success = train_oprl(episodes=500, render_interval=250)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes(0,0).plot(returns, alpha=0.3)
axes(0,0).plot(np.convolve(returns, np.ones(20)/20, mode="valid"), linewidth=2)
axes(0,0).set_xlabel('Episode')
axes(0,0).set_ylabel('Return')
axes(0,0).set_title('Agent Performance')
axes(0,0).grid(alpha=0.3)
success_smooth = np.convolve(success, np.ones(20)/20, mode="valid")
axes(0,1).plot(success_smooth, linewidth=2, color="green")
axes(0,1).set_xlabel('Episode')
axes(0,1).set_ylabel('Success Rate')
axes(0,1).set_title('Goal Success Rate')
axes(0,1).grid(alpha=0.3)
axes(1,0).plot(rew_losses, linewidth=2, color="orange")
axes(1,0).set_xlabel('Update Step')
axes(1,0).set_ylabel('Loss')
axes(1,0).set_title('Reward Model Loss')
axes(1,0).grid(alpha=0.3)
axes(1,1).plot(pol_losses, linewidth=2, color="red")
axes(1,1).set_xlabel('Update Step')
axes(1,1).set_ylabel('Loss')
axes(1,1).set_title('Policy Loss')
axes(1,1).grid(alpha=0.3)
plt.tight_layout()
plt.show()
print("OPRL Training Complete!")
print("Process rewards, preference learning, reward shaping, and online updates demonstrated.")
We visualize the learning dynamics by plotting returns, success rates, reward-model loss, and policy loss. We monitor how the agent’s performance evolves as OPRL shapes the reward landscape. By the end of the visualization, we clearly see the effect of processing rewards on solving a challenging, sparse-reward maze.
In conclusion, we see how OPRL transforms sparse terminal results into rich online feedback that continuously guides the agent’s behavior. We see the process reward model learning preferences, shaping return signals, and accelerating the ability of the policy to reach the goal. With larger mazes, different shaping strengths or even actual human preference feedback, we appreciate how OPRL provides a flexible and powerful framework for credit assignment in complex decision-making tasks. We end with a clear, practical understanding of how OPRL operates and how we can extend it to more advanced agentic RL settings.
check it out full code notebook And paperFeel free to check us out GitHub page for tutorials, code, and notebooksAlso, feel free to follow us Twitter And don’t forget to join us 100k+ ml subreddit and subscribe our newsletterwait! Are you on Telegram? Now you can also connect with us on Telegram.
Asif Razzaq Marktechpost Media Inc. Is the CEO of. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. Their most recent endeavor is the launch of MarketTechPost, an Artificial Intelligence media platform, known for its in-depth coverage of Machine Learning and Deep Learning news that is technically robust and easily understood by a wide audience. The platform boasts of over 2 million monthly views, which shows its popularity among the audience.