# TransferQueue 数据系统
最后更新:11/17/2025.
本文档介绍 [TransferQueue](https://github.com/TransferQueue/TransferQueue),这是一个用于高效后训练的异步流式数据管理系统。
概述
TransferQueue 是一个高性能的数据存储和传输模块,具备全景数据可见性和流式调度能力,专门优化后训练工作流中的高效数据流。
TransferQueue 提供**细粒度、样本级别**的数据管理和**负载均衡**(即将到来)能力,作为数据网关,解耦计算任务之间的显式数据依赖关系。这种设计支持分治策略,大大简化了算法控制器的设计。
更新
- **2025 年 11 月 10 日**:我们从 TransferQueueController 中分离出数据检索逻辑 [PR#101](https://github.com/TransferQueue/TransferQueue/pull/101)。现在您可以实现自己的 `Sampler` 来控制如何消费数据。
- **2025 年 11 月 5 日**:我们提供了一个 `KVStorageManager`,简化了与基于 KV 的存储后端的集成 [PR#96](https://github.com/TransferQueue/TransferQueue/pull/96)。第一个可用的基于 KV 的后端是 [Yuanrong](https://gitee.com/openeuler/yuanrong-datasystem)。
- **2025 年 11 月 4 日**:数据分区能力现已在 [PR#98](https://github.com/TransferQueue/TransferQueue/pull/98) 中可用。现在您可以定义逻辑数据分区来管理训练/验证/测试数据集。
- **2025 年 10 月 25 日**:我们通过 [PR#66](https://github.com/TransferQueue/TransferQueue/pull/66) 使存储后端可插拔。现在您可以尝试将自己的存储后端与 TransferQueue 集成!
- **2025 年 10 月 21 日**:TransferQueue 已正式集成到 verl 中 [verl/pulls/3649](https://github.com/volcengine/verl/pull/3649)。后续 PR 将通过完全解耦数据流和控制流来优化单控制器架构。
- **2025 年 7 月 22 日**:我们在 Zhihu 1, 2 上发布了一系列中文博客。
- **2025 年 7 月 21 日**:我们在 verl 社区启动了一个 RFC [verl/RFC#2662](https://github.com/volcengine/verl/discussions/2662)。
- **2025 年 7 月 2 日**:我们发布了论文 [AsyncFlow](https://arxiv.org/abs/2507.01663)。
组件
### 控制平面:全景数据管理
在控制平面中,`TransferQueueController` 以元数据的形式跟踪每个训练样本的**生产状态**和**消费状态**。当所有所需的数据字段都准备就绪(即写入到 `TransferQueueStorageManager`)时,我们知道此数据样本可以被下游任务消费。
对于消费状态,我们为每个计算任务(例如 `generate_sequences`、`compute_log_prob` 等)记录消费记录。因此,即使不同计算任务需要相同的数据字段,它们也可以独立消费数据,而不会相互干扰。
为了使数据检索过程更加可定制,我们提供了一个 `Sampler` 类,允许用户定义自己的数据检索和消费逻辑。请参考 [Customize](#customize) 部分了解详情。
> 在未来,我们计划在控制平面中支持**负载均衡**和**动态批处理**能力。此外,我们将支持去中心化框架的数据管理,在这种框架中,每个 rank 自行管理数据检索,而不是由单个控制器协调。
### 数据平面:分布式数据存储
在数据平面中,我们提供了一种可插拔设计,使 TransferQueue 能够根据用户需求集成不同的存储后端。
具体而言,我们提供了一个 `TransferQueueStorageManager` 抽象类,定义了以下核心 API:
- `async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None`
- `async def get_data(self, metadata: BatchMeta) -> TensorDict`
- `async def clear_data(self, metadata: BatchMeta) -> None`
此类封装了 TransferQueue 系统内的核心交互逻辑。您只需要编写一个简单的子类,即可集成自己的存储后端。请参考 [Customize](#customize) 部分了解详情。
目前,我们支持以下存储后端:
- SimpleStorageUnit:一个基本的 CPU 内存存储,对数据格式约束 миним,使用简单。
- [Yuanrong](https://gitee.com/openeuler/yuanrong-datasystem):Ascend 原生数据系统,提供包括 HBM/DRAM/SSD 在内的分层存储接口。
- [MoonCakeStore](https://github.com/kvcache-ai/Mooncake)(进行中):一种高性能、基于 KV 的分层存储,支持 GPU 和 DRAM 之间的 RDMA 传输。
- [Ray Direct Transport](https://docs.ray.io/en/master/ray-core/direct-transport.html)([进行中](https://github.com/TransferQueue/TransferQueue/pull/108)):Ray 的新功能,允许 Ray 在 Ray actor 之间直接存储和传递对象。
其中,`SimpleStorageUnit` 作为我们的默认存储后端,由 `AsyncSimpleStorageManager` 类协调。每个存储单元可以部署在单独的节点上,实现分布式数据管理。
`SimpleStorageUnit` 采用以下 2D 数据结构:
- 每一行对应一个训练样本,在相应全局批处理中分配唯一的索引。
- 每一列表示计算任务的输入/输出数据字段。
此数据结构设计受到后训练过程的计算特性启发,其中每个训练样本通过任务管道以中继方式生成。它提供精确的寻址能力,允许以流式方式进行细粒度、并发的数据读写操作。
### 用户界面:异步与同步客户端
TransferQueue 系统的交互工作流程如下:
1. 一个进程向 `TransferQueueController` 发送读取请求。
2. `TransferQueueController` 扫描每个样本(行)的生产和消费元数据,并根据负载均衡策略动态组装微批元数据。该机制启用样本级别的数据调度。
3. 该进程使用控制器提供的元数据从分布式存储单元检索实际数据。
为了简化 TransferQueue 的使用,我们将此过程封装到 `AsyncTransferQueueClient` 和 `TransferQueueClient` 中。这些客户端提供了异步和同步接口用于数据传输,允许用户轻松将 TransferQueue 集成到他们的框架中。
> 在未来,我们将为去中心化框架提供一个 `StreamingDataLoader` 接口,如 [issue#85](https://github.com/TransferQueue/TransferQueue/issues/85) 和 [verl/RFC#2662](https://github.com/volcengine/verl/discussions/2662) 中讨论的那样。利用此抽象,每个 rank 可以像 PyTorch 中的 `DataLoader` 一样自动获取自己的数据。TransferQueue 系统将处理由不同并行策略引起的基础数据调度和传输逻辑,大大简化去中心化框架的设计。
🔥 案例展示
### 通用用法
主要交互点是 `AsyncTransferQueueClient` 和 `TransferQueueClient`,作为与 TransferQueue 系统通信的接口。
核心接口:
- (async_)get_meta(data_fields: list[str], batch_size:int, global_step:int, get_n_samples:bool, task_name:str) -> BatchMeta
- (async_)get_data(metadata:BatchMeta) -> TensorDict
- (async_)put(data:TensorDict, metadata:BatchMeta, global_step)
- (async_)clear(global_step: int)
我们即将发布详细的教程和 API 文档。
### verl 示例
现在将 TransferQueue 集成到 verl 的主要动机是**缓解单控制器 `RayPPOTrainer` 的数据传输瓶颈**。目前,所有 `DataProto` 对象必须通过 `RayPPOTrainer` 路由,导致整个后训练系统存在单点瓶颈。

通过利用 TransferQueue,我们通过以下方式分离经验数据传输和元数据调度:
- 将 `DataProto` 替换为 `BatchMeta`(元数据)和 `TensorDict`(实际数据)结构
- 通过 BatchMeta 保留 verl 的原始调度/收集逻辑(保持单控制器调试性)
- 通过 TransferQueue 的分布式存储单元加速数据传输

您可以参考 [recipe](https://github.com/TransferQueue/TransferQueue/tree/dev/recipe/simple_use_case),其中我们在异步和同步场景中模拟了 verl 的用法。TransferQueue 已正式集成到 verl 中,现可在 [verl/pulls/3649](https://github.com/volcengine/verl/pull/3649) 获取(后续 PR 将进一步优化集成)。
### 使用 Python 包
```bash
pip install TransferQueue==0.1.1.dev2
```
### 从源代码构建 wheel 包
按照以下步骤构建并安装:
1. 从 GitHub 仓库克隆源代码
```bash
git clone https://github.com/TransferQueue/TransferQueue/
cd TransferQueue
```
2. 安装依赖项
```bash
pip install -r requirements.txt
```
3. 构建并安装
```bash
python -m build --wheel
pip install dist/*.whl
```
> 注意:上述 TransferQueue 基准测试基于我们简单的 `SimpleStorageUnit` 后端。通过引入高性能存储后端并优化序列化/反序列化,我们预计能实现更好的性能。热烈欢迎社区贡献!
有关详细的性能基准测试,请参考 [此博客](https://www.yuque.com/haomingzi-lfse7/hlx5g0/tml8ke0zkgn6roey?singleDoc#)。
🛠️ 自定义 TransferQueue
### 定义自己的数据检索逻辑
我们提供了一个 `BaseSampler` 抽象类,定义了以下接口:
```python3
@abstractmethod
def sample(
self,
ready_indexes: list[int],
batch_size: int,
*args: Any,
**kwargs: Any,
) -> tuple[list[int], list[int]]:
"""Sample a batch of indices from the ready indices.
Args:
ready_indexes: List of global indices for which all required fields of the
corresponding samples have been produced, and the samples are not labeled as
consumed in the corresponding task.
batch_size: Number of samples to select
*args: Additional positional arguments for specific sampler implementations
**kwargs: Additional keyword arguments for specific sampler implementations
Returns:
List of sampled global indices of length batch_size
List of global indices of length batch_size that should be labeled as consumed
(will never be retrieved in the future)
Raises:
ValueError: If batch_size is invalid or ready_indexes is insufficient
"""
raise NotImplementedError("Subclasses must implement sample")
```
在此设计中,我们通过两个返回值分离数据检索和数据消费,这使我们能够轻松控制样本替换。我们实现了两个参考设计:`SequentialSampler` 和 `GRPOGroupNSampler`。
`Sampler` 类或实例应在初始化时传递到 `TransferQueueController`。在每次 `get_meta` 调用中,您可以向 `Sampler` 提供动态采样参数。
```python3
from transfer_queue import TransferQueueController, TransferQueueClient, GRPOGroupNSampler, process_zmq_server_info
# 选项 1:将采样器类传递给 TransferQueueController
controller = TransferQueueController.remote(GRPOGroupNSampler)
# 选项 2:将采样器实例传递给 TransferQueueController(如果您需要自定义配置)
your_own_sampler = YourOwnSampler(config)
controller = TransferQueueController.remote(your_own_sampler)
# 使用采样器
batch_meta = client.get_meta(
data_fields=["input_ids", "attention_mask"],
batch_size=8,
partition_id="train_0",
task_name="generate_sequences",
sampling_config={"n_samples_per_prompt": 4} # 在此处放置所需的采样参数
)
```
### 如何集成新的存储后端
数据平面组织如下:
```text
transfer_queue/
├── storage/
│ ├── __init__.py
│ │── simple_backend.py # SimpleStorageUnit、StorageUnitData、StorageMetaGroup
│ ├── managers/ # Managers are upper level interfaces that encapsulate the interaction logic with TQ system.
│ │ ├── __init__.py
│ │ ├──base.py # TransferQueueStorageManager, KVStorageManager
│ │ ├──simple_backend_manager.py # AsyncSimpleStorageManager
│ │ ├──yuanrong_manager.py # YuanrongStorageManager
│ │ ├──mooncake_manager.py # MooncakeStorageManager
│ │ └──factory.py # TransferQueueStorageManagerFactory
│ └── clients/ # Clients are lower level interfaces that directly manipulate the target storage backend.
│ │ ├── __init__.py
│ │ ├── base.py # TransferQueueStorageKVClient
│ │ ├── yuanrong_client.py # YRStorageClient
│ │ ├── mooncake_client.py # MooncakeStoreClient
│ │ └── factory.py # TransferQueueStorageClientFactory
```
要将自定义存储后端与 TransferQueue 集成,首先实现一个继承自 `TransferQueueStorageManager` 的子类。此子类充当 TransferQueue 系统与目标存储后端之间的适配器。对于基于 KV 的存储后端,您可以简单地继承 `KVStorageManager`,它可以作为所有基于 KV 的后端的通用管理器。
分布式存储后端通常带有自己的原生客户端,作为存储系统的接口。在此类情况下,按照 `storage/clients` 目录中的示例编写一个低级别适配器。
为 `StorageManager` 和 `StorageClient` 提供了工厂类,以方便集成。在工厂类中添加必要参数的描述有助于改善整体用户体验。
引用
如果您发现此仓库有用,请善意引用我们的论文:
```bibtex
@article{han2025asyncflow,
title={AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training},
author={Han, Zhenyu and You, Ansheng and Wang, Haibo and Luo, Kui and Yang, Guang and Shi, Wenqi and Chen, Menglong and Zhang, Sicheng and Lan, Zeshun and Deng, Chunshi and others},
journal={arXiv preprint arXiv:2507.01663},
year={2025}
}
```