Reward Loop =========== .. _yyding: https://yyding1.github.io Author: `Yuyang Ding `_ Last updated: 10/23/2025. .. warning:: Reward Loop is ready for use, but the API may change in future releaes. 奖励循环(Reward Loop)旨在实现更灵活、更易用的奖励计算。 **Design goal**: - Make reward computation more efficient - Support broader reward model interface (including discriminative and generative models) - Make user customized reward function more flexible **设计目标**: - 提高奖励计算的效率 - 支持更广泛的奖励模型接口(包括判别式和生成式模型) - 让用户自定义奖励函数更加灵活 .. image:: https://github.com/yyDing1/verl-materials/blob/main/reward_loop_overview.svg?raw=true Async Reward Computation ------------------------ RewardLoopManager ~~~~~~~~~~~~~~~~~ The Reward Loop refactors the design of the reward manager so that each sample is processed asynchronously in the ``run_single`` function. This asynchronous design enables the Reward Loop to handle multiple reward computations concurrently, significantly improving computation efficiency. 异步奖励计算 ------------------------ 奖励循环管理器 ~~~~~~~~~~~~~~~~~ 奖励循环重构了奖励管理器的设计,使得每个样本都通过 ``run_single`` 函数进行异步处理。 这种异步设计让奖励循环能够并发处理多个奖励计算任务,大大提高了计算效率。 .. code:: python class RewardLoopManagerBase(ABC): async def run_single(self, data: DataProto) -> dict: # ... (data preprocessing) if self.is_async_reward_score: result = await self.compute_score( data_source=data_source, solution_str=response_str, ground_truth=ground_truth, extra_info=extra_info, reward_router_address=self.reward_router_address, reward_model_tokenizer=self.reward_model_tokenizer, ) else: result = await self.loop.run_in_executor( None, lambda: self.compute_score( data_source=data_source, solution_str=response_str, ground_truth=ground_truth, extra_info=extra_info, reward_router_address=self.reward_router_address, reward_model_tokenizer=self.reward_model_tokenizer, ), ) # ... (reward postprocessing) return final_result User-defined reward functions can be implemented as either synchronous or asynchronous. ``RewardLoopManager`` automatically detects the type of the user-defined function and executes it accordingly, ensuring that the reward computation process remains non-blocking. 用户自定义奖励函数可以实现为同步或异步两种类型。 ``RewardLoopManager`` 会自动检测用户定义函数的类型,并相应地执行,从而确保奖励计算过程是非阻塞的。 User-Customized Reward Function ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Users can define custom reward functions, for instance, by integrating external generative rewards or rule-based rewards to accommodate diverse scenario requirements. To facilitate this, the Reward Loop directly exposes the reward model interface, enabling complex reward computation pipelines that involve model-based scoring. A user-defined reward function may look like the following: 用户自定义奖励函数 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 用户可以定义自定义奖励函数,例如,通过集成外部生成式奖励或基于规则的奖励,以适应各种场景需求。 为此,奖励循环直接暴露了奖励模型接口,支持包含模型评分在内的复杂奖励计算流水线。 用户定义的奖励函数形如: .. code:: python async def compute_score_gsm8k( data_source: str, solution_str: str, ground_truth: str, extra_info: dict, reward_router_address: str, reward_model_tokenizer: PreTrainedTokenizer, ): """Compute the reward score.""" # Step 1: Prepare prompt and request payload grm_prompt = GRM_PROMPT_TEMPLATE.format(problem=extra_info["question"], solution=solution_str) messages = [{"role": "user", "content": grm_prompt}] sampling_params = {"temperature": 0.7, "top_p": 0.8, "max_tokens": 4096} chat_complete_request = {"messages": messages, **sampling_params} # Step 2: Send async request to the reward model # here, chat_complete sends async http request to the router address result = await chat_complete( router_address=reward_router_address, chat_complete_request=chat_complete_request, ) # Step 3: Parse model response and extract score grm_response = result.choices[0].message.content.strip() try: score_str = grm_response.split("\n\n")[-1].strip() score = int(score_str) except Exception: score = 0 return {"score": score} Runable examples are provided in the ``recipe/fapo`` directory for reference. 可运行示例可在 ``recipe/fapo`` 目录中找到,以供参考。 Reward Models and Router ------------------------ To support flexible and scalable reward model computation, RewardLoop implement a reward router that coordinates requests among multiple reward model servers. Each reward model runs as an independent server and is registered with the router. This router will forward the requests to the registered reward servers with load balancing and return the results. This design allows us to expose a single unified router address to user-defined reward functions, enabling them to access various reward models seamlessly through the same interface. 奖励模型和路由器 ------------------------ 为了支持灵活且可扩展的奖励模型计算,奖励循环实现了一个奖励路由器,用于协调多个奖励模型服务器之间的请求。 每个奖励模型作为独立的服务器运行,并注册到路由器中。 该路由器会通过负载均衡将请求转发到已注册的奖励服务器,并返回结果。 这种设计让我们可以向用户自定义奖励函数暴露一个统一的路由器地址,从而让他们能够通过相同的接口无缝访问各种奖励模型。 RewardModelManager ~~~~~~~~~~~~~~~~~~ .. image:: https://github.com/yyDing1/verl-materials/blob/main/reward_loop_full.svg?raw=true ``RewardModelManager`` will launch multiple reward servers and register them in the reward router. 奖励模型管理器 ~~~~~~~~~~~~~~~~~~ .. image:: https://github.com/yyDing1/verl-materials/blob/main/reward_loop_full.svg?raw=true ``RewardModelManager`` 会启动多个奖励服务器,并将它们注册到奖励路由器中。 .. code:: python class RewardModelManager: """Reward model manager.""" def __init__(self, config: RewardModelConfig, worker_group: RayWorkerGroup = None): """ Initialize the reward model manager. Args: config (RewardModelConfig): Reward model configuration. worker_group (RayWorkerGroup, optional): Worker group. Defaults to None. """ self.config = config self.worker_group = worker_group self._initialize_llm_servers() self._initialize_router() if self.config.rollout.free_cache_engine: self.sleep() Reward Router ~~~~~~~~~~~~~ The router is to forward the requests to the registered reward servers with load balancing. - For sglang reward servers, we directly use the sglang router to forward the requests. - For vllm reward servers, we implement a simple round-robin ``NaiveRouter`` to dispatch the requests. 奖励路由器 ~~~~~~~~~~~~~ 路由器的作用是使用负载均衡将请求转发到已注册的奖励服务器。 - 对于 sglang 奖励服务器,我们直接使用 sglang 路由器来转发请求。 - 对于 vllm 奖励服务器,我们实现了一个简单的循环调度 ``NaiveRouter`` 来分派请求。 .. code:: python class NaiveRouter: def __init__( self, worker_urls: list[str], max_connections: int = 1024, timeout: int = 60, max_attempts: int = 3, retry_delay: float = 2.0, verbose: bool = False, ): """A minimal async load-balancing router.""" self.verbose = verbose self.app = FastAPI() self.worker_urls = worker_urls self.request_counts = {url: 0 for url in worker_urls} self.max_connections = max_connections self.timeout = timeout self.max_attempts = max_attempts self.retry_delay = retry_delay self.app = FastAPI() # Register startup / shutdown hooks self.app.on_event("startup")(self._on_startup) self.app.on_event("shutdown")(self._on_shutdown) # Catch-all proxy route self.app.api_route("/{endpoint:path}", methods=["GET", "POST"])(self._make_async_request) # Placeholder for aiohttp client self.client = None Agent Reward Loop ----------------- Reward Loop can be integrated with AgentLoop to enable sample-wise rollout and reward computation. .. image:: https://github.com/yyDing1/verl-materials/blob/main/agent_reward_loop.svg?raw=true 智能体奖励循环 ----------------- 奖励循环可以与智能体循环(AgentLoop)集成,实现逐样本的 rollout(生成展开)和奖励计算。 .. image:: https://github.com/yyDing1/verl-materials/blob/main/agent_reward_loop.svg?raw=true