智能体与流程编排

这一篇覆盖 sagents/agent/sagents/flow/Agent 是干活的单元,Flow 决定 Agent 之间的执行关系。 两者解耦:换流程不改 Agent,换 Agent 也不改流程。

1. Agent 层

1.1 AgentBase 提供的能力

flowchart LR
    Base[AgentBase] --> RunStream[run_stream<br/>统一异步流接口]
    Base --> Model[持有 model + model_config]
    Base --> SafeCall[LLM 调用:<br/>重试 + 降级 + sanitize]
    Base --> PromptCache[prompt 缓存控制]
    Base --> Schema[工具 schema 转换]
    Base --> Stream[流式 chunk 拼装]
    Base --> Live[读取 Session/SessionContext]

Agent 自身不持有“这次会话的状态”,状态都在 SessionContext 里——Agent 因此可以并发复用。

1.2 已实现的专用 Agent

flowchart TB
    subgraph 分析
        Analysis[task_analysis]
    end

    subgraph 召回与建议
        Recall[memory_recall]
        ToolSug[tool_suggestion]
        WfSel[workflow_select]
        QSug[query_suggest]
    end

    subgraph 简单模式
        Plan[plan]
        Simple[simple]
    end

    subgraph 多智能体模式
        TPlan[task_planning]
        TDecomp[task_decompose]
        TExec[task_executor]
        TObs[task_observation]
        TJudge[task_completion_judge]
        TSum[task_summary]
    end

    subgraph fibre 模式
        Fibre[fibre]
        SelfCheck[self_check]
    end

各 Agent 的 agent_key 是流程中 AgentNode 引用它的标识,例如 simpletask_planning 等。

1.3 FibreAgent 与子智能体编排

flowchart TB
    Main[主 Session] --> FA[FibreAgent.run_stream]
    FA --> Orch[FibreOrchestrator]
    Orch --> Defs[(AgentDefinition 注册表<br/>子智能体配置)]
    Orch --> Backend[FibreBackendClient<br/>从后端取/管理子智能体]
    Orch --> SubMgr[SessionManager 复用]
    Orch -->|委派工具调用| Sub[Sub-session 1..N]
    Sub -->|结果| Orch
    Orch -->|汇总| FA

简单理解:fibre 模式下,主 Agent 通过工具调用把任务委派给子 Agent,每个子 Agent 在自己的 sub-session 里跑,结果再回流到主 session。

2. Flow 层

2.1 节点类型

flowchart LR
    Flow[AgentFlow.root] --> N1[AgentNode<br/>执行单个 Agent]
    Flow --> N2[SequenceNode<br/>顺序执行 steps]
    Flow --> N3[ParallelNode<br/>并行 + max_concurrency]
    Flow --> N4[LoopNode<br/>condition + max_loops 熔断]
    Flow --> N5[IfNode<br/>true_body / false_body]
    Flow --> N6[SwitchNode<br/>variable + cases + default]

整个 Flow 由 AgentFlow(name, root) 包装根节点。run_stream 也接受 custom_flow 参数,调用方可以完全自定义编排。

2.2 条件注册表 flow/conditions.py

flowchart LR
    Reg[ConditionRegistry] --> C1[is_deep_thinking]
    Reg --> C2[enable_more_suggest]
    Reg --> C3[enable_plan]
    Reg --> C4[plan_should_start_execution]
    Reg --> C5[task_not_completed]
    Reg --> C6[self_check_should_retry]
    Reg --> C7[need_summary]

    Reg -.被引用.-> If[IfNode.condition]
    Reg -.被引用.-> Loop[LoopNode.condition]

业务方可以注册自己的条件函数,让 custom_flow 用上(见本页末尾)。

2.3 执行器 flow/executor.py

flowchart TB
    Start([开始执行 AgentFlow.root]) --> Dispatch{节点类型?}

    Dispatch -->|AgentNode| Run[拿 Agent 实例 → run_stream<br/>透传 chunk]
    Dispatch -->|SequenceNode| Seq[顺序执行 steps]
    Dispatch -->|ParallelNode| Par[asyncio 并发 + 信号量]
    Dispatch -->|LoopNode| L1[检查 condition + max_loops]
    Dispatch -->|IfNode| If1[检查 condition]
    Dispatch -->|SwitchNode| Sw1[读 variable → cases]

    Run --> Check{Session 状态?}
    Seq --> Check
    Par --> Check
    L1 --> Check
    If1 --> Check
    Sw1 --> Check

    Check -->|RUNNING| NextNode[下一个节点]
    Check -->|INTERRUPTED / ERROR| Stop([停止])

    NextNode --> Dispatch

执行器只负责“怎么走”,不负责“走到 Agent 后做什么”——后者是 Agent 自己的责任。

3. 默认流程:simple / multi / fibre

SAgent._build_default_flow(agent_mode, max_loop_count) 拼出来的形状:

flowchart TB
    Start([输入 messages]) --> DT{is_deep_thinking?}
    DT -->|是| Ana[task_analysis] --> Sw
    DT -->|否| Sw
    Sw{Switch agent_mode}
    Sw -->|simple| SimpleBody
    Sw -->|multi| MultiBody
    Sw -->|fibre| FibreBody
    SimpleBody --> More
    MultiBody --> More
    FibreBody --> More
    More{enable_more_suggest?}
    More -->|是| QS[query_suggest]
    More -->|否| End
    QS --> End([结束])

simple_agent_body

flowchart TB
    S0([进入 simple]) --> Par1{并行}
    Par1 --> ToolSug1[tool_suggestion]
    Par1 --> Recall1[memory_recall]
    ToolSug1 --> If1
    Recall1 --> If1
    If1{enable_plan?}
    If1 -->|是| Plan1[plan]
    Plan1 --> If2{plan_should_start_execution?}
    If2 -->|是| SimpleAgent1[simple]
    If2 -->|否| Need
    If1 -->|否| SimpleAgent2[simple]
    SimpleAgent1 --> Need
    SimpleAgent2 --> Need
    Need{need_summary?}
    Need -->|是| Sum[task_summary] --> EndS
    Need -->|否| EndS([结束 simple])

multi_agent_full

flowchart TB
    M0([进入 multi]) --> Recall2[memory_recall]
    Recall2 --> Loop[Loop: task_not_completed<br/>最多 max_loop_count 次]
    Loop --> P[task_planning]
    P --> TS[tool_suggestion]
    TS --> E[task_executor]
    E --> O[task_observation]
    O --> J[task_completion_judge]
    J -.task_not_completed.-> Loop
    J -->|完成| Sum2[task_summary]
    Sum2 --> EndM([结束 multi])

fib_agent_body

flowchart TB
    F0([进入 fibre]) --> Loop2[Loop: self_check_should_retry<br/>最多 3 次]
    Loop2 --> Core[fibre core]
    Core --> Par2{并行}
    Par2 --> TS2[tool_suggestion]
    Par2 --> R2[memory_recall]
    TS2 --> If3
    R2 --> If3
    If3{enable_plan?}
    If3 -->|是| Plan2[plan] --> If4{plan_should_start_execution?}
    If4 -->|是| FibA[fibre]
    If4 -->|否| SC
    If3 -->|否| FibB[fibre]
    FibA --> SC
    FibB --> SC
    SC[self_check]
    SC -.should_retry.-> Loop2
    SC -->|不再重试| EndF([结束 fibre])

4. 二次开发:自定义流程与子智能体

调用方有三个扩展点,组合起来几乎可以拼出任意复杂的 Agent 编排,而完全不修改 sagents 源码

4.1 注册自定义条件

from sagents.flow.conditions import ConditionRegistry

@ConditionRegistry.register("user_paid")
def _user_paid(session_context, session=None) -> bool:
    return session_context.system_context.get("plan") == "pro"

4.2 自定义 Flow

from sagents.flow.schema import (
    AgentFlow, SequenceNode, AgentNode, IfNode, LoopNode,
)

custom_flow = AgentFlow(
    name="My Pipeline",
    root=SequenceNode(steps=[
        IfNode(
            condition="user_paid",
            true_body=LoopNode(
                condition="task_not_completed",
                max_loops=10,
                body=SequenceNode(steps=[
                    AgentNode(agent_key="task_planning"),
                    AgentNode(agent_key="task_executor"),
                    AgentNode(agent_key="task_completion_judge"),
                ]),
            ),
            false_body=AgentNode(agent_key="simple"),
        ),
        AgentNode(agent_key="task_summary"),
    ]),
)

# 传给 SAgent.run_stream
async for chunks in agent.run_stream(
    ...,
    custom_flow=custom_flow,
):
    ...

传了 custom_flow 之后,agent_mode / _build_default_flow 都会被绕过。

4.3 自定义子智能体(fibre)

custom_sub_agents=[...] 在不重写流程的前提下注入额外子智能体定义,主要服务 fibre 编排:

async for chunks in agent.run_stream(
    ...,
    agent_mode="fibre",
    custom_sub_agents=[
        {
            "agent_key": "data_fetcher",
            "description": "负责从内部 BI 拉数据",
            "system_prompt": "...",
            "available_tools": ["http_fetcher", "sql_runner"],
        },
        # 更多子智能体定义...
    ],
):
    ...

主 fibre Agent 会把这些子智能体识别成可委派的目标。


Sage documentation for the current repository layout. Source available under the MIT license.

This site uses Just the Docs, a documentation theme for Jekyll.