Sandbox Fusion Tool Integration

Last updated: 06/10/2025.

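Motivation

  • As users of verl, we want to allow the model to call certain tools during Actor rollout and incorporate the results into the training process.

  • A colleague from ByteDance proposed a paper aimed at improving model capability through code execution tools.

  • We aim to support the tool-calling capabilities of the inference engine, using sandbox-fusion as the code execution system, and to provide the community with a reimplementation of retools.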

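Reward Computation with Sandbox Fusion + FaaS Integration

  • In the current datasets and tasks, similar work already exists (e.g., Prime), which uses local processes as runners to execute model-generated code for reward computation.

  • Building on this, #1429 advanced the design by integrating FaaS as the runner for reward computation.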

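Goals

  • Adapt to the sglang tool-calling protocol and define tools for sandbox fusion.

  • Integrate with the async-rollout process, ensuring that sandbox fusion tools follow asyncIO conventions.

  • Design and implement a basic rate limiter to prevent issues such as 429 errors.

Non-Goals

  • Training effectiveness is out of scope.

  • Observability metrics are not considered.

  • Distributed failover and component fault tolerance are not addressed.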

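Design Details

Tool Schema Definition

  • Only code execution is considered for now, so the JSON emitted by the model must contain a code field.

  • Only Python code is supported for now, so no language parameter is defined.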

OpenAIFunctionToolSchema(
    type="function",
    function=OpenAIFunctionSchema(
        name="code_interpreter",
        description="A tool for executing code.",
        parameters=OpenAIFunctionParametersSchema(
            type="object",
            properties={
                "code": OpenAIFunctionPropertySchema(
                    type="string",
                    description="The code to execute.",
                    enum=None,
                )
            },
            required=["code"],
        ),
        strict=False,
    )
)
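
To illustrate how a schema like this is consumed, the following minimal sketch extracts a code_interpreter call from model output and checks the required code field. The <tool_call> wrapper follows the prompt format shown in the end-to-end example later in this document; parse_tool_calls is a hypothetical helper written for illustration, not the parser used by sglang or verl.

```python
import json
import re

# Hypothetical helper for illustration; not the parser used by sglang or verl.
TOOL_CALL_RE = re.compile(r"<tool_call>(.*?)</tool_call>", re.DOTALL)


def parse_tool_calls(text: str) -> list:
    """Return well-formed code_interpreter calls found in the model output."""
    calls = []
    for raw in TOOL_CALL_RE.findall(text):
        try:
            call = json.loads(raw.strip())
        except json.JSONDecodeError:
            continue  # skip malformed JSON instead of failing the rollout
        if not isinstance(call, dict) or call.get("name") != "code_interpreter":
            continue
        args = call.get("arguments")
        # The schema marks "code" as a required string property.
        if isinstance(args, dict) and isinstance(args.get("code"), str):
            calls.append(call)
    return calls


sample = (
    "Let me compute that.\n"
    "<tool_call>\n"
    '{"name": "code_interpreter", "arguments": {"code": "print(1 + 1)"}}\n'
    "</tool_call>\n"
)
calls = parse_tool_calls(sample)
print(calls[0]["arguments"]["code"])
```

Calls that are malformed or lack the code field are dropped here; a real integration may prefer to return an error message to the model instead.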

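Configuration Parameters

Rate Limiting Design

Objectives:

  • Limit the number of in-flight requests using a token bucket model.

  • Ensure ordered submission to the code runners to avoid starvation caused by backoff.

Design highlights:

  • Use a Ray global Actor as a cluster-level singleton distributed counter.

  • Use a semaphore for counting, with acquire and release running in separate thread pools to preserve ordering.

  • Use Ray's cloud-pickle to serialize functions so that ExecutionWorker stays decoupled from the code it runs.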

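import logging
import threading
from contextlib import ExitStack
from enum import Enum
from typing import Callable, Optional, TypeVar

import ray

logger = logging.getLogger(__name__)
T = TypeVar("T")


class PoolMode(Enum):
    # Declared here so the snippet is self-contained.
    ThreadMode = 1
    ProcessMode = 2


@ray.remote(concurrency_groups={"acquire": 1, "release": 10})
class TokenBucketWorker:
    def __init__(self, rate_limit: int):
        self.rate_limit = rate_limit
        self.current_count = 0
        self._semaphore = threading.Semaphore(rate_limit)
        # acquire and release run on different threads, so guard the counter.
        self._count_lock = threading.Lock()

    @ray.method(concurrency_group="acquire")
    def acquire(self):
        # The "acquire" group has a single thread, so waiters are served one at a time.
        self._semaphore.acquire()
        with self._count_lock:
            self.current_count += 1

    @ray.method(concurrency_group="release")
    def release(self):
        self._semaphore.release()
        with self._count_lock:
            self.current_count -= 1

    def get_current_count(self):
        return self.current_count


class ExecutionWorker:
    def __init__(self, enable_global_rate_limit=True, rate_limit=10):
        self.rate_limit_worker = self._init_rate_limit(rate_limit) if enable_global_rate_limit else None

    def _init_rate_limit(self, rate_limit):
        # Reuse the named actor if it already exists, so all workers share one limiter.
        return TokenBucketWorker.options(name="rate-limiter", get_if_exists=True).remote(rate_limit)

    def execute(self, fn: Callable[..., T], *fn_args, **fn_kwargs) -> Optional[T]:
        with ExitStack() as stack:
            if self.rate_limit_worker is not None:
                # Acquire first so a slot is released only if it was actually taken.
                ray.get(self.rate_limit_worker.acquire.remote())
                stack.callback(self.rate_limit_worker.release.remote)
            try:
                return fn(*fn_args, **fn_kwargs)
            except Exception as e:
                # Errors are logged and swallowed; the caller receives None.
                logger.warning(f"Error when executing code: {e}")
                return None


def init_execution_pool(num_workers: int, enable_global_rate_limit=True, rate_limit=10, mode: PoolMode = PoolMode.ThreadMode):
    if mode == PoolMode.ThreadMode:
        return ray.remote(ExecutionWorker).options(max_concurrency=num_workers).remote(
            enable_global_rate_limit=enable_global_rate_limit,
            rate_limit=rate_limit,
        )
    else:
        raise NotImplementedError("Process mode is not implemented yet")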
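
The core idea (a semaphore that caps in-flight calls and is always released, even when the wrapped function fails) can be tried without a Ray cluster. The sketch below is a local, thread-only stand-in, not the Ray-based implementation above; InFlightLimiter, fake_sandbox_call, and safe_call are hypothetical names introduced only for this illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class InFlightLimiter:
    """Local stand-in for TokenBucketWorker: caps concurrent calls with a semaphore."""

    def __init__(self, rate_limit: int):
        self._semaphore = threading.Semaphore(rate_limit)
        self._lock = threading.Lock()
        self.current = 0  # calls currently holding a slot
        self.peak = 0     # highest value of `current` observed

    def run(self, fn, *args):
        self._semaphore.acquire()
        try:
            with self._lock:
                self.current += 1
                self.peak = max(self.peak, self.current)
            return fn(*args)
        finally:
            # Always give the slot back, even if fn raised.
            with self._lock:
                self.current -= 1
            self._semaphore.release()


def fake_sandbox_call(i: int) -> int:
    """Hypothetical workload: sleeps briefly and fails on every fifth input."""
    time.sleep(0.01)
    if i % 5 == 0:
        raise RuntimeError("simulated sandbox failure")
    return i * 2


def safe_call(limiter: InFlightLimiter, i: int):
    try:
        return limiter.run(fake_sandbox_call, i)
    except RuntimeError:
        return None


limiter = InFlightLimiter(rate_limit=3)
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(lambda i: safe_call(limiter, i), range(20)))

print("peak in-flight:", limiter.peak)
print("in-flight after run:", limiter.current)
```

Although eight threads submit work, the peak never exceeds the limit of three, and the counter returns to zero despite the simulated failures. This is the behavior that test_rotten_execution is meant to check for the real limiter.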

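Tool Implementation

  • Use instance_id to identify requests across multiple dialogue turns.

  • Use execution_pool to implement asynchronous invocation.

  • Clean up state after the rollout completes.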

class SandboxFusionTool(BaseTool):
    def __init__(self, config: dict, tool_schema: OpenAIFunctionToolSchema):
        ...
        self.execution_pool = init_execution_pool(...)
        ...

    async def create(self, instance_id: Optional[str] = None, ...):
        ...

    async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) -> Tuple[str, float, dict]:
        code = parameters.get("code", "")
        timeout = parameters.get("timeout", self.default_timeout)
        language = parameters.get("language", self.default_language)
        if not isinstance(code, str):
            code = str(code)

        # Run the blocking sandbox call in the rate-limited execution pool.
        result = await self.execution_pool.execute.remote(self.execute_code, instance_id, code, timeout, language)
        self._instance_dict[instance_id]["reward"].append(result.strip())

        return result, result, {}

    def execute_code(self, instance_id, code, timeout=30, language="python"):
        result_status, metadata = _process_single_case(0, None, None, self.sandbox_fusion_url, code, timeout, language)
        # we should always expect this since we don't have correct answer
        if metadata["run_status"] == "Finished":
            actual_output = metadata["stdout"] if metadata["stdout"] is not None else ""
            return actual_output
        else:
            return "no stdout here"

    async def calc_reward(self, instance_id: str, ...):
        ...

    async def release(self, instance_id: str, ...):
        ...
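
The create / execute / release lifecycle keyed by instance_id can be sketched in isolation as follows. This is a simplified illustration under stated assumptions: MiniTool and run_in_sandbox are hypothetical, and asyncio.to_thread stands in for the Ray execution pool so that the blocking call does not stall the event loop.

```python
import asyncio
import uuid
from typing import Optional


def run_in_sandbox(code: str) -> str:
    """Hypothetical blocking stand-in for the sandbox HTTP call."""
    return f"ran {len(code)} chars"


class MiniTool:
    """Simplified illustration of per-instance state across dialogue turns."""

    def __init__(self):
        self._instance_dict: dict = {}

    async def create(self, instance_id: Optional[str] = None) -> str:
        instance_id = instance_id or str(uuid.uuid4())
        self._instance_dict[instance_id] = {"outputs": []}
        return instance_id

    async def execute(self, instance_id: str, parameters: dict) -> str:
        code = str(parameters.get("code", ""))
        # Offload the blocking call so the event loop stays responsive.
        result = await asyncio.to_thread(run_in_sandbox, code)
        self._instance_dict[instance_id]["outputs"].append(result)
        return result

    async def release(self, instance_id: str) -> None:
        # Drop per-instance state once the rollout is finished.
        self._instance_dict.pop(instance_id, None)


async def main():
    tool = MiniTool()
    iid = await tool.create()
    first = await tool.execute(iid, {"code": "print(1)"})
    second = await tool.execute(iid, {"code": "print(22)"})
    turns = len(tool._instance_dict[iid]["outputs"])
    await tool.release(iid)
    return first, second, turns, iid in tool._instance_dict


print(asyncio.run(main()))
```

Both turns share the same instance_id, so their outputs accumulate in one entry, and release removes that entry so state does not leak between rollouts.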

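Test Plan

Unit Tests

  • test_tools_registration: Tests tool registration and initialization.

  • test_rollout_req_creation: Verifies that AsyncRolloutReq is constructed correctly.

  • test_over_size_case: Ensures the rollout terminates early when max_seq_len is exceeded.

  • test_tool_call_basic_case: Mocks sglang output and verifies the tool call and its result.

  • test_tool_call_batch_case: Tests batch processing of tool calls.

  • test_basic_multi_process_init: Verifies that the Ray global actor behaves as a singleton.

  • TestSingleNodeRateLimiterCase: Verifies that the rate limiter works in single-node mode.

  • test_rotten_execution: Ensures the rate limiter recovers from function errors.

  • TestMultiNodeRateLimiterCase: Verifies behavior in a multi-node environment.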

End-to-End Tests

We provide an end-to-end test script in the tests/special_e2e folder: tests/special_e2e/run_gsm8k_fsdp_sgl_multiturn_sf_tool.sh

Setting 'trainer.rollout_data_dir' dumps the rollout data to local disk. Here is one sample taken from the rollout data:

{
  "input": "

  system\nYou are a math expert. You are given a question and you need to solve it step by step. Reasoning step by step before any tool call. You should use the `calc_gsm8k_reward` tool after step by step solving the question, before generate final answer at least once and refine your answer if necessary. Put your final answer in the format of `#### <answer>`.\n\n# Tools\n\nYou may call one or more functions to assist with the user query.\n\nYou are provided with function signatures within <tools></tools> XML tags:\n<tools>\n{\"type\": \"function\", \"function\": {\"name\": \"code_interpreter\", \"description\": \"A tool for executing code.\", \"parameters\": {\"type\": \"object\", \"properties\": {\"code\": {\"type\": \"string\", \"description\": \"The code to execute.\", \"enum\": null}}, \"required\": [\"code\"]}, \"strict\": false}}\n</tools>\n\nFor each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:\n<tool_call>\n{\"name\": <function-name>, \"arguments\": <args-json-object>}\n</tool_call>\n

  user\nJohn gets a bonus that's the same percentage every year.  Last year he made $100,000 and got a $10,000 bonus.  This year he makes $200,000.  How much will John make this year when adding both his total pay and bonus together? Let's think step by step and output the final answer after `####`.\n

  assistant\n",
  "output": "assistant\n<think>\nOkay, let's see. John gets a bonus that's the same percentage every year. Last year, he made $100,000 and got a $10,000 bonus. This year, he makes $200,000. The question is asking how much he'll make this year when adding both his total pay and bonus together.\n\nFirst, I need to figure out the percentage bonus he gets. Since the bonus is the same percentage each year, if I can find that percentage based on last year's details, I can apply it to this year's earnings to find this year's bonus. Then, I'll add that bonus to his total pay this year.\n\nLast year, he earned $100,000 and got a $10,000 bonus. To find the percentage, I can divide the bonus by his total earnings. So, 10,000 divided by 100,000. Let me calculate that.\n\n10,000 / 100,000 equals 0.1, which is 10% as a percentage. So, his bonus is 10% of his pay each year.\n\nThis year, he makes $200,000. So, the bonus this year would be 10% of 200,000. Let me compute that using the code interpreter to ensure accuracy.\n

  <tool_call>\n{\"name\": \"code_interpreter\", \"arguments\": {\"code\": \"total_pay_this_year = 200000\\nbonus_percentage = 10 / 100\\nbonus_this_year = total_pay_this_year * bonus_percentage\\ntotal_income_this_year = total_pay_this_year + bonus_this_year\\nprint(total_income_this_year)\", \"executes\": \"True\"}}\n</tool_call>\n

  tool\n220000.0\n\n

  assistant\n<think>\nSo the code shows that John's total income this year would be $220,000. This is consistent with the 10% bonus on last year's pay of $100,000, which was $10,000. Therefore, this year's bonus of $20,000 added to his total pay of $200,000 results in a total income of $220,000.\n\nI need to verify that there are no other factors at play. The problem states it's a constant percentage bonus each year, so there's no mention of changing percentages or other variables. Therefore, applying a straightforward percentage of 10% on this year's partial salary amount would result in $20,000. Adding that to his total pay gives $220,000. The calculation is correct and aligns with the question's parameters.\n</think>\n\n#### 220000.0",
  "score": 0,
  "step": 1
}

Here is the same sample in a readable format:

[system]

You are a math expert. You are given a question and you need to solve it step by step. Reasoning step by step before any tool call. You should use the `calc_gsm8k_reward` tool after step by step solving the question, before generate final answer at least once and refine your answer if necessary. Put your final answer in the format of `#### <answer>`.

# Tools

You may call one or more functions to assist with the user query.

You are provided with function signatures within <tools></tools> XML tags:
<tools>
{"type": "function", "function": {"name": "code_interpreter", "description": "A tool for executing code.", "parameters": {"type": "object", "properties": {"code": {"type": "string", "description": "The code to execute.", "enum": null}}, "required": ["code"]}, "strict": false}}
</tools>

For each function call, return a json object with function name and arguments within <tool_call></tool_call> XML tags:
<tool_call>
{"name": <function-name>, "arguments": <args-json-object>}
</tool_call>

[user]

John gets a bonus that's the same percentage every year.  Last year he made $100,000 and got a $10,000 bonus.  This year he makes $200,000.  How much will John make this year when adding both his total pay and bonus together? Let's think step by step and output the final answer after `####`.

[assistant]

<think>
Okay, let's see. John gets a bonus that's the same percentage every year. Last year, he made $100,000 and got a $10,000 bonus. This year, he makes $200,000. The question is asking how much he'll make this year when adding both his total pay and bonus together.

First, I need to figure out the percentage bonus he gets. Since the bonus is the same percentage each year, if I can find that percentage based on last year's details, I can apply it to this year's earnings to find this year's bonus. Then, I'll add that bonus to his total pay this year.

Last year, he earned $100,000 and got a $10,000 bonus. To find the percentage, I can divide the bonus by his total earnings. So, 10,000 divided by 100,000. Let me calculate that.

10,000 / 100,000 equals 0.1, which is 10% as a percentage. So, his bonus is 10% of his pay each year.

This year, he makes $200,000. So, the bonus this year would be 10% of 200,000. Let me compute that using the code interpreter to ensure accuracy.
<tool_call>
{"name": "code_interpreter", "arguments": {"code": "total_pay_this_year = 200000\nbonus_percentage = 10 / 100\nbonus_this_year = total_pay_this_year * bonus_percentage\ntotal_income_this_year = total_pay_this_year + bonus_this_year\nprint(total_income_this_year)", "executes": "True"}}
</tool_call>

[tool]

220000.0

[assistant]

<think>
So the code shows that John's total income this year would be $220,000. This is consistent with the 10% bonus on last year's pay of $100,000, which was $10,000. Therefore, this year's bonus of $20,000 added to his total pay of $200,000 results in a total income of $220,000.

I need to verify that there are no other factors at play. The problem states it's a constant percentage bonus each year, so there's no mention of changing percentages or other variables. Therefore, applying a straightforward percentage of 10% on this year's partial salary amount would result in $20,000. Adding that to his total pay gives $220,000. The calculation is correct and aligns with the question's parameters.
</think>

#### 220000.0
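
Records with the input, output, score, and step fields shown above can also be inspected with a few lines of Python. The sketch below assumes the dump is stored as JSON Lines (one record per line); that layout and the helper names iter_records and summarize are assumptions made for illustration, so adjust them to the files actually written under the dump directory.

```python
import io
import json


def iter_records(lines):
    """Yield one dict per non-empty line (assumes JSON Lines input)."""
    for line in lines:
        line = line.strip()
        if line:
            yield json.loads(line)


def summarize(record: dict, width: int = 40) -> str:
    """Build a one-line summary with a flattened, truncated output preview."""
    preview = record["output"].replace("\n", " ")[:width]
    return f"step={record['step']} score={record['score']} output={preview!r}"


# In-memory sample standing in for a dumped file.
dump = io.StringIO(
    '{"input": "question", "output": "<think>\\nworking\\n</think>\\n\\n#### 220000.0", "score": 0, "step": 1}\n'
)
summaries = [summarize(r) for r in iter_records(dump)]
print(summaries[0])
```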

You can also use the RolloutViewer TUI tool to view the dumped rollout data:

python scripts/rollout_viewer.py ${trainer.rollout_data_dir}

[Figure: RolloutViewer screenshot]