Interaction System for Multi-turn RL Training
=============================================

Last updated: 06/25/2025.

Overview
--------

The verl interaction system enables dynamic, multi-turn conversational feedback during reinforcement learning training. It lets models engage in iterative problem-solving scenarios in which interaction agents provide corrective feedback, guidance, or evaluation based on the model's responses.

**New in Multi-Interaction Support**: The system now supports multiple named interactions within a single training session, so different samples can use different interaction strategies. This enables curriculum learning, domain-specific feedback, and flexible agent switching at the sample level.

Key features:

- **Async-based Architecture**: Non-blocking interaction processing for distributed training
- **Instance Management**: Stateful session handling with unique instance IDs for concurrent interactions
- **SGLang Integration**: Seamless integration with the SGLang rollout system for multi-turn conversations
- **Configuration-driven**: Dynamic agent loading via YAML configuration files
- **Multi-Interaction Support**: Registry system enabling multiple named interactions per rollout
- **Sample-Level Selection**: Each sample can specify which interaction to use via configuration
- **Reward Integration**: Turn-level scoring mechanism integrated with verl's reward system

Architecture
------------

The interaction system follows a plugin-based
architecture with clear separation of concerns:

.. code-block::

    Interaction Registry System
         ↓
    BaseInteraction (Abstract Interface)
         ↓
    Multiple Named Interactions (e.g., Gsm8kInteraction, CustomInteraction)
         ↓
    SGLang Rollout Integration (interaction_map)
         ↓
    Sample-Level Interaction Selection
         ↓
    Async Request Lifecycle Management

Core Components
~~~~~~~~~~~~~~~

**Interaction Registry System**

The interaction registry system allows loading and managing multiple named interactions:

.. code-block:: python

    from verl.interactions.utils.interaction_registry import initialize_interactions_from_config

    # Load multiple interactions from config
    interaction_map = initialize_interactions_from_config("config.yaml")

    # Access specific interaction by name
    gsm8k_interaction = interaction_map["gsm8k"]
    custom_interaction = interaction_map["custom_solver"]

**BaseInteraction Interface**

All interaction agents must implement the ``BaseInteraction`` abstract class:

.. code-block:: python

    from typing import Any, Dict, List, Optional, Tuple

    # Simplified view of the interface provided by verl.interactions.base
    class BaseInteraction:
        def __init__(self, config: Dict[str, Any]):
            self.config = config
            self.name: str = config.get("name", "interaction_agent")

        async def start_interaction(self, instance_id: Optional[str] = None, **kwargs) -> str:
            """Initialize interaction session, return instance_id"""

        async def generate_response(self, instance_id: str, messages: List[Dict[str, Any]], **kwargs) -> Tuple[bool, str, float, Dict[str, Any]]:
            """Generate response, return (should_terminate, response, score, metadata)"""

        async def calculate_score(self, instance_id: str, **kwargs) -> float:
            """Calculate turn-level score for RL training"""

        async def finalize_interaction(self, instance_id: str, **kwargs) -> None:
            """Clean up resources"""

**Request Lifecycle**

The interaction system integrates with SGLang's async rollout via state management:

1. ``PENDING`` → Initialize interaction via ``start_interaction()``
2. ``GENERATING`` → Model generates response
3. ``INTERACTING`` → Process response via ``generate_response()``
4. ``GENERATING`` → Continue if not terminated, otherwise ``COMPLETED``

Configuration
-------------

**Basic Setup**

Enable interaction in your rollout configuration:

.. code-block:: yaml

    actor_rollout_ref:
        rollout:
            multi_turn:
                enable: true
                interaction_config_path: "path/to/interaction_config.yaml"
                max_user_turns: 10
                max_assistant_turns: 10

**Interaction Configuration File**

Create an interaction configuration file (e.g., ``interaction_config.yaml``):

**Single Interaction (Legacy Format)**

.. code-block:: yaml

    interaction:
      - name: "gsm8k"
        class_name: "verl.interactions.gsm8k_interaction.Gsm8kInteraction"
        config: {}

**Multiple Interactions (New Format)**

.. code-block:: yaml

    interaction:
      - name: "gsm8k"
        class_name: "verl.interactions.gsm8k_interaction.Gsm8kInteraction"
        config: {}
      - name: "custom_solver"
        class_name: "custom.interactions.CustomInteraction"
        config:
          solver_type: "advanced"
          timeout: 30
      - name: "code_verifier"
        class_name: "verl.interactions.base.BaseInteraction"
        config:
          verification_mode: "strict"

**Automatic Name Generation**

If no ``name`` field is provided, the system will automatically generate one from the class name:

.. code-block:: yaml

    interaction:
      - class_name: "verl.interactions.gsm8k_interaction.Gsm8kInteraction"
        config: {}
        # Automatically generates name: "gsm8k"

The system will dynamically load all specified interaction classes and make them available by name.

Implementation Example: GSM8K
-----------------------------

The GSM8K interaction demonstrates a complete implementation for math problem-solving scenarios:

.. code-block:: python

    from verl.interactions.base import BaseInteraction
    from verl.utils.reward_score import gsm8k
    from uuid import uuid4

    class Gsm8kInteraction(BaseInteraction):
        def __init__(self, config: dict):
            super().__init__(config)
            # Per-session state, keyed by instance_id
            self._instance_dict = {}

        async def start_interaction(self, instance_id=None, ground_truth=None, **kwargs):
            if instance_id is None:
                instance_id = str(uuid4())
            self._instance_dict[instance_id] = {
                "response": "",
                "ground_truth": ground_truth,
                "reward": 0.0,
            }
            return instance_id

        async def generate_response(self, instance_id, messages, **kwargs):
            # Extract last assistant message content
            content = ""
            for item in reversed(messages):
                if item.get("role") == "assistant":
                    content = item.get("content", "")
                    break

            # Store the response so calculate_score() can evaluate it
            self._instance_dict[instance_id]["response"] = content

            reward = await self.calculate_score(instance_id)
            if reward == 1.0:
                return True, "Your response is correct!", 1.0, {}
            else:
                return False, "Your response is incorrect! You need to reflect on your answer and try again.", 0.0, {}

        async def calculate_score(self, instance_id, **kwargs):
            return gsm8k.compute_score(
                self._instance_dict[instance_id]["response"],
                self._instance_dict[instance_id]["ground_truth"],
                method="strict", format_score=0.0, score=1.0,
            )

        async def finalize_interaction(self, instance_id, **kwargs):
            # Release the per-session state
            del self._instance_dict[instance_id]

Training Integration
--------------------

**Training Script Configuration**

Include interaction configuration in your training command:

.. code-block:: bash

    python3 -m verl.trainer.main_ppo \
        --config-path="$CONFIG_PATH" \
        --config-name='gsm8k_multiturn_grpo_w_interaction' \
        algorithm.adv_estimator=grpo \
        data.train_batch_size=512 \
        data.return_raw_chat=True \
        actor_rollout_ref.rollout.name=sglang \
        actor_rollout_ref.rollout.multi_turn.interaction_config_path="$PROJECT_DIR/examples/sglang_multiturn/config/interaction_config/gsm8k_interaction_config.yaml" \
        trainer.total_epochs=15

**Data Requirements**

Ensure each sample in your dataset carries interaction parameters, including the ``name`` field used for interaction selection:

.. code-block:: python

    # Dataset should include interaction_kwargs in non_tensor_batch
    interaction_kwargs = [
        {"name": "gsm8k", "query": "What is 2+2?", "ground_truth": "4"},
        {"name": "custom_solver", "query": "Solve: x^2 + 5x + 6 = 0", "ground_truth": "x = -2, -3"},
        {"name": "gsm8k", "query": "What is 3+3?", "ground_truth": "6"},
    ]

**Sample-Level Interaction Selection**

Each sample can specify which interaction to use via the ``name`` field.
This enables flexible training scenarios where different samples use different interaction strategies:

.. code-block:: python

    # Example: Math problems use GSM8K interaction, code problems use code verifier
    data_samples = [
        {
            "prompt": "What is 15% of 200?",
            "interaction_kwargs": {
                "name": "gsm8k",
                "query": "What is 15% of 200?",
                "ground_truth": "30"
            }
        },
        {
            "prompt": "Write a function to check if a number is prime",
            "interaction_kwargs": {
                "name": "code_verifier",
                "code_type": "python",
                "expected_behavior": "return True for prime numbers"
            }
        }
    ]

**Backward Compatibility**

If no ``name`` field is provided in ``interaction_kwargs``, the system defaults to ``"gsm8k"`` for backward compatibility.
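
The selection and fallback rules described above can be sketched as follows. This is a minimal illustration under stated assumptions, not verl's actual rollout code: ``select_interaction`` and ``DEFAULT_INTERACTION_NAME`` are hypothetical names introduced here, and the placeholder strings in ``interaction_map`` stand in for the interaction objects that would be loaded from the configuration file.

.. code-block:: python

    from typing import Any, Dict

    # Assumption: mirrors the documented fallback to "gsm8k" when no name is given.
    DEFAULT_INTERACTION_NAME = "gsm8k"


    def select_interaction(interaction_map: Dict[str, Any], interaction_kwargs: Dict[str, Any]) -> Any:
        """Return the interaction registered under the sample's ``name`` (hypothetical helper)."""
        name = interaction_kwargs.get("name", DEFAULT_INTERACTION_NAME)
        if name not in interaction_map:
            available = ", ".join(sorted(interaction_map))
            raise KeyError(f"Interaction '{name}' not found; available: {available}")
        return interaction_map[name]


    # Placeholder strings stand in for real interaction instances.
    interaction_map = {"gsm8k": "<Gsm8kInteraction>", "code_verifier": "<CodeVerifier>"}

    legacy_sample = {"query": "What is 2+2?", "ground_truth": "4"}  # no "name" field
    named_sample = {"name": "code_verifier", "code_type": "python"}

    print(select_interaction(interaction_map, legacy_sample))
    print(select_interaction(interaction_map, named_sample))

In this sketch a sample without ``name`` resolves to the ``gsm8k`` entry, while an unknown name fails immediately and lists the registered interactions, which makes a mismatch between the data and the configuration easier to spot.
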

Best Practices
--------------

**Resource Management**

- Always implement proper cleanup in ``finalize_interaction()``
- Use unique instance IDs to avoid conflicts in concurrent training
- Handle edge cases like empty messages or malformed content

**Performance Optimization**

- Keep interaction logic lightweight to avoid blocking training
- Use async/await properly to maintain non-blocking behavior
- Consider caching expensive computations within interaction instances

**Testing**

Comprehensive testing is essential for interaction systems:

.. code-block:: python

    import pytest

    # Requires the pytest-asyncio plugin; YourInteraction is a placeholder
    # for the interaction class under test.
    @pytest.mark.asyncio
    async def test_interaction_workflow():
        interaction = YourInteraction({})

        # Test complete workflow
        instance_id = await interaction.start_interaction(ground_truth="expected_answer")

        messages = [{"role": "user", "content": "user_content"}, {"role": "assistant", "content": "assistant_content"}]
        should_terminate, response, reward, metadata = await interaction.generate_response(instance_id, messages)

        assert should_terminate in [True, False]
        assert isinstance(reward, float)

        await interaction.finalize_interaction(instance_id)

Advanced Usage
--------------

**Multi-Interaction Training Strategies**

You can design sophisticated training scenarios using multiple interactions:

.. code-block:: python

    # Example: Progressive difficulty with different interaction agents
    class MathTrainingPipeline:
        def create_interaction_config(self):
            return {
                "interaction": [
                    {
                        "name": "basic_math",
                        "class_name": "verl.interactions.gsm8k_interaction.Gsm8kInteraction",
                        "config": {"difficulty": "easy"}
                    },
                    {
                        "name": "advanced_math",
                        "class_name": "custom.interactions.AdvancedMathInteraction",
                        "config": {"difficulty": "hard", "allow_hints": True}
                    },
                    {
                        "name": "competition_math",
                        "class_name": "custom.interactions.CompetitionMathInteraction",
                        "config": {"time_limit": 300, "show_steps": False}
                    }
                ]
            }

        def create_curriculum_data(self, epoch, samples):
            # `samples` is a list of dicts holding the remaining
            # interaction_kwargs (e.g. query, ground_truth) for each sample
            if epoch < 5:
                name = "basic_math"
            elif epoch < 10:
                name = "advanced_math"
            else:
                name = "competition_math"
            return [{"name": name, **sample} for sample in samples]

**Custom Scoring Functions**

You can integrate custom reward functions:

.. code-block:: python

    async def calculate_score(self, instance_id, **kwargs):
        response = self._instance_dict[instance_id]["response"]
        ground_truth = self._instance_dict[instance_id]["ground_truth"]

        # Custom evaluation logic; custom_evaluation_function is a
        # user-supplied placeholder that returns a truthy value on success
        if custom_evaluation_function(response, ground_truth):
            return 1.0
        else:
            return 0.0

**Multi-step Interactions**

For complex scenarios requiring multiple feedback rounds:

.. code-block:: python

    async def generate_response(self, instance_id, messages, **kwargs):
        instance = self._instance_dict[instance_id]
        # Count attempts, starting from 0 if start_interaction() did not set it
        instance["attempts"] = instance.get("attempts", 0) + 1

        # Evaluate current response
        reward = await self.calculate_score(instance_id)

        if reward > 0.8:
            return True, "Excellent work!", reward, {}
        elif instance["attempts"] < 3:
            return False, "Good attempt, but try to improve...", reward, {}
        else:
            return True, "Maximum attempts reached.", reward, {}

Troubleshooting
---------------

**Common Issues**

1. **Instance ID Conflicts**: Ensure unique instance IDs across concurrent sessions
2. **Memory Leaks**: Always call ``finalize_interaction()`` to clean up resources
3. **Blocking Operations**: Keep interaction logic async and non-blocking
4. **Configuration Errors**: Verify interaction config path and class name are correct
5. **Interaction Name Conflicts**: Ensure all interactions have unique names in the configuration
6. **Missing Interaction**: Verify the ``name`` field in ``interaction_kwargs`` matches available interactions
7. **Backward Compatibility**: When migrating from single to multi-interaction, add ``name`` fields to existing data

**Debugging**

Enable debug logging to trace interaction flow:

.. code-block:: bash

    export VERL_LOGGING_LEVEL=DEBUG

**Performance Monitoring**

Monitor the impact of interactions on training throughput and adjust accordingly.

Related Documentation
---------------------

- :doc:`multiturn`: Basic multi-turn rollout configuration
- :doc:`sandbox_fusion`: Tool integration with SGLang
- :doc:`search_tool_example`: Search tool implementation example