HybridFlow 编程指南

最后更新:2025/06/02。

作者:Chi Zhang

verl 是论文 HybridFlow [1] 的开源实现。在本节中,我们将介绍 HybridFlow 的基本概念、动机以及如何使用 verl API 进行编程。

动机与设计

我们使用数据流(dataflow)来表示强化学习(RL)系统。[4]

数据流

数据流是计算的抽象表示。神经网络训练是一种典型的数据流。它可以通过计算图(computational graph)来表示。

该数据流图来自 CS231n 2024 年第 4 讲

该图 [2] 表示了一个多项式函数后接 sigmoid 函数的计算图。在神经网络计算的数据流中,每个节点表示一个运算符,每个边表示前向/反向传播的方向。计算图决定了神经网络的架构。

RL 作为数据流问题

强化学习(RL)训练也可以表示为数据流。下面是表示 RLHF 中使用的 PPO 算法的数据流图 [3]

PPO 数据流图,由知乎用户低级炼丹师提供

然而,RL 的数据流与神经网络训练的数据流有着根本区别,如下所示:

工作负载

节点

神经网络训练

运算符 (+/-/matmul/softmax)

张量移动

强化学习

高层运算符 (rollout/模型前向)

数据移动

在表格强化学习的情况下,每个运算符都是简单的标量数学运算(例如 Bellman 更新)。在深度强化学习(DRL)中,每个运算符都是高层神经网络计算,例如模型推理/更新。这使得 RL 成为一个两级数据流问题:

  • 控制流:定义高层运算符的执行方式(例如,在 PPO 中,我们首先执行 rollout。然后执行优势计算。最后执行训练)。它表达了 RL 算法的核心逻辑

  • 计算流:定义 神经网络计算 的数据流(例如,模型前向/反向/优化器)。

设计选择

LLM 时代之前,DRL 中使用的模型大小通常较小。因此,高层神经网络计算可以在单个进程中完成。这使得可以将计算流嵌入到控制流中作为一个单进程。

然而,在 LLM 时代,计算流(例如,训练神经网络)变成了多进程程序。这自然导致了两种设计选择:

  1. 将控制流也转换为多进程程序。然后与计算流并置(统一的多控制器)。

  • 优点:

    • 在固定的计算流和控制流下实现 最佳性能,因为训练和数据传输中的通信开销最小化。

  • 缺点:

    • 从软件角度看,计算和/或控制流 难以复用,因为计算代码与特定控制器代码耦合。例如,PPO 的训练循环是通用的。假设我们有一个用特定计算流(如 FSDP)实现的 PPO 训练流,如果我们想要将计算流从 FSDP 切换到 Megatron,由于控制流和计算流的耦合,控制流和计算流都无法复用。

    • 在灵活和动态控制流下,用户需要付出更多努力,因为程序的多进程特性。

  1. 分离流程:控制流使用单进程,计算流使用多进程。

  • 优点:

    • 解耦后,其他地方定义的计算流可以 轻松复用

    • 控制器运行在单进程上。使用 不同控制流实现新的 RL 算法简单且容易

  • 缺点:

    • 每次控制器进程和计算进程交互时都会产生额外的 数据通信开销。数据必须来回发送。

在 verl 中,采用了后一种策略,将控制流和计算流分离。verl 被设计为解耦 RL 算法的控制流,以及计算引擎的实现。

整体执行图

下面是一个简化的图,描述了强化学习作业的执行。在该图中,控制器运行在单进程上,而生成器/演员工作器(generator/actor workers)、批评家工作器(critic workers)运行在多进程上,放置在特定的资源组中。对于 rollout,控制器将数据传递给生成器以执行采样生成。rollout 完成后,数据传递回控制器以进行算法的下一步。其他工作器执行类似的执行。通过混合控制器设计,数据流和计算被解耦,从而在计算效率和定义算法训练循环的灵活性之间取得平衡。

执行图

代码库导览 (PPO)

入口函数

代码:https://github.com/volcengine/verl/blob/main/verl/trainer/main_ppo.py

在这个文件中,我们定义了一个远程函数 main_task,它作为控制器(driver)进程,如上图所示。我们还定义了一个 RewardManager,用户可以基于数据集中的数据源自定义奖励函数。请注意,RewardManager 应返回最终由 RL 算法优化的 token 级奖励。请注意,用户可以将基于模型的奖励和基于规则的奖励结合起来。 main_task 构建了一个 RayPPOTrainer 实例并启动 fit。请注意,main_task 作为单进程运行

我们强烈推荐 不要在 Ray 集群头上调度 ``main_task``,因为 main_task 会消耗大量内存,而头节点通常资源很少。

Ray 训练器

代码:https://github.com/volcengine/verl/blob/main/verl/trainer/ppo/ray_trainer.py

RayPPOTrainer 管理:

  • 工作器和工作器组的构建

  • 运行 PPO 算法的主循环

请注意,RayPPOTrainer 的 fit 函数 作为单进程运行

工作器和工作器组构建

每个工作器组管理运行在远程的一组工作器。请注意,工作器组在其构造函数的进程中运行。 每个工作器组内的每个工作器在 GPU 上运行。工作器组作为控制器进程与一组工作器交互的代理,以执行某些计算。为了做到这一点,我们必须将工作器的方法绑定到工作器组的方法上,并定义数据分发和数据收集。这通过简单装饰来实现,将在工作器定义部分介绍。

例如,在 PPO 中,我们定义了 3 个工作器组:

  • ActorRolloutRef:管理演员、rollout 和参考策略。ActorRolloutRefWorker 可以实例化为单个演员、单个 rollout、单个参考策略、组合演员/rollout 或组合演员/rollout/ref。这种设计旨在最大化代码复用以适应各种场景。将演员和 rollout 并置的原因是为了使用 nccl 进行快速权重传输。将演员和参考并置的原因是为了实现高效的 LoRA PPO,因为参考策略在 LoRA 中只是 PPO 的基础模型。并置通过 verl.single_controller.ray.base.create_colocated_worker_cls 完成,它创建一个暴露所有这些角色类方法的单个 Ray 远程类。

  • Critic:管理批评家模型

  • Reward:管理奖励模型

工作器组将在其指定的资源池上构建。资源池是 Ray 集群中的一组 GPU。

工作器定义

我们以 ActorRolloutRefWorker 为例。 它应暴露给控制器进程的 API 包括:

  • init_model:构建底层模型

  • generate_sequences:给定提示,生成响应

  • compute_log_prob:使用演员计算生成序列的对数概率

  • compute_ref_log_prob:使用参考策略计算生成序列的对数概率

  • save_checkpoint:保存检查点

请注意,这些方法定义在工作器中,只能通过远程调用调用。例如,如果控制器进程想要初始化模型,它必须调用:

for worker in actor_rollout_ref_wg:
    worker.init_model.remote()

如果控制器进程想要生成序列,它必须调用:

data = xxx
# split the data into dp chunks
data_dp_lst = data.split(dp_size)
output_dp_lst = []
for i, worker in enumerate(actor_rollout_ref_wg):
    output_future = worker.generate_sequences.remote(data_dp_lst[i])
    output_dp_lst.append(output_future)
output = torch.cat(ray.get(output_dp_lst), dim=0)

我们观察到,控制器进程调用工作器组方法通常可以分为 3 个部分:

  • 将数据分割成数据并行大小

  • 将相应数据分发到每个工作器

  • 计算完成后收集并连接数据

在 verl 中,我们设计了一个语法糖,将 3 个过程封装成控制器进程的单个调用。

@register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO)
def generate_sequences(data):
    ...

# on the driver
output = actor_rollout_ref_wg.generate_sequences(data)

我们用 register 装饰工作器的方法,它明确定义了如何分割和分发输入数据到每个工作器,以及输出数据如何被控制器收集和连接。例如,Dispatch.DP_COMPUTE_PROTO 将输入数据分割成 dp 块,分发每个数据到每个工作器,收集输出并连接结果。请注意,此函数要求输入和输出是此处定义的 DataProto(https://github.com/volcengine/verl/blob/main/verl/protocol.py)。

PPO 主循环

使用上述 API,我们可以实现 PPO 的主循环,就像它是单进程程序一样:

for prompt in dataloader:
    output = actor_rollout_ref_wg.generate_sequences(prompt)
    old_log_prob = actor_rollout_ref_wg.compute_log_prob(output)
    ref_log_prob = actor_rollout_ref_wg.compute_ref_log_prob(output)
    values = critic_wg.compute_values(output)
    rewards = reward_wg.compute_scores(output)
    # compute_advantages is running directly on the control process
    advantages = compute_advantages(values, rewards)
    output = output.union(old_log_prob)
    output = output.union(ref_log_prob)
    output = output.union(values)
    output = output.union(rewards)
    output = output.union(advantages)
    # update actor
    actor_rollout_ref_wg.update_actor(output)
    critic.update_critic(output)

收获

  • 这种编程范式使用户能够使用不同的计算后端,而无需修改控制进程。

  • 这种编程范式通过更改工作器组和资源池的映射,实现灵活的位置放置,而无需修改控制进程。

仓库组织

仓库中重要的代码文件组织如下:

verl # verl 包
  trainer
    main_ppo.py  # RL 训练的入口点
    ppo
      ray_trainer.py  # RL 算法如 PPO 的训练循环
    fsdp_sft_trainer.py  # 带 FSDP 后端的 SFT 训练器
  config
    generation.yaml  # rollout 的配置模板
    ppo_trainer.yaml  # RL 训练器的配置模板
  workers
    protocol.py  # DataProto 的接口
    fsdp_workers.py   # FSDP 工作器接口:ActorRolloutRefWorker, CriticWorker, RewardModelWorker
    megatron_workers.py  # Megatron 工作器接口:ActorRolloutRefWorker, CriticWorker, RewardModelWorker
    actor
      dp_actor.py  # 带 FSDP 后端的数据并行演员
      megatron_actor.py  # 带 Megatron 后端的 nD 并行演员
    critic
      dp_critic.py  # 带 FSDP 后端的数据并行批评家
      megatron_critic.py  # 带 FSDP 后端的数据并行批评家(原文可能有误,应为 Megatron)
    reward_model
      megatron
        reward_model.py  # 带 Megatron 后端的奖励模型
    rollout
      vllm
        vllm_rollout.py  # 带 vllm 后端的 rollout
      hf_rollout.py  # 带 HuggingFace TGI 后端的 rollout
    sharding_manager
      fsdp_ulysses.py  # 使用 FSDP + ulysses 时的数据和模型重分片
      fsdp_vllm.py  # 使用 FSDP + ulysses + vllm 时的数据和模型重分片
      megatron_vllm.py  # 使用 Megatron + vllm 时的数据和模型重分片
  utils
    dataset  # SFT/RM/RL 的数据集
    reward_score  # 函数式奖励
      gsm8k.py  # gsm8k 数据集的奖励函数
      math.py  # math 数据集的奖励函数
    seqlen_balancing.py  # 序列平衡优化
  models
    llama  # llama、deepseek、mistral 等 Megatron 实现
    transformers  # ulysses 与 transformer 模型(如 llama、qwen 等)的集成
    weight_loader_registery.py  # 将 HF 检查点加载到 Megatron 的权重加载器注册表
  third_party
    vllm  # RL 中 vllm 使用适配器
      vllm_spmd  # vllm >= v0.7 适配器
examples  # 示例脚本
tests  # 集成和单元测试
.github  # 持续集成测试配置