# TransferQueue 数据系统 最后更新:11/17/2025. 本文档介绍 [TransferQueue](https://github.com/TransferQueue/TransferQueue),这是一个用于高效后训练的异步流式数据管理系统。

概述

TransferQueue 是一个高性能的数据存储和传输模块,具备全景数据可见性和流式调度能力,专门优化后训练工作流中的高效数据流。

TransferQueue 提供**细粒度、样本级别**的数据管理和**负载均衡**(即将到来)能力,作为数据网关,解耦计算任务之间的显式数据依赖关系。这种设计支持分治策略,大大简化了算法控制器的设计。

更新

- **2025 年 11 月 10 日**:我们从 TransferQueueController 中分离出数据检索逻辑 [PR#101](https://github.com/TransferQueue/TransferQueue/pull/101)。现在您可以实现自己的 `Sampler` 来控制如何消费数据。 - **2025 年 11 月 5 日**:我们提供了一个 `KVStorageManager`,简化了与基于 KV 的存储后端的集成 [PR#96](https://github.com/TransferQueue/TransferQueue/pull/96)。第一个可用的基于 KV 的后端是 [Yuanrong](https://gitee.com/openeuler/yuanrong-datasystem)。 - **2025 年 11 月 4 日**:数据分区能力现已在 [PR#98](https://github.com/TransferQueue/TransferQueue/pull/98) 中可用。现在您可以定义逻辑数据分区来管理训练/验证/测试数据集。 - **2025 年 10 月 25 日**:我们通过 [PR#66](https://github.com/TransferQueue/TransferQueue/pull/66) 使存储后端可插拔。现在您可以尝试将自己的存储后端与 TransferQueue 集成! - **2025 年 10 月 21 日**:TransferQueue 已正式集成到 verl 中 [verl/pulls/3649](https://github.com/volcengine/verl/pull/3649)。后续 PR 将通过完全解耦数据流和控制流来优化单控制器架构。 - **2025 年 7 月 22 日**:我们在 Zhihu 1, 2 上发布了一系列中文博客。 - **2025 年 7 月 21 日**:我们在 verl 社区启动了一个 RFC [verl/RFC#2662](https://github.com/volcengine/verl/discussions/2662)。 - **2025 年 7 月 2 日**:我们发布了论文 [AsyncFlow](https://arxiv.org/abs/2507.01663)。

组件

### 控制平面:全景数据管理 在控制平面中,`TransferQueueController` 以元数据的形式跟踪每个训练样本的**生产状态**和**消费状态**。当所有所需的数据字段都准备就绪(即写入到 `TransferQueueStorageManager`)时,我们知道此数据样本可以被下游任务消费。 对于消费状态,我们为每个计算任务(例如 `generate_sequences`、`compute_log_prob` 等)记录消费记录。因此,即使不同计算任务需要相同的数据字段,它们也可以独立消费数据,而不会相互干扰。

为了使数据检索过程更加可定制,我们提供了一个 `Sampler` 类,允许用户定义自己的数据检索和消费逻辑。请参考 [Customize](#customize) 部分了解详情。 > 在未来,我们计划在控制平面中支持**负载均衡**和**动态批处理**能力。此外,我们将支持去中心化框架的数据管理,在这种框架中,每个 rank 自行管理数据检索,而不是由单个控制器协调。 ### 数据平面:分布式数据存储 在数据平面中,我们提供了一种可插拔设计,使 TransferQueue 能够根据用户需求集成不同的存储后端。 具体而言,我们提供了一个 `TransferQueueStorageManager` 抽象类,定义了以下核心 API: - `async def put_data(self, data: TensorDict, metadata: BatchMeta) -> None` - `async def get_data(self, metadata: BatchMeta) -> TensorDict` - `async def clear_data(self, metadata: BatchMeta) -> None` 此类封装了 TransferQueue 系统内的核心交互逻辑。您只需要编写一个简单的子类,即可集成自己的存储后端。请参考 [Customize](#customize) 部分了解详情。 目前,我们支持以下存储后端: - SimpleStorageUnit:一个基本的 CPU 内存存储,对数据格式约束 миним,使用简单。 - [Yuanrong](https://gitee.com/openeuler/yuanrong-datasystem):Ascend 原生数据系统,提供包括 HBM/DRAM/SSD 在内的分层存储接口。 - [MoonCakeStore](https://github.com/kvcache-ai/Mooncake)(进行中):一种高性能、基于 KV 的分层存储,支持 GPU 和 DRAM 之间的 RDMA 传输。 - [Ray Direct Transport](https://docs.ray.io/en/master/ray-core/direct-transport.html)([进行中](https://github.com/TransferQueue/TransferQueue/pull/108)):Ray 的新功能,允许 Ray 在 Ray actor 之间直接存储和传递对象。 其中,`SimpleStorageUnit` 作为我们的默认存储后端,由 `AsyncSimpleStorageManager` 类协调。每个存储单元可以部署在单独的节点上,实现分布式数据管理。 `SimpleStorageUnit` 采用以下 2D 数据结构: - 每一行对应一个训练样本,在相应全局批处理中分配唯一的索引。 - 每一列表示计算任务的输入/输出数据字段。 此数据结构设计受到后训练过程的计算特性启发,其中每个训练样本通过任务管道以中继方式生成。它提供精确的寻址能力,允许以流式方式进行细粒度、并发的数据读写操作。

### 用户界面:异步与同步客户端 TransferQueue 系统的交互工作流程如下: 1. 一个进程向 `TransferQueueController` 发送读取请求。 2. `TransferQueueController` 扫描每个样本(行)的生产和消费元数据,并根据负载均衡策略动态组装微批元数据。该机制启用样本级别的数据调度。 3. 该进程使用控制器提供的元数据从分布式存储单元检索实际数据。 为了简化 TransferQueue 的使用,我们将此过程封装到 `AsyncTransferQueueClient` 和 `TransferQueueClient` 中。这些客户端提供了异步和同步接口用于数据传输,允许用户轻松将 TransferQueue 集成到他们的框架中。 > 在未来,我们将为去中心化框架提供一个 `StreamingDataLoader` 接口,如 [issue#85](https://github.com/TransferQueue/TransferQueue/issues/85) 和 [verl/RFC#2662](https://github.com/volcengine/verl/discussions/2662) 中讨论的那样。利用此抽象,每个 rank 可以像 PyTorch 中的 `DataLoader` 一样自动获取自己的数据。TransferQueue 系统将处理由不同并行策略引起的基础数据调度和传输逻辑,大大简化去中心化框架的设计。

🔥 案例展示

### 通用用法 主要交互点是 `AsyncTransferQueueClient` 和 `TransferQueueClient`,作为与 TransferQueue 系统通信的接口。 核心接口: - (async_)get_meta(data_fields: list[str], batch_size:int, global_step:int, get_n_samples:bool, task_name:str) -> BatchMeta - (async_)get_data(metadata:BatchMeta) -> TensorDict - (async_)put(data:TensorDict, metadata:BatchMeta, global_step) - (async_)clear(global_step: int) 我们即将发布详细的教程和 API 文档。 ### verl 示例 现在将 TransferQueue 集成到 verl 的主要动机是**缓解单控制器 `RayPPOTrainer` 的数据传输瓶颈**。目前,所有 `DataProto` 对象必须通过 `RayPPOTrainer` 路由,导致整个后训练系统存在单点瓶颈。 ![verl_dataflow_DataProto](https://github.com/TransferQueue/community_doc/blob/main/docs/verl_workflow.jpeg?raw=true) 通过利用 TransferQueue,我们通过以下方式分离经验数据传输和元数据调度: - 将 `DataProto` 替换为 `BatchMeta`(元数据)和 `TensorDict`(实际数据)结构 - 通过 BatchMeta 保留 verl 的原始调度/收集逻辑(保持单控制器调试性) - 通过 TransferQueue 的分布式存储单元加速数据传输 ![verl_dataflow_TransferQueue](https://github.com/TransferQueue/community_doc/blob/main/docs/verl_workflow_with_tq.jpeg?raw=true) 您可以参考 [recipe](https://github.com/TransferQueue/TransferQueue/tree/dev/recipe/simple_use_case),其中我们在异步和同步场景中模拟了 verl 的用法。TransferQueue 已正式集成到 verl 中,现可在 [verl/pulls/3649](https://github.com/volcengine/verl/pull/3649) 获取(后续 PR 将进一步优化集成)。 ### 使用 Python 包 ```bash pip install TransferQueue==0.1.1.dev2 ``` ### 从源代码构建 wheel 包 按照以下步骤构建并安装: 1. 从 GitHub 仓库克隆源代码 ```bash git clone https://github.com/TransferQueue/TransferQueue/ cd TransferQueue ``` 2. 安装依赖项 ```bash pip install -r requirements.txt ``` 3. 构建并安装 ```bash python -m build --wheel pip install dist/*.whl ```

📊 性能

> 注意:上述 TransferQueue 基准测试基于我们简单的 `SimpleStorageUnit` 后端。通过引入高性能存储后端并优化序列化/反序列化,我们预计能实现更好的性能。热烈欢迎社区贡献! 有关详细的性能基准测试,请参考 [此博客](https://www.yuque.com/haomingzi-lfse7/hlx5g0/tml8ke0zkgn6roey?singleDoc#)。

🛠️ 自定义 TransferQueue

### 定义自己的数据检索逻辑 我们提供了一个 `BaseSampler` 抽象类,定义了以下接口: ```python3 @abstractmethod def sample( self, ready_indexes: list[int], batch_size: int, *args: Any, **kwargs: Any, ) -> tuple[list[int], list[int]]: """Sample a batch of indices from the ready indices. Args: ready_indexes: List of global indices for which all required fields of the corresponding samples have been produced, and the samples are not labeled as consumed in the corresponding task. batch_size: Number of samples to select *args: Additional positional arguments for specific sampler implementations **kwargs: Additional keyword arguments for specific sampler implementations Returns: List of sampled global indices of length batch_size List of global indices of length batch_size that should be labeled as consumed (will never be retrieved in the future) Raises: ValueError: If batch_size is invalid or ready_indexes is insufficient """ raise NotImplementedError("Subclasses must implement sample") ``` 在此设计中,我们通过两个返回值分离数据检索和数据消费,这使我们能够轻松控制样本替换。我们实现了两个参考设计:`SequentialSampler` 和 `GRPOGroupNSampler`。 `Sampler` 类或实例应在初始化时传递到 `TransferQueueController`。在每次 `get_meta` 调用中,您可以向 `Sampler` 提供动态采样参数。 ```python3 from transfer_queue import TransferQueueController, TransferQueueClient, GRPOGroupNSampler, process_zmq_server_info # 选项 1:将采样器类传递给 TransferQueueController controller = TransferQueueController.remote(GRPOGroupNSampler) # 选项 2:将采样器实例传递给 TransferQueueController(如果您需要自定义配置) your_own_sampler = YourOwnSampler(config) controller = TransferQueueController.remote(your_own_sampler) # 使用采样器 batch_meta = client.get_meta( data_fields=["input_ids", "attention_mask"], batch_size=8, partition_id="train_0", task_name="generate_sequences", sampling_config={"n_samples_per_prompt": 4} # 在此处放置所需的采样参数 ) ``` ### 如何集成新的存储后端 数据平面组织如下: ```text transfer_queue/ ├── storage/ │ ├── __init__.py │ │── simple_backend.py # SimpleStorageUnit、StorageUnitData、StorageMetaGroup │ ├── managers/ # Managers are upper level interfaces that encapsulate the interaction logic with TQ system. │ │ ├── __init__.py │ │ ├──base.py # TransferQueueStorageManager, KVStorageManager │ │ ├──simple_backend_manager.py # AsyncSimpleStorageManager │ │ ├──yuanrong_manager.py # YuanrongStorageManager │ │ ├──mooncake_manager.py # MooncakeStorageManager │ │ └──factory.py # TransferQueueStorageManagerFactory │ └── clients/ # Clients are lower level interfaces that directly manipulate the target storage backend. │ │ ├── __init__.py │ │ ├── base.py # TransferQueueStorageKVClient │ │ ├── yuanrong_client.py # YRStorageClient │ │ ├── mooncake_client.py # MooncakeStoreClient │ │ └── factory.py # TransferQueueStorageClientFactory ``` 要将自定义存储后端与 TransferQueue 集成,首先实现一个继承自 `TransferQueueStorageManager` 的子类。此子类充当 TransferQueue 系统与目标存储后端之间的适配器。对于基于 KV 的存储后端,您可以简单地继承 `KVStorageManager`,它可以作为所有基于 KV 的后端的通用管理器。 分布式存储后端通常带有自己的原生客户端,作为存储系统的接口。在此类情况下,按照 `storage/clients` 目录中的示例编写一个低级别适配器。 为 `StorageManager` 和 `StorageClient` 提供了工厂类,以方便集成。在工厂类中添加必要参数的描述有助于改善整体用户体验。

引用

如果您发现此仓库有用,请善意引用我们的论文: ```bibtex @article{han2025asyncflow, title={AsyncFlow: An Asynchronous Streaming RL Framework for Efficient LLM Post-Training}, author={Han, Zhenyu and You, Ansheng and Wang, Haibo and Luo, Kui and Yang, Guang and Shi, Wenqi and Chen, Menglong and Zhang, Sicheng and Lan, Zeshun and Deng, Chunshi and others}, journal={arXiv preprint arXiv:2507.01663}, year={2025} } ```