PPO 示例架构
上次更新:02/17/2025。
让我们从 Proximal Policy Optimization 算法开始,这是 LLM(大语言模型)后训练中最广泛使用的算法之一。
PPO 算法示例的主要入口点为:main_ppo.py。在本教程中,我们将逐步介绍 main_ppo.py 中的代码架构。
定义数据
用户需要预处理数据集并将其存储为 parquet 文件(一种高效的列式存储格式)。我们实现了 RLHFDataset 类来加载并分词这些 parquet 文件。
对于 ``RLHFDataset``(默认配置),至少需要 1 个字段:
prompt:包含字符串提示词
我们已经在 data_preprocess 目录 中提供了一些处理数据集为 parquet 文件的示例。目前,我们支持 GSM8k、MATH、Hellasage 和 Full_hh_rlhf 数据集的预处理。更多信息请参见 Prepare Data for Post-Training。
定义不同数据集的奖励函数
在此主要入口点中,用户只需针对 PPO 训练中使用的数据集(或应用)定义自己的奖励函数即可。
例如,我们已经为 GSM8k
和 MATH
数据集提供了奖励函数,分别位于 _select_rm_score_fn 中。在 RewardManager 中,我们将根据数据来源(data_source)计算奖励分数,并选择相应的奖励函数。对于某些 RLHF(Reinforcement Learning from Human Feedback,人类反馈强化学习)数据集(如 full_hh_rlhf),会使用奖励模型直接评估响应,而无需奖励函数。在这种情况下,RewardManager 将直接返回奖励模型计算的 rm_score。
详细实现请参见 奖励函数。
定义工作器类
if config.actor_rollout_ref.actor.strategy in {"fsdp", "fsdp2"}: # for FSDP backend
assert config.critic.strategy in {"fsdp", "fsdp2"}
from verl.workers.fsdp_workers import ActorRolloutRefWorker, CriticWorker
from verl.single_controller.ray import RayWorkerGroup
ray_worker_group_cls = RayWorkerGroup
elif config.actor_rollout_ref.actor.strategy == 'megatron': # for Megatron backend
assert config.actor_rollout_ref.actor.strategy == config.critic.strategy
from verl.workers.megatron_workers import ActorRolloutRefWorker, CriticWorker
from verl.single_controller.ray.megatron import NVMegatronRayWorkerGroup
ray_worker_group_cls = NVMegatronRayWorkerGroup # Ray worker class for Megatron-LM
else:
raise NotImplementedError
from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role
role_worker_mapping = {
Role.ActorRollout: ActorRolloutRefWorker,
Role.Critic: CriticWorker,
Role.RefPolicy: ActorRolloutRefWorker
}
global_pool_id = 'global_pool'
resource_pool_spec = {
global_pool_id: [config.trainer.n_gpus_per_node] * config.trainer.nnodes,
}
mapping = {
Role.ActorRollout: global_pool_id,
Role.Critic: global_pool_id,
Role.RefPolicy: global_pool_id,
}
步骤 1:构建角色(Role)与工作器(Worker)的映射
角色表示在同一进程中的一组工作器。我们在 ray_trainer.py 中预定义了几个角色。
class Role(Enum):
"""
To create more roles dynamically, you can subclass Role and add new members
"""
Actor = 0 # This worker only has Actor
Rollout = 1 # This worker only has Rollout
ActorRollout = 2 # This worker has both actor and rollout, it's a HybridEngine
Critic = 3 # This worker only has critic
RefPolicy = 4 # This worker only has reference policy
RewardModel = 5 # This worker only has reward model
ActorRolloutRef = 6 # This worker contains actor, rollout and reference policy simultaneously
步骤 2:定义该角色对应的工作器类
我们已经预先实现了
ActorRolloutRefWorker。通过不同的配置,它可以是一个独立的 Actor、独立的 Rollout、ActorRollout 混合引擎或 ActorRolloutRef 混合引擎。我们还针对
Actor、Rollout、Critic、Reward Model和Reference model预先实现了两种不同后端的工作器:PyTorch FSDP 和 Megatron-LM。 更多信息请参见 FSDP 工作器 和 Megatron-LM 工作器。
步骤 3:定义资源池 ID 和资源池规格
资源池是对全局 GPU 资源的划分,
resource_pool_spec是一个字典,将 ID 映射到 GPU 数量。在上面的示例中,我们定义了一个全局资源池:global_pool_id,然后将所有角色放在这个资源池中,使用该后训练任务中的所有 GPU。这称为“共置”(co-locate)放置,即所有模型共享同一组 GPU。
有关资源池和放置的高级用法,请参见相关文档。
定义奖励模型/函数
# we should adopt a multi-source reward function here
# - for rule-based rm, we directly call a reward score
# - for model-based rm, we call a model
# - for code related prompt, we send to a sandbox if there are test cases
# - finally, we combine all the rewards together
# - The reward type depends on the tag of the data
if config.reward_model.enable:
from verl.workers.fsdp_workers import RewardModelWorker
role_worker_mapping[Role.RewardModel] = RewardModelWorker
mapping[Role.RewardModel] = global_pool_id
reward_fn = RewardManager(tokenizer=tokenizer, num_examine=0)
# Note that we always use function-based RM for validation
val_reward_fn = RewardManager(tokenizer=tokenizer, num_examine=1)
resource_pool_manager = ResourcePoolManager(resource_pool_spec=resource_pool_spec, mapping=mapping)
由于并非所有任务都使用基于模型的奖励模型(RM),因此用户需要在这里定义它是基于模型的 RM 还是基于函数的 RM。
如果是基于模型的 RM,直接在资源映射中添加
RewardModel角色,并将其添加到资源池映射中。请注意,预定义的
RewardModelWorker只支持具有 Hugging FaceAutoModelForSequenceClassification结构模型。如果不是此类型的模型,您需要在 FSDP 工作器 和 Megatron-LM 工作器 中自定义自己的 RewardModelWorker。
如果是基于函数的 RM,用户需要针对每个数据集分类奖励函数。
def _select_rm_score_fn(data_source):
if data_source == 'openai/gsm8k':
return gsm8k.compute_score
elif data_source == 'lighteval/MATH':
return math.compute_score
else:
raise NotImplementedError
更多信息请参见 目录 中实现的奖励函数。
定义、初始化并运行 PPO 训练器
trainer = RayPPOTrainer(config=config,
tokenizer=tokenizer,
role_worker_mapping=role_worker_mapping,
resource_pool_manager=resource_pool_manager,
ray_worker_group_cls=ray_worker_group_cls,
reward_fn=reward_fn,
val_reward_fn=val_reward_fn)
trainer.init_workers()
trainer.fit()
我们首先使用用户配置、分词器以及上述所有工作器映射、资源池、工作器组和奖励函数来初始化
RayPPOTrainer。接着,我们调用
trainer.init_workers()来在已分配的 GPU(位于资源池中)上初始化模型。实际的 PPO 训练将在
trainer.fit()中执行。
verl 可以通过重用 Ray 模型工作器、资源池和奖励函数轻松扩展到其他强化学习算法。更多信息请参见 扩展。
RayPPOTrainer 的详细信息将在 Ray 训练器 中讨论。