需要英文版本?请查看 Architecture Guide

目录

  1. 🏗️ 架构指南 (v2.0)
    1. 📋 目录
    2. 🎯 核心设计原则
      1. 1. 生产就绪
      2. 2. 模块化和可维护性
      3. 3. 可扩展性和灵活性
      4. 4. 可观测性和监控
      5. 5. 可靠性和韧性
    3. 🌐 系统概述
    4. 🔧 组件架构
      1. 智能体控制器(增强版v2.0)
      2. 智能体层次结构(重构版v2.0)
    5. 📊 Token跟踪系统
      1. 架构概述
      2. Token使用流程
      3. 工具系统架构(增强版)
    6. 🛡️ 错误处理和恢复
      1. 多层错误管理
      2. 错误类别和策略
    7. 📈 性能监控
      1. 实时指标收集
      2. 性能分析
    8. ⚙️ 增强配置系统
      1. 分层配置管理
      2. 配置模式
    9. 🔌 扩展点
      1. 插件架构
      2. 自定义智能体开发
    10. 🎯 消息流程和数据结构
      1. 增强消息格式

🏗️ 架构指南 (v2.0)

本文档提供了 Sage 多智能体框架增强架构、设计原则和生产就绪功能的内部工作流程的全面概述。

📋 目录

🎯 核心设计原则

1. 生产就绪

  • 企业级错误处理和恢复
  • 全面监控和可观测性
  • 性能优化和资源管理
  • 成本跟踪和使用分析

2. 模块化和可维护性

  • 每个智能体都有单一、明确定义的职责
  • 清晰的接口和依赖注入
  • 组件和插件的热重载
  • 全面的单元测试和集成测试

3. 可扩展性和灵活性

  • 基于插件的工具和智能体架构
  • 可配置的执行流水线
  • 支持多个LLM提供商和API格式
  • 运行时配置更新

4. 可观测性和监控

  • 实时token使用跟踪和成本监控
  • 结构化输出的全面日志
  • 性能指标和瓶颈检测
  • 流式可视化和进度跟踪

5. 可靠性和韧性

  • 自动恢复的优雅错误处理
  • 指数退避重试机制
  • 外部服务的熔断器模式
  • 内存管理和资源清理

🌐 系统概述

graph TB
    subgraph "🎮 用户界面层"
        UI[网页界面<br/>📊 实时监控]
        CLI[命令行<br/>⚡ 高性能]
        API[Python API<br/>🔧 完全控制]
    end
    
    subgraph "🧠 控制层"
        AC[智能体控制器<br/>📈 增强协调]
        TT[Token跟踪器<br/>💰 成本监控]
        PM[性能监控器<br/>⏱️ 指标]
        EM[错误管理器<br/>🛡️ 恢复]
    end
    
    subgraph "🤖 智能体层 (v2.0)"
        TA[任务分析智能体<br/>🎯 上下文感知]
        PA[规划智能体<br/>🧩 依赖管理]
        EA[执行智能体<br/>🔧 工具集成]
        OA[观察智能体<br/>👁️ 进度跟踪]
        SA[总结智能体<br/>📄 结构化输出]
        DA[直接执行智能体<br/>⚡ 快速模式]
    end
    
    subgraph "🛠️ 增强工具层"
        TM[工具管理器<br/>🔍 自动发现]
        BT[内置工具<br/>📱 核心功能]
        MCP[MCP服务器<br/>🌐 外部API]
        CT[自定义工具<br/>🎨 用户定义]
        TO[工具编排器<br/>⚙️ 负载均衡]
    end
    
    subgraph "⚙️ 基础设施层"
        CFG[配置<br/>📋 热重载]
        LOG[日志<br/>📝 结构化]
        EXC[异常处理<br/>🔄 自动恢复]
        LLM[LLM提供商<br/>🤖 多API]
        CACHE[缓存层<br/>💾 性能]
    end
    
    UI --> AC
    CLI --> AC
    API --> AC
    
    AC <--> TT
    AC <--> PM
    AC <--> EM
    
    AC --> TA
    AC --> PA
    AC --> EA
    AC --> OA
    AC --> SA
    AC --> DA
    
    EA --> TM
    TM --> TO
    TO --> BT
    TO --> MCP
    TO --> CT
    
    AC --> CFG
    AC --> LOG
    AC --> EXC
    AC --> LLM
    AC --> CACHE
    
    TT -.-> TA
    TT -.-> PA
    TT -.-> EA
    TT -.-> OA
    TT -.-> SA
    
    style AC fill:#ff9999
    style TT fill:#ffcc99
    style TM fill:#99ccff
    style EM fill:#ff99cc

🔧 组件架构

智能体控制器(增强版v2.0)

具有企业级功能的中央编排器。

class AgentController:
    """
    增强的多智能体工作流程编排器
    
    v2.0新功能:
    - 全面的token跟踪和成本监控
    - 性能指标和瓶颈检测
    - 带重试机制的高级错误恢复
    - 进度可视化的实时流式处理
    - 长时间运行任务的内存优化
    """
    
    def run(self, messages, tool_manager, **kwargs):
        """执行带监控的完整工作流程"""
        
    def run_stream(self, messages, tool_manager, **kwargs):
        """执行带实时流式处理和进度跟踪"""
        
    def get_comprehensive_token_stats(self):
        """获取详细的token使用和成本分析"""
        
    def enable_performance_monitoring(self):
        """启用详细性能跟踪"""

增强功能:

  • Token经济学: 实时成本跟踪和预算警报
  • 性能分析: 执行时间分析和优化建议
  • 内存管理: 自动清理和资源优化
  • 熔断器: 自动故障检测和恢复
  • 负载均衡: 智能工具选择和请求分发

智能体层次结构(重构版v2.0)

classDiagram
    AgentBase <|-- TaskAnalysisAgent
    AgentBase <|-- PlanningAgent
    AgentBase <|-- ExecutorAgent
    AgentBase <|-- ObservationAgent
    AgentBase <|-- TaskSummaryAgent
    AgentBase <|-- DirectExecutorAgent
    
    class AgentBase {
        +token_stats: Dict
        +performance_metrics: Dict
        +run(messages, tool_manager)
        +run_stream(messages, tool_manager)
        +_track_token_usage(response, step_name)
        +_track_streaming_token_usage(chunks, step_name)
        +get_token_stats()
        +reset_token_stats()
        +_handle_error_generic(error, context)
    }
    
    class TaskAnalysisAgent {
        +analyze_requirements()
        +extract_objectives()
        +assess_complexity()
        +determine_execution_strategy()
    }
    
    class PlanningAgent {
        +decompose_tasks()
        +identify_dependencies()
        +create_execution_plan()
        +optimize_resource_allocation()
    }
    
    class ExecutorAgent {
        +execute_plan()
        +call_tools_with_retry()
        +handle_tool_results()
        +manage_concurrent_execution()
    }
    
    class ObservationAgent {
        +monitor_progress()
        +detect_completion()
        +identify_failures()
        +suggest_corrections()
    }

📊 Token跟踪系统

架构概述

graph LR
    subgraph "🔍 收集层"
        ST[流跟踪器]
        RT[响应跟踪器]
        UT[使用提取器]
    end
    
    subgraph "📊 处理层"
        AS[智能体聚合器]
        CS[成本计算器]
        PA[性能分析器]
    end
    
    subgraph "💾 存储层"
        TS[Token存储]
        MS[指标存储]
        ES[导出服务]
    end
    
    subgraph "📈 分析层"
        CA[成本分析]
        PA2[性能分析]
        RA[推荐引擎]
    end
    
    ST --> AS
    RT --> AS
    UT --> AS
    
    AS --> CS
    CS --> PA
    PA --> TS
    TS --> MS
    MS --> ES
    
    TS --> CA
    MS --> PA2
    CA --> RA
    PA2 --> RA

Token使用流程

# 带详细指标的增强token跟踪
class TokenTracker:
    def track_agent_usage(self, agent_name, usage_data):
        """按智能体跟踪token使用并计算成本"""
        
    def track_streaming_usage(self, chunks, agent_name):
        """跟踪带实时更新的流式响应"""
        
    def calculate_costs(self, model_name, usage_data):
        """基于模型定价计算成本"""
        
    def get_performance_insights(self):
        """分析性能模式和瓶颈"""
        
    def export_detailed_report(self, format='csv'):
        """导出全面使用报告"""

跟踪的关键指标:

  • 输入Token: 请求处理成本
  • 输出Token: 响应生成成本
  • 缓存Token: 优化节省
  • 推理Token: 高级模型功能(o1等)
  • 执行时间: 性能跟踪
  • 成功率: 可靠性指标
  • 每次操作成本: 经济效率

工具系统架构(增强版)

graph TB
    subgraph "🔧 发现和注册"
        AD[自动发现<br/>📂 目录扫描]
        TR[工具注册表<br/>📋 中央目录]
        TV[工具验证<br/>✅ 模式检查]
        TH[工具健康检查<br/>🩺 状态监控]
    end
    
    subgraph "🛠️ 工具类别"
        LT[本地工具<br/>📱 内置功能]
        MT[MCP工具<br/>🌐 外部服务器]
        AT[智能体工具<br/>🤖 智能体包装器]
        CT[自定义工具<br/>🎨 用户扩展]
    end
    
    subgraph "⚡ 执行引擎"
        TE[工具执行器<br/>🔧 多线程]
        TQ[任务队列<br/>📬 负载均衡]
        CB[熔断器<br/>🛡️ 容错]
        RM[重试管理器<br/>🔄 错误恢复]
    end
    
    subgraph "📊 监控"
        PM[性能监控器<br/>⏱️ 指标]
        LB[负载均衡器<br/>⚖️ 分发]
        CH[缓存处理器<br/>💾 优化]
    end
    
    AD --> TR
    TV --> TR
    TH --> TR
    
    TR --> LT
    TR --> MT
    TR --> AT
    TR --> CT
    
    LT --> TQ
    MT --> TQ
    AT --> TQ
    CT --> TQ
    
    TQ --> TE
    TE --> CB
    TE --> RM
    
    TE --> PM
    PM --> LB
    LB --> CH

🛡️ 错误处理和恢复

多层错误管理

graph TD
    subgraph "🎯 检测层"
        ED[错误检测<br/>🔍 实时监控]
        TD[超时检测<br/>⏰ 资源管理]
        FD[故障检测<br/>💥 异常识别]
    end
    
    subgraph "🔄 恢复层"
        AR[自动重试<br/>🔁 指数退避]
        FB[回退策略<br/>🛤️ 替代路径]
        GD[优雅降级<br/>📉 功能减少]
    end
    
    subgraph "📝 日志层"
        SL[结构化日志<br/>📊 JSON格式]
        AT[警报触发<br/>🚨 通知]
        RM[恢复指标<br/>📈 成功跟踪]
    end
    
    ED --> AR
    TD --> FB
    FD --> GD
    
    AR --> SL
    FB --> AT
    GD --> RM

错误类别和策略

class ErrorManager:
    """全面的错误处理和恢复系统"""
    
    ERROR_STRATEGIES = {
        'NetworkError': 'retry_with_backoff',
        'TokenLimitError': 'truncate_and_retry',
        'ToolTimeoutError': 'fallback_to_alternative',
        'ModelUnavailableError': 'switch_provider',
        'ValidationError': 'graceful_degradation'
    }
    
    def handle_error(self, error, context):
        """将错误路由到适当的恢复策略"""
        
    def retry_with_backoff(self, operation, max_attempts=3):
        """实现指数退避重试逻辑"""
        
    def circuit_breaker(self, service_name, failure_threshold=5):
        """为外部服务实现熔断器模式"""

📈 性能监控

实时指标收集

graph LR
    subgraph "📊 数据收集"
        ET[执行时间]
        MU[内存使用]
        TU[Token消耗]
        TR[工具响应时间]
    end
    
    subgraph "🔍 分析引擎"
        BA[瓶颈分析]
        PA[性能剖析]
        CA[成本分析]
        RA[资源分析]
    end
    
    subgraph "🎯 优化"
        RS[资源扩展]
        LO[负载优化]
        CC[缓存控制]
        PT[性能调优]
    end
    
    ET --> BA
    MU --> PA
    TU --> CA
    TR --> RA
    
    BA --> RS
    PA --> LO
    CA --> CC
    RA --> PT

性能分析

class PerformanceMonitor:
    """高级性能监控和优化"""
    
    def collect_metrics(self):
        """收集全面的性能数据"""
        return {
            'execution_times': self._get_execution_times(),
            'memory_usage': self._get_memory_stats(),
            'token_efficiency': self._analyze_token_usage(),
            'tool_performance': self._get_tool_metrics(),
            'bottlenecks': self._identify_bottlenecks()
        }
    
    def generate_optimization_report(self):
        """生成可操作的优化建议"""
        
    def export_performance_data(self, format='json'):
        """导出详细的性能分析"""

⚙️ 增强配置系统

分层配置管理

graph TD
    subgraph "📁 配置源"
        ENV[环境变量<br/>🌍 系统级别]
        FILE[配置文件<br/>📄 YAML/JSON]
        CLI[命令行<br/>⌨️ 运行时参数]
        API[API参数<br/>🔧 程序化]
    end
    
    subgraph "🔄 处理层"
        VAL[验证引擎<br/>✅ 模式检查]
        MER[配置合并器<br/>🔀 优先级处理]
        HOT[热重载<br/>🔥 运行时更新]
    end
    
    subgraph "💾 存储和分发"
        CS[配置存储<br/>📚 集中化]
        CD[配置分发<br/>📡 组件更新]
        CB[配置备份<br/>💼 版本控制]
    end
    
    ENV --> VAL
    FILE --> VAL
    CLI --> VAL
    API --> VAL
    
    VAL --> MER
    MER --> HOT
    HOT --> CS
    
    CS --> CD
    CS --> CB

配置模式

class ConfigurationManager:
    """企业级配置管理"""
    
    SCHEMA = {
        'agents': {
            'max_loop_count': {'type': 'int', 'default': 10, 'min': 1, 'max': 50},
            'tool_timeout': {'type': 'int', 'default': 30, 'min': 5, 'max': 300},
            'retry_attempts': {'type': 'int', 'default': 3, 'min': 1, 'max': 10}
        },
        'performance': {
            'enable_monitoring': {'type': 'bool', 'default': True},
            'memory_threshold': {'type': 'int', 'default': 1024, 'min': 256},
            'cache_ttl': {'type': 'int', 'default': 3600, 'min': 60}
        },
        'costs': {
            'budget_alert_threshold': {'type': 'float', 'default': 10.0, 'min': 0.1},
            'cost_tracking_enabled': {'type': 'bool', 'default': True}
        }
    }
    
    def validate_config(self, config):
        """根据模式验证配置"""
        
    def hot_reload(self, config_path):
        """不重启重新加载配置"""

🔌 扩展点

插件架构

class PluginManager:
    """自定义功能的可扩展插件系统"""
    
    def register_agent_plugin(self, plugin_class):
        """注册自定义智能体实现"""
        
    def register_tool_plugin(self, plugin_class):
        """注册自定义工具实现"""
        
    def register_middleware(self, middleware_class):
        """注册请求/响应中间件"""
        
    def load_plugins_from_directory(self, directory):
        """自动发现和加载插件"""

自定义智能体开发

class CustomAgent(AgentBase):
    """创建自定义智能体的模板"""
    
    def __init__(self, model, config):
        super().__init__(model, config, system_prefix="自定义智能体提示")
        self.agent_description = "用于特定任务的自定义智能体"
    
    def run_stream(self, messages, tool_manager, context):
        """实现自定义智能体逻辑"""
        # 您的自定义实现在这里
        yield from self._execute_streaming_with_token_tracking(
            prompt="您的自定义提示",
            step_name="custom_operation"
        )

🎯 消息流程和数据结构

增强消息格式

# 带监控元数据的增强消息结构
MESSAGE_SCHEMA = {
    'role': str,              # 'user', 'assistant', 'tool'
    'content': str,           # 主要消息内容
    'type': str,              # 'normal', 'thinking', 'tool_call', 等
    'message_id': str,        # 唯一标识符
    'show_content': str,      # 显示友好内容
    'usage': {                # Token使用信息
        'prompt_tokens': int,
        'completion_tokens': int,
        'total_tokens': int,
        'cached_tokens': int,
        'reasoning_tokens': int
    },
    'metadata': {             # 性能和监控数据
        'execution_time': float,
        'agent_name': str,
        'step_name': str,
        'timestamp': float,
        'success': bool
    },
    'tool_calls': List,       # 工具调用数据
    'tool_call_id': str       # 工具响应链接
}

这种增强的架构提供了企业级可靠性、全面监控和生产就绪的性能优化,同时保持了使Sage在开发中强大的模块化和可扩展性。


Copyright © 2024 Sage Multi-Agent Framework. Distributed by an MIT license.