TransferQueue 数据系统
最后更新:11/17/2025.
本文档介绍 TransferQueue,这是一个用于高效后训练的异步流式数据管理系统。
概述
TransferQueue 是一个高性能的数据存储和传输模块,具备全景数据可见性和流式调度能力,专门优化后训练工作流中的高效数据流。
TransferQueue 提供细粒度、样本级别的数据管理和负载均衡(即将到来)能力,作为数据网关,解耦计算任务之间的显式数据依赖关系。这种设计支持分治策略,大大简化了算法控制器的设计。
更新
2025 年 11 月 10 日:我们从 TransferQueueController 中分离出数据检索逻辑 PR#101。现在您可以实现自己的
Sampler来控制如何消费数据。2025 年 11 月 5 日:我们提供了一个
KVStorageManager,简化了与基于 KV 的存储后端的集成 PR#96。第一个可用的基于 KV 的后端是 Yuanrong。2025 年 11 月 4 日:数据分区能力现已在 PR#98 中可用。现在您可以定义逻辑数据分区来管理训练/验证/测试数据集。
2025 年 10 月 25 日:我们通过 PR#66 使存储后端可插拔。现在您可以尝试将自己的存储后端与 TransferQueue 集成!
2025 年 10 月 21 日:TransferQueue 已正式集成到 verl 中 verl/pulls/3649。后续 PR 将通过完全解耦数据流和控制流来优化单控制器架构。
2025 年 7 月 21 日:我们在 verl 社区启动了一个 RFC verl/RFC#2662。
2025 年 7 月 2 日:我们发布了论文 AsyncFlow。
组件
控制平面:全景数据管理
在控制平面中,TransferQueueController 以元数据的形式跟踪每个训练样本的生产状态和消费状态。当所有所需的数据字段都准备就绪(即写入到 TransferQueueStorageManager)时,我们知道此数据样本可以被下游任务消费。
对于消费状态,我们为每个计算任务(例如 generate_sequences、compute_log_prob 等)记录消费记录。因此,即使不同计算任务需要相同的数据字段,它们也可以独立消费数据,而不会相互干扰。
为了使数据检索过程更加可定制,我们提供了一个 Sampler 类,允许用户定义自己的数据检索和消费逻辑。请参考 Customize 部分了解详情。
在未来,我们计划在控制平面中支持负载均衡和动态批处理能力。此外,我们将支持去中心化框架的数据管理,在这种框架中,每个 rank 自行管理数据检索,而不是由单个控制器协调。
数据平面:分布式数据存储
在数据平面中,我们提供了一种可插拔设计,使 TransferQueue 能够根据用户需求集成不同的存储后端。
具体而言,我们提供了一个 TransferQueueStorageManager 抽象类,定义了以下核心 API:
async def put_data(self, data: TensorDict, metadata: BatchMeta) -> Noneasync def get_data(self, metadata: BatchMeta) -> TensorDictasync def clear_data(self, metadata: BatchMeta) -> None
此类封装了 TransferQueue 系统内的核心交互逻辑。您只需要编写一个简单的子类,即可集成自己的存储后端。请参考 Customize 部分了解详情。
目前,我们支持以下存储后端:
SimpleStorageUnit:一个基本的 CPU 内存存储,对数据格式约束 миним,使用简单。
Yuanrong:Ascend 原生数据系统,提供包括 HBM/DRAM/SSD 在内的分层存储接口。
MoonCakeStore(进行中):一种高性能、基于 KV 的分层存储,支持 GPU 和 DRAM 之间的 RDMA 传输。
Ray Direct Transport(进行中):Ray 的新功能,允许 Ray 在 Ray actor 之间直接存储和传递对象。
其中,SimpleStorageUnit 作为我们的默认存储后端,由 AsyncSimpleStorageManager 类协调。每个存储单元可以部署在单独的节点上,实现分布式数据管理。
SimpleStorageUnit 采用以下 2D 数据结构:
每一行对应一个训练样本,在相应全局批处理中分配唯一的索引。
每一列表示计算任务的输入/输出数据字段。
此数据结构设计受到后训练过程的计算特性启发,其中每个训练样本通过任务管道以中继方式生成。它提供精确的寻址能力,允许以流式方式进行细粒度、并发的数据读写操作。
用户界面:异步与同步客户端
TransferQueue 系统的交互工作流程如下:
一个进程向
TransferQueueController发送读取请求。TransferQueueController扫描每个样本(行)的生产和消费元数据,并根据负载均衡策略动态组装微批元数据。该机制启用样本级别的数据调度。该进程使用控制器提供的元数据从分布式存储单元检索实际数据。
为了简化 TransferQueue 的使用,我们将此过程封装到 AsyncTransferQueueClient 和 TransferQueueClient 中。这些客户端提供了异步和同步接口用于数据传输,允许用户轻松将 TransferQueue 集成到他们的框架中。
在未来,我们将为去中心化框架提供一个
StreamingDataLoader接口,如 issue#85 和 verl/RFC#2662 中讨论的那样。利用此抽象,每个 rank 可以像 PyTorch 中的DataLoader一样自动获取自己的数据。TransferQueue 系统将处理由不同并行策略引起的基础数据调度和传输逻辑,大大简化去中心化框架的设计。
🔥 案例展示
通用用法
主要交互点是 AsyncTransferQueueClient 和 TransferQueueClient,作为与 TransferQueue 系统通信的接口。
核心接口:
(async_)get_meta(data_fields: list[str], batch_size:int, global_step:int, get_n_samples:bool, task_name:str) -> BatchMeta
(async_)get_data(metadata:BatchMeta) -> TensorDict
(async_)put(data:TensorDict, metadata:BatchMeta, global_step)
(async_)clear(global_step: int)
我们即将发布详细的教程和 API 文档。
verl 示例
现在将 TransferQueue 集成到 verl 的主要动机是缓解单控制器 RayPPOTrainer 的数据传输瓶颈。目前,所有 DataProto 对象必须通过 RayPPOTrainer 路由,导致整个后训练系统存在单点瓶颈。

通过利用 TransferQueue,我们通过以下方式分离经验数据传输和元数据调度:
将
DataProto替换为BatchMeta(元数据)和TensorDict(实际数据)结构通过 BatchMeta 保留 verl 的原始调度/收集逻辑(保持单控制器调试性)
通过 TransferQueue 的分布式存储单元加速数据传输

您可以参考 recipe,其中我们在异步和同步场景中模拟了 verl 的用法。TransferQueue 已正式集成到 verl 中,现可在 verl/pulls/3649 获取(后续 PR 将进一步优化集成)。
使用 Python 包
pip install TransferQueue==0.1.1.dev2
从源代码构建 wheel 包
按照以下步骤构建并安装:
从 GitHub 仓库克隆源代码
git clone https://github.com/TransferQueue/TransferQueue/ cd TransferQueue
安装依赖项
pip install -r requirements.txt
构建并安装
python -m build --wheel pip install dist/*.whl
📊 性能
注意:上述 TransferQueue 基准测试基于我们简单的
SimpleStorageUnit后端。通过引入高性能存储后端并优化序列化/反序列化,我们预计能实现更好的性能。热烈欢迎社区贡献!
有关详细的性能基准测试,请参考 此博客。
🛠️ 自定义 TransferQueue
定义自己的数据检索逻辑
我们提供了一个 BaseSampler 抽象类,定义了以下接口:
@abstractmethod
def sample(
self,
ready_indexes: list[int],
batch_size: int,
*args: Any,
**kwargs: Any,
) -> tuple[list[int], list[int]]:
"""Sample a batch of indices from the ready indices.
Args:
ready_indexes: List of global indices for which all required fields of the
corresponding samples have been produced, and the samples are not labeled as
consumed in the corresponding task.
batch_size: Number of samples to select
*args: Additional positional arguments for specific sampler implementations
**kwargs: Additional keyword arguments for specific sampler implementations
Returns:
List of sampled global indices of length batch_size
List of global indices of length batch_size that should be labeled as consumed
(will never be retrieved in the future)
Raises:
ValueError: If batch_size is invalid or ready_indexes is insufficient
"""
raise NotImplementedError("Subclasses must implement sample")
在此设计中,我们通过两个返回值分离数据检索和数据消费,这使我们能够轻松控制样本替换。我们实现了两个参考设计:SequentialSampler 和 GRPOGroupNSampler。
Sampler 类或实例应在初始化时传递到 TransferQueueController。在每次 get_meta 调用中,您可以向 Sampler 提供动态采样参数。
from transfer_queue import TransferQueueController, TransferQueueClient, GRPOGroupNSampler, process_zmq_server_info
# 选项 1:将采样器类传递给 TransferQueueController
controller = TransferQueueController.remote(GRPOGroupNSampler)
# 选项 2:将采样器实例传递给 TransferQueueController(如果您需要自定义配置)
your_own_sampler = YourOwnSampler(config)
controller = TransferQueueController.remote(your_own_sampler)
# 使用采样器
batch_meta = client.get_meta(
data_fields=["input_ids", "attention_mask"],
batch_size=8,
partition_id="train_0",
task_name="generate_sequences",
sampling_config={"n_samples_per_prompt": 4} # 在此处放置所需的采样参数
)
如何集成新的存储后端
数据平面组织如下:
transfer_queue/
├── storage/
│ ├── __init__.py
│ │── simple_backend.py # SimpleStorageUnit、StorageUnitData、StorageMetaGroup
│ ├── managers/ # Managers are upper level interfaces that encapsulate the interaction logic with TQ system.
│ │ ├── __init__.py
│ │ ├──base.py # TransferQueueStorageManager, KVStorageManager
│ │ ├──simple_backend_manager.py # AsyncSimpleStorageManager
│ │ ├──yuanrong_manager.py # YuanrongStorageManager
│ │ ├──mooncake_manager.py # MooncakeStorageManager
│ │ └──factory.py # TransferQueueStorageManagerFactory
│ └── clients/ # Clients are lower level interfaces that directly manipulate the target storage backend.
│ │ ├── __init__.py
│ │ ├── base.py # TransferQueueStorageKVClient
│ │ ├── yuanrong_client.py # YRStorageClient
│ │ ├── mooncake_client.py # MooncakeStoreClient
│ │ └── factory.py # TransferQueueStorageClientFactory
要将自定义存储后端与 TransferQueue 集成,首先实现一个继承自 TransferQueueStorageManager 的子类。此子类充当 TransferQueue 系统与目标存储后端之间的适配器。对于基于 KV 的存储后端,您可以简单地继承 KVStorageManager,它可以作为所有基于 KV 的后端的通用管理器。
分布式存储后端通常带有自己的原生客户端,作为存储系统的接口。在此类情况下,按照 storage/clients 目录中的示例编写一个低级别适配器。
为 StorageManager 和 StorageClient 提供了工厂类,以方便集成。在工厂类中添加必要参数的描述有助于改善整体用户体验。
引用
如果您发现此仓库有用,请善意引用我们的论文:@article{han2025asyncflow,
title={AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training},
author={Han, Zhenyu and You, Ansheng and Wang, Haibo and Luo, Kui and Yang, Guang and Shi, Wenqi and Chen, Menglong and Zhang, Sicheng and Lan, Zeshun and Deng, Chunshi and others},
journal={arXiv preprint arXiv:2507.01663},
year={2025}
}