TransferQueue 数据系统

最后更新:11/17/2025.

本文档介绍 TransferQueue,这是一个用于高效后训练的异步流式数据管理系统。

概述

TransferQueue 是一个高性能的数据存储和传输模块,具备全景数据可见性和流式调度能力,专门优化后训练工作流中的高效数据流。

TransferQueue 提供细粒度、样本级别的数据管理和负载均衡(即将到来)能力,作为数据网关,解耦计算任务之间的显式数据依赖关系。这种设计支持分治策略,大大简化了算法控制器的设计。

更新

  • 2025 年 11 月 10 日:我们从 TransferQueueController 中分离出数据检索逻辑 PR#101。现在您可以实现自己的 Sampler 来控制如何消费数据。

  • 2025 年 11 月 5 日:我们提供了一个 KVStorageManager,简化了与基于 KV 的存储后端的集成 PR#96。第一个可用的基于 KV 的后端是 Yuanrong

  • 2025 年 11 月 4 日:数据分区能力现已在 PR#98 中可用。现在您可以定义逻辑数据分区来管理训练/验证/测试数据集。

  • 2025 年 10 月 25 日:我们通过 PR#66 使存储后端可插拔。现在您可以尝试将自己的存储后端与 TransferQueue 集成!

  • 2025 年 10 月 21 日:TransferQueue 已正式集成到 verl 中 verl/pulls/3649。后续 PR 将通过完全解耦数据流和控制流来优化单控制器架构。

  • 2025 年 7 月 22 日:我们在 Zhihu 1, 2 上发布了一系列中文博客。

  • 2025 年 7 月 21 日:我们在 verl 社区启动了一个 RFC verl/RFC#2662

  • 2025 年 7 月 2 日:我们发布了论文 AsyncFlow

组件

控制平面:全景数据管理

在控制平面中,TransferQueueController 以元数据的形式跟踪每个训练样本的生产状态消费状态。当所有所需的数据字段都准备就绪(即写入到 TransferQueueStorageManager)时,我们知道此数据样本可以被下游任务消费。

对于消费状态,我们为每个计算任务(例如 generate_sequencescompute_log_prob 等)记录消费记录。因此,即使不同计算任务需要相同的数据字段,它们也可以独立消费数据,而不会相互干扰。

为了使数据检索过程更加可定制,我们提供了一个 Sampler 类,允许用户定义自己的数据检索和消费逻辑。请参考 Customize 部分了解详情。

在未来,我们计划在控制平面中支持负载均衡动态批处理能力。此外,我们将支持去中心化框架的数据管理,在这种框架中,每个 rank 自行管理数据检索,而不是由单个控制器协调。

数据平面:分布式数据存储

在数据平面中,我们提供了一种可插拔设计,使 TransferQueue 能够根据用户需求集成不同的存储后端。

具体而言,我们提供了一个 TransferQueueStorageManager 抽象类,定义了以下核心 API:

  • async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None

  • async def get_data(self, metadata: BatchMeta) -> TensorDict

  • async def clear_data(self, metadata: BatchMeta) -> None

此类封装了 TransferQueue 系统内的核心交互逻辑。您只需要编写一个简单的子类,即可集成自己的存储后端。请参考 Customize 部分了解详情。

目前,我们支持以下存储后端:

  • SimpleStorageUnit:一个基本的 CPU 内存存储,对数据格式约束 миним,使用简单。

  • Yuanrong:Ascend 原生数据系统,提供包括 HBM/DRAM/SSD 在内的分层存储接口。

  • MoonCakeStore(进行中):一种高性能、基于 KV 的分层存储,支持 GPU 和 DRAM 之间的 RDMA 传输。

  • Ray Direct Transport进行中):Ray 的新功能,允许 Ray 在 Ray actor 之间直接存储和传递对象。

其中,SimpleStorageUnit 作为我们的默认存储后端,由 AsyncSimpleStorageManager 类协调。每个存储单元可以部署在单独的节点上,实现分布式数据管理。

SimpleStorageUnit 采用以下 2D 数据结构:

  • 每一行对应一个训练样本,在相应全局批处理中分配唯一的索引。

  • 每一列表示计算任务的输入/输出数据字段。

此数据结构设计受到后训练过程的计算特性启发,其中每个训练样本通过任务管道以中继方式生成。它提供精确的寻址能力,允许以流式方式进行细粒度、并发的数据读写操作。

用户界面:异步与同步客户端

TransferQueue 系统的交互工作流程如下:

  1. 一个进程向 TransferQueueController 发送读取请求。

  2. TransferQueueController 扫描每个样本(行)的生产和消费元数据,并根据负载均衡策略动态组装微批元数据。该机制启用样本级别的数据调度。

  3. 该进程使用控制器提供的元数据从分布式存储单元检索实际数据。

为了简化 TransferQueue 的使用,我们将此过程封装到 AsyncTransferQueueClientTransferQueueClient 中。这些客户端提供了异步和同步接口用于数据传输,允许用户轻松将 TransferQueue 集成到他们的框架中。

在未来,我们将为去中心化框架提供一个 StreamingDataLoader 接口,如 issue#85verl/RFC#2662 中讨论的那样。利用此抽象,每个 rank 可以像 PyTorch 中的 DataLoader 一样自动获取自己的数据。TransferQueue 系统将处理由不同并行策略引起的基础数据调度和传输逻辑,大大简化去中心化框架的设计。

🔥 案例展示

通用用法

主要交互点是 AsyncTransferQueueClientTransferQueueClient,作为与 TransferQueue 系统通信的接口。

核心接口:

  • (async_)get_meta(data_fields: list[str], batch_size:int, global_step:int, get_n_samples:bool, task_name:str) -> BatchMeta

  • (async_)get_data(metadata:BatchMeta) -> TensorDict

  • (async_)put(data:TensorDict, metadata:BatchMeta, global_step)

  • (async_)clear(global_step: int)

我们即将发布详细的教程和 API 文档。

verl 示例

现在将 TransferQueue 集成到 verl 的主要动机是缓解单控制器 RayPPOTrainer 的数据传输瓶颈。目前,所有 DataProto 对象必须通过 RayPPOTrainer 路由,导致整个后训练系统存在单点瓶颈。

verl_dataflow_DataProto

通过利用 TransferQueue,我们通过以下方式分离经验数据传输和元数据调度:

  • DataProto 替换为 BatchMeta(元数据)和 TensorDict(实际数据)结构

  • 通过 BatchMeta 保留 verl 的原始调度/收集逻辑(保持单控制器调试性)

  • 通过 TransferQueue 的分布式存储单元加速数据传输

verl_dataflow_TransferQueue

您可以参考 recipe,其中我们在异步和同步场景中模拟了 verl 的用法。TransferQueue 已正式集成到 verl 中,现可在 verl/pulls/3649 获取(后续 PR 将进一步优化集成)。

使用 Python 包

pip install TransferQueue==0.1.1.dev2

从源代码构建 wheel 包

按照以下步骤构建并安装:

  1. 从 GitHub 仓库克隆源代码

    git clone https://github.com/TransferQueue/TransferQueue/
    cd TransferQueue
    
  2. 安装依赖项

    pip install -r requirements.txt
    
  3. 构建并安装

    python -m build --wheel
    pip install dist/*.whl
    

📊 性能

注意:上述 TransferQueue 基准测试基于我们简单的 SimpleStorageUnit 后端。通过引入高性能存储后端并优化序列化/反序列化,我们预计能实现更好的性能。热烈欢迎社区贡献!

有关详细的性能基准测试,请参考 此博客

🛠️ 自定义 TransferQueue

定义自己的数据检索逻辑

我们提供了一个 BaseSampler 抽象类,定义了以下接口:

@abstractmethod
def sample(
    self,
    ready_indexes: list[int],
    batch_size: int,
    *args: Any,
    **kwargs: Any,
) -> tuple[list[int], list[int]]:
    """Sample a batch of indices from the ready indices.

    Args:
        ready_indexes: List of global indices for which all required fields of the
        corresponding samples have been produced, and the samples are not labeled as
        consumed in the corresponding task.
        batch_size: Number of samples to select
        *args: Additional positional arguments for specific sampler implementations
        **kwargs: Additional keyword arguments for specific sampler implementations

    Returns:
        List of sampled global indices of length batch_size
        List of global indices of length batch_size that should be labeled as consumed
        (will never be retrieved in the future)

    Raises:
        ValueError: If batch_size is invalid or ready_indexes is insufficient
    """
    raise NotImplementedError("Subclasses must implement sample")

在此设计中,我们通过两个返回值分离数据检索和数据消费,这使我们能够轻松控制样本替换。我们实现了两个参考设计:SequentialSamplerGRPOGroupNSampler

Sampler 类或实例应在初始化时传递到 TransferQueueController。在每次 get_meta 调用中,您可以向 Sampler 提供动态采样参数。

from transfer_queue import TransferQueueController, TransferQueueClient, GRPOGroupNSampler, process_zmq_server_info

# 选项 1:将采样器类传递给 TransferQueueController
controller = TransferQueueController.remote(GRPOGroupNSampler)

# 选项 2:将采样器实例传递给 TransferQueueController(如果您需要自定义配置)
your_own_sampler = YourOwnSampler(config)
controller = TransferQueueController.remote(your_own_sampler)

# 使用采样器
batch_meta = client.get_meta(
    data_fields=["input_ids", "attention_mask"],
    batch_size=8,
    partition_id="train_0",
    task_name="generate_sequences",
    sampling_config={"n_samples_per_prompt": 4}  # 在此处放置所需的采样参数
)

如何集成新的存储后端

数据平面组织如下:

  transfer_queue/
  ├── storage/
  │   ├── __init__.py
  │   │── simple_backend.py             # SimpleStorageUnit、StorageUnitData、StorageMetaGroup
  │   ├── managers/                     # Managers are upper level interfaces that encapsulate the interaction logic with TQ system.
  │   │   ├── __init__.py
  │   │   ├──base.py                    # TransferQueueStorageManager, KVStorageManager
  │   │   ├──simple_backend_manager.py  # AsyncSimpleStorageManager
  │   │   ├──yuanrong_manager.py        # YuanrongStorageManager
  │   │   ├──mooncake_manager.py        # MooncakeStorageManager
  │   │   └──factory.py                 # TransferQueueStorageManagerFactory
  │   └── clients/                      # Clients are lower level interfaces that directly manipulate the target storage backend.
  │   │   ├── __init__.py
  │   │   ├── base.py                   # TransferQueueStorageKVClient
  │   │   ├── yuanrong_client.py         # YRStorageClient
  │   │   ├── mooncake_client.py         # MooncakeStoreClient
  │   │   └── factory.py                # TransferQueueStorageClientFactory

要将自定义存储后端与 TransferQueue 集成,首先实现一个继承自 TransferQueueStorageManager 的子类。此子类充当 TransferQueue 系统与目标存储后端之间的适配器。对于基于 KV 的存储后端,您可以简单地继承 KVStorageManager,它可以作为所有基于 KV 的后端的通用管理器。

分布式存储后端通常带有自己的原生客户端,作为存储系统的接口。在此类情况下,按照 storage/clients 目录中的示例编写一个低级别适配器。

StorageManagerStorageClient 提供了工厂类,以方便集成。在工厂类中添加必要参数的描述有助于改善整体用户体验。

引用

如果您发现此仓库有用,请善意引用我们的论文:
@article{han2025asyncflow,
  title={AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training},
  author={Han, Zhenyu and You, Ansheng and Wang, Haibo and Luo, Kui and Yang, Guang and Shi, Wenqi and Chen, Menglong and Zhang, Sicheng and Lan, Zeshun and Deng, Chunshi and others},
  journal={arXiv preprint arXiv:2507.01663},
  year={2025}
}