Interaction System for Multi-turn RL Training

Last updated: 06/25/2025.

Overview

The verl interaction system enables dynamic, multi-turn conversational feedback during reinforcement learning training. This system allows models to engage in iterative problem-solving scenarios where interaction agents can provide corrective feedback, guidance, or evaluation based on the model’s responses.

verl 交互系统支持在强化学习训练过程中实现动态、多轮对话反馈。该系统允许模型参与迭代式问题解决场景,其中交互代理可以根据模型的响应提供纠正性反馈、指导或评估。

New in Multi-Interaction Support: The system now supports multiple named interactions within a single training session, enabling sophisticated training scenarios where different samples can use different interaction strategies. This allows for curriculum learning, domain-specific feedback, and flexible agent switching at the sample level.

多交互支持中的新特性:系统现在支持在单个训练会话中进行多个命名交互,这使得不同样本可以使用不同的交互策略,从而实现复杂的训练场景,包括课程学习(curriculum learning)、特定领域反馈以及样本级别的灵活代理切换。

Key features:

  • Async-based Architecture: Non-blocking interaction processing for distributed training

  • Instance Management: Stateful session handling with unique instance IDs for concurrent interactions

  • SGLang Integration: Seamless integration with SGLang rollout system for multi-turn conversations

  • Configuration-driven: Dynamic agent loading via YAML configuration files

  • Multi-Interaction Support: Registry system enabling multiple named interactions per rollout

  • Sample-Level Selection: Each sample can specify which interaction to use via configuration

  • Reward Integration: Turn-level scoring mechanism integrated with verl’s reward system

关键特性:

  • 基于异步的架构:为分布式训练提供非阻塞式交互处理

  • 实例管理:使用唯一实例 ID 处理有状态会话,支持并发交互

  • SGLang 集成:与 SGLang rollout 系统无缝集成,支持多轮对话

  • 配置驱动:通过 YAML 配置文件动态加载代理

  • 多交互支持:注册表系统支持单次 rollout 中的多个命名交互

  • 样本级别选择:每个样本可通过配置指定使用哪种交互

  • 奖励集成:轮级别评分机制,与 verl 的奖励系统集成

Architecture

The interaction system follows a plugin-based architecture with clear separation of concerns:

Interaction Registry System
     ↓
BaseInteraction (Abstract Interface)
     ↓
Multiple Named Interactions (e.g., Gsm8kInteraction, CustomInteraction)
     ↓
SGLang Rollout Integration (interaction_map)
     ↓
Sample-Level Interaction Selection
     ↓
Async Request Lifecycle Management

交互系统采用基于插件的架构,职责分离清晰:

Interaction Registry System
     ↓
BaseInteraction (Abstract Interface)
     ↓
Multiple Named Interactions (e.g., Gsm8kInteraction, CustomInteraction)
     ↓
SGLang Rollout Integration (interaction_map)
     ↓
Sample-Level Interaction Selection
     ↓
Async Request Lifecycle Management

Core Components

Interaction Registry System

The interaction registry system allows loading and managing multiple named interactions:

交互注册表系统允许加载和管理多个命名交互:

from verl.interactions.utils.interaction_registry import initialize_interactions_from_config

# Load multiple interactions from config
interaction_map = initialize_interactions_from_config("config.yaml")

# Access specific interaction by name
gsm8k_interaction = interaction_map["gsm8k"]
custom_interaction = interaction_map["custom_solver"]
from verl.interactions.utils.interaction_registry import initialize_interactions_from_config

# Load multiple interactions from config
interaction_map = initialize_interactions_from_config("config.yaml")

# Access specific interaction by name
gsm8k_interaction = interaction_map["gsm8k"]
custom_interaction = interaction_map["custom_solver"]

BaseInteraction Interface

All interaction agents must implement the BaseInteraction abstract class:

所有交互代理必须实现 BaseInteraction 抽象类:

from verl.interactions.base import BaseInteraction
from typing import Dict, Any, List, Tuple, Optional

class BaseInteraction:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.name: str = config.get("name", "interaction_agent")

    async def start_interaction(self, instance_id: Optional[str] = None, **kwargs) -> str:
        """Initialize interaction session, return instance_id"""

    async def generate_response(self, instance_id: str, messages: List[Dict[str, Any]], **kwargs) -> Tuple[bool, str, float, Dict[str, Any]]:
        """Generate response, return (should_terminate, response, score, metadata)"""

    async def calculate_score(self, instance_id: str, **kwargs) -> float:
        """Calculate turn-level score for RL training"""

    async def finalize_interaction(self, instance_id: str, **kwargs) -> None:
        """Clean up resources"""
from verl.interactions.base import BaseInteraction
from typing import Dict, Any, List, Tuple, Optional

class BaseInteraction:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.name: str = config.get("name", "interaction_agent")

    async def start_interaction(self, instance_id: Optional[str] = None, **kwargs) -> str:
        """Initialize interaction session, return instance_id"""

    async def generate_response(self, instance_id: str, messages: List[Dict[str, Any]], **kwargs) -> Tuple[bool, str, float, Dict[str, Any]]:
        """Generate response, return (should_terminate, response, score, metadata)"""

    async def calculate_score(self, instance_id: str, **kwargs) -> float:
        """Calculate turn-level score for RL training"""

    async def finalize_interaction(self, instance_id: str, **kwargs) -> None:
        """Clean up resources"""

Request Lifecycle

The interaction system integrates with SGLang’s async rollout via state management:

  1. PENDING → Initialize interaction via start_interaction()

  2. GENERATING → Model generates response

  3. INTERACTING → Process response via generate_response()

  4. GENERATING → Continue if not terminated, otherwise COMPLETED

请求生命周期

交互系统通过状态管理与 SGLang 的异步 rollout 集成:

  1. PENDING → 通过 start_interaction() 初始化交互

  2. GENERATING → 模型生成响应

  3. INTERACTING → 通过 generate_response() 处理响应

  4. GENERATING → 如果未终止则继续,否则 COMPLETED

Configuration

Basic Setup

Enable interaction in your rollout configuration:

基本设置

在你的 rollout 配置中启用交互:

actor_rollout_ref:
    rollout:
        multi_turn:
            enable: true
            interaction_config_path: "path/to/interaction_config.yaml"
            max_user_turns: 10
            max_assistant_turns: 10
actor_rollout_ref:
    rollout:
        multi_turn:
            enable: true
            interaction_config_path: "path/to/interaction_config.yaml"
            max_user_turns: 10
            max_assistant_turns: 10

Interaction Configuration File

Create an interaction configuration file (e.g., interaction_config.yaml):

创建交互配置文件(例如:interaction_config.yaml):

Single Interaction (Legacy Format)

interaction:
  - name: "gsm8k"
    class_name: "verl.interactions.gsm8k_interaction.Gsm8kInteraction"
    config: {}

Multiple Interactions (New Format)

interaction:
  - name: "gsm8k"
    class_name: "verl.interactions.gsm8k_interaction.Gsm8kInteraction"
    config: {}
  - name: "custom_solver"
    class_name: "custom.interactions.CustomInteraction"
    config:
      solver_type: "advanced"
      timeout: 30
  - name: "code_verifier"
    class_name: "verl.interactions.base.BaseInteraction"
    config:
      verification_mode: "strict"

Automatic Name Generation

If no name field is provided, the system will automatically generate one from the class name:

interaction:
  - class_name: "verl.interactions.gsm8k_interaction.Gsm8kInteraction"
    config: {}
    # Automatically generates name: "gsm8k"

The system will dynamically load all specified interaction classes and make them available by name.

自动名称生成

如果未提供 name 字段,系统将从类名自动生成一个:

interaction:
  - class_name: "verl.interactions.gsm8k_interaction.Gsm8kInteraction"
    config: {}
    # Automatically generates name: "gsm8k"

The system will dynamically load all specified interaction classes and make them available by name.

系统将动态加载所有指定的交互类,并按名称提供。

Implementation Example: GSM8K

The GSM8K interaction demonstrates a complete implementation for math problem-solving scenarios:

GSM8K 实现示例

GSM8K 交互展示了针对数学问题解决场景的完整实现:

from verl.interactions.base import BaseInteraction
from verl.utils.reward_score import gsm8k
from uuid import uuid4

class Gsm8kInteraction(BaseInteraction):
    def __init__(self, config: dict):
        super().__init__(config)
        self._instance_dict = {}

    async def start_interaction(self, instance_id=None, ground_truth=None, **kwargs):
        if instance_id is None:
            instance_id = str(uuid4())
        self._instance_dict[instance_id] = {
            "response": "",
            "ground_truth": ground_truth,
            "reward": 0.0,
        }
        return instance_id

    async def generate_response(self, instance_id, messages, **kwargs):
        # Extract last assistant message content
        content = ""
        for item in reversed(messages):
            if item.get("role") == "assistant":
                content = item.get("content", "")
                break

        # Ensure GSM8K format (#### prefix)
        self._instance_dict[instance_id]["response"] = content

        reward = await self.calculate_score(instance_id)
        if reward == 1.0:
            return True, "Your response is correct!", 1.0, {}
        else:
            return False, "Your response is incorrect! You need to reflect on your answer and try again.", 0.0, {}

    async def calculate_score(self, instance_id, **kwargs):
        return gsm8k.compute_score(
            self._instance_dict[instance_id]["response"],
            self._instance_dict[instance_id]["ground_truth"],
            method="strict", format_score=0.0, score=1.0,
        )

    async def finalize_interaction(self, instance_id, **kwargs):
        del self._instance_dict[instance_id]
from verl.interactions.base import BaseInteraction
from verl.utils.reward_score import gsm8k
from uuid import uuid4

class Gsm8kInteraction(BaseInteraction):
    def __init__(self, config: dict):
        super().__init__(config)
        self._instance_dict = {}

    async def start_interaction(self, instance_id=None, ground_truth=None, **kwargs):
        if instance_id is None:
            instance_id = str(uuid4())
        self._instance_dict[instance_id] = {
            "response": "",
            "ground_truth": ground_truth,
            "reward": 0.0,
        }
        return instance_id

    async def generate_response(self, instance_id, messages, **kwargs):
        # Extract last assistant message content
        content = ""
        for item in reversed(messages):
            if item.get("role") == "assistant":
                content = item.get("content", "")
                break

        # Ensure GSM8K format (#### prefix)
        self._instance_dict[instance_id]["response"] = content

        reward = await self.calculate_score(instance_id)
        if reward == 1.0:
            return True, "Your response is correct!", 1.0, {}
        else:
            return False, "Your response is incorrect! You need to reflect on your answer and try again.", 0.0, {}

    async def calculate_score(self, instance_id, **kwargs):
        return gsm8k.compute_score(
            self._instance_dict[instance_id]["response"],
            self._instance_dict[instance_id]["ground_truth"],
            method="strict", format_score=0.0, score=1.0,
        )

    async def finalize_interaction(self, instance_id, **kwargs):
        del self._instance_dict[instance_id]

Training Integration

Training Script Configuration

Include interaction configuration in your training command:

训练脚本配置

在训练命令中包含交互配置:

python3 -m verl.trainer.main_ppo \\
    --config-path="$CONFIG_PATH" \\
    --config-name='gsm8k_multiturn_grpo_w_interaction' \\
    algorithm.adv_estimator=grpo \\
    data.train_batch_size=512 \\
    data.return_raw_chat=True \\
    actor_rollout_ref.rollout.name=sglang \\
    actor_rollout_ref.rollout.multi_turn.interaction_config_path="$PROJECT_DIR/examples/sglang_multiturn/config/interaction_config/gsm8k_interaction_config.yaml" \\
    trainer.total_epochs=15
python3 -m verl.trainer.main_ppo \\
    --config-path="$CONFIG_PATH" \\
    --config-name='gsm8k_multiturn_grpo_w_interaction' \\
    algorithm.adv_estimator=grpo \\
    data.train_batch_size=512 \\
    data.return_raw_chat=True \\
    actor_rollout_ref.rollout.name=sglang \\
    actor_rollout_ref.rollout.multi_turn.interaction_config_path="$PROJECT_DIR/examples/sglang_multiturn/config/interaction_config/gsm8k_interaction_config.yaml" \\
    trainer.total_epochs=15

Data Requirements

Ensure your dataset includes interaction parameters with the name field for interaction selection:

数据需求

确保你的数据集包含 name 字段的交互参数,用于交互选择:

# Dataset should include interaction_kwargs in non_tensor_batch
interaction_kwargs = [
    {"name": "gsm8k", "query": "What is 2+2?", "ground_truth": "4"},
    {"name": "custom_solver", "query": "Solve: x^2 + 5x + 6 = 0", "ground_truth": "x = -2, -3"},
    {"name": "gsm8k", "query": "What is 3+3?", "ground_truth": "6"},
]
# Dataset should include interaction_kwargs in non_tensor_batch
interaction_kwargs = [
    {"name": "gsm8k", "query": "What is 2+2?", "ground_truth": "4"},
    {"name": "custom_solver", "query": "Solve: x^2 + 5x + 6 = 0", "ground_truth": "x = -2, -3"},
    {"name": "gsm8k", "query": "What is 3+3?", "ground_truth": "6"},
]

Sample-Level Interaction Selection

Each sample can specify which interaction to use via the name field. This enables flexible training scenarios where different samples use different interaction strategies:

样本级别交互选择

每个样本可以通过 name 字段指定使用哪种交互。这允许不同样本采用不同的交互策略,从而实现灵活的训练场景:

# Example: Math problems use GSM8K interaction, code problems use code verifier
data_samples = [
    {
        "prompt": "What is 15% of 200?",
        "interaction_kwargs": {
            "name": "gsm8k",
            "query": "What is 15% of 200?",
            "ground_truth": "30"
        }
    },
    {
        "prompt": "Write a function to check if a number is prime",
        "interaction_kwargs": {
            "name": "code_verifier",
            "code_type": "python",
            "expected_behavior": "return True for prime numbers"
        }
    }
]
# Example: Math problems use GSM8K interaction, code problems use code verifier
data_samples = [
    {
        "prompt": "What is 15% of 200?",
        "interaction_kwargs": {
            "name": "gsm8k",
            "query": "What is 15% of 200?",
            "ground_truth": "30"
        }
    },
    {
        "prompt": "Write a function to check if a number is prime",
        "interaction_kwargs": {
            "name": "code_verifier",
            "code_type": "python",
            "expected_behavior": "return True for prime numbers"
        }
    }
]

Backward Compatibility

If no name field is provided in interaction_kwargs, the system defaults to "gsm8k" for backward compatibility.

向后兼容性

如果 interaction_kwargs 中未提供 name 字段,系统默认使用 "gsm8k" 以确保向后兼容。

Best Practices

Resource Management

  • Always implement proper cleanup in finalize_interaction()

  • Use unique instance IDs to avoid conflicts in concurrent training

  • Handle edge cases like empty messages or malformed content

资源管理

  • 始终在 finalize_interaction() 中实现适当的清理

  • 使用唯一实例 ID 以避免并发训练中的冲突

  • 处理边缘情况,如空消息或格式错误的內容

Performance Optimization

  • Keep interaction logic lightweight to avoid blocking training

  • Use async/await properly to maintain non-blocking behavior

  • Consider caching expensive computations within interaction instances

性能优化

  • 保持交互逻辑轻量,以避免阻塞训练

  • 正确使用 async/await 以维持非阻塞行为

  • 考虑在交互实例中缓存昂贵的计算

Testing

Comprehensive testing is essential for interaction systems:

import pytest
from unittest.mock import patch

@pytest.mark.asyncio
async def test_interaction_workflow():
    interaction = YourInteraction({})

    # Test complete workflow
    instance_id = await interaction.start_interaction(ground_truth="expected_answer")


    messages = [{"role": "user", "content": "user_content"}, {"role": "assistant", "content": "assistant_content"}]
    should_terminate, response, reward, metadata = await interaction.generate_response(instance_id, messages)

    assert should_terminate in [True, False]
    assert isinstance(reward, float)

    await interaction.finalize_interaction(instance_id)

对交互系统来说,全面测试至关重要:

import pytest
from unittest.mock import patch

@pytest.mark.asyncio
async def test_interaction_workflow():
    interaction = YourInteraction({})

    # Test complete workflow
    instance_id = await interaction.start_interaction(ground_truth="expected_answer")


    messages = [{"role": "user", "content": "user_content"}, {"role": "assistant", "content": "assistant_content"}]
    should_terminate, response, reward, metadata = await interaction.generate_response(instance_id, messages)

    assert should_terminate in [True, False]
    assert isinstance(reward, float)

    await interaction.finalize_interaction(instance_id)

Advanced Usage

Multi-Interaction Training Strategies

You can design sophisticated training scenarios using multiple interactions:

多交互训练策略

您可以使用多个交互设计复杂的训练场景:

# Example: Progressive difficulty with different interaction agents
class MathTrainingPipeline:
    def create_interaction_config(self):
        return {
            "interaction": [
                {
                    "name": "basic_math",
                    "class_name": "verl.interactions.gsm8k_interaction.Gsm8kInteraction",
                    "config": {"difficulty": "easy"}
                },
                {
                    "name": "advanced_math",
                    "class_name": "custom.interactions.AdvancedMathInteraction",
                    "config": {"difficulty": "hard", "allow_hints": True}
                },
                {
                    "name": "competition_math",
                    "class_name": "custom.interactions.CompetitionMathInteraction",
                    "config": {"time_limit": 300, "show_steps": False}
                }
            ]
        }

    def create_curriculum_data(self, epoch):
        if epoch < 5:
            return [{"name": "basic_math", ...} for _ in samples]
        elif epoch < 10:
            return [{"name": "advanced_math", ...} for _ in samples]
        else:
            return [{"name": "competition_math", ...} for _ in samples]
# Example: Progressive difficulty with different interaction agents
class MathTrainingPipeline:
    def create_interaction_config(self):
        return {
            "interaction": [
                {
                    "name": "basic_math",
                    "class_name": "verl.interactions.gsm8k_interaction.Gsm8kInteraction",
                    "config": {"difficulty": "easy"}
                },
                {
                    "name": "advanced_math",
                    "class_name": "custom.interactions.AdvancedMathInteraction",
                    "config": {"difficulty": "hard", "allow_hints": True}
                },
                {
                    "name": "competition_math",
                    "class_name": "custom.interactions.CompetitionMathInteraction",
                    "config": {"time_limit": 300, "show_steps": False}
                }
            ]
        }

    def create_curriculum_data(self, epoch):
        if epoch < 5:
            return [{"name": "basic_math", ...} for _ in samples]
        elif epoch < 10:
            return [{"name": "advanced_math", ...} for _ in samples]
        else:
            return [{"name": "competition_math", ...} for _ in samples]

Custom Scoring Functions

You can integrate custom reward functions:

自定义评分函数

您可以集成自定义奖励函数:

async def calculate_score(self, instance_id, **kwargs):
    response = self._instance_dict[instance_id]["response"]
    ground_truth = self._instance_dict[instance_id]["ground_truth"]

    # Custom evaluation logic
    if custom_evaluation_function(response, ground_truth):
        return 1.0
    else:
        return 0.0
async def calculate_score(self, instance_id, **kwargs):
    response = self._instance_dict[instance_id]["response"]
    ground_truth = self._instance_dict[instance_id]["ground_truth"]

    # Custom evaluation logic
    if custom_evaluation_function(response, ground_truth):
        return 1.0
    else:
        return 0.0

Multi-step Interactions

For complex scenarios requiring multiple feedback rounds:

多步骤交互

针对需要多个反馈轮次的复杂场景:

async def generate_response(self, instance_id, messages, **kwargs):
    instance = self._instance_dict[instance_id]
    instance["attempts"] += 1

    # Evaluate current response
    reward = await self.calculate_score(instance_id)

    if reward > 0.8:
        return True, "Excellent work!", reward, {}
    elif instance["attempts"] < 3:
        return False, "Good attempt, but try to improve...", reward, {}
    else:
        return True, "Maximum attempts reached.", reward, {}
async def generate_response(self, instance_id, messages, **kwargs):
    instance = self._instance_dict[instance_id]
    instance["attempts"] += 1

    # Evaluate current response
    reward = await self.calculate_score(instance_id)

    if reward > 0.8:
        return True, "Excellent work!", reward, {}
    elif instance["attempts"] < 3:
        return False, "Good attempt, but try to improve...", reward, {}
    else:
        return True, "Maximum attempts reached.", reward, {}

Troubleshooting

Common Issues

  1. Instance ID Conflicts: Ensure unique instance IDs across concurrent sessions

  2. Memory Leaks: Always call finalize_interaction() to clean up resources

  3. Blocking Operations: Keep interaction logic async and non-blocking

  4. Configuration Errors: Verify interaction config path and class name are correct

  5. Interaction Name Conflicts: Ensure all interactions have unique names in the configuration

  6. Missing Interaction: Verify the name field in interaction_kwargs matches available interactions

  7. Backward Compatibility: When migrating from single to multi-interaction, add name fields to existing data

常见问题

  1. 实例 ID 冲突:确保在并发会话中实例 ID 唯一

  2. 内存泄漏:始终调用 finalize_interaction() 以清理资源

  3. 阻塞操作:保持交互逻辑异步且非阻塞

  4. 配置错误:验证交互配置路径和类名正确

  5. 交互名称冲突:确保配置中所有交互具有唯一名称

  6. 缺少交互:验证 interaction_kwargs 中的 name 字段与可用交互匹配

  7. 向后兼容性:从单交互迁移到多交互时,为现有数据添加 name 字段

Debugging

Enable debug logging to trace interaction flow:

调试

启用调试日志以跟踪交互流程:

export VERL_LOGGING_LEVEL=DEBUG
export VERL_LOGGING_LEVEL=DEBUG

Performance Monitoring

Monitor interaction performance impact on training throughput and adjust accordingly.

性能监控

监控交互对训练吞吐量的性能影响,并相应调整。