Agent & Flow Orchestration

This page covers sagents/agent/ and sagents/flow/. An agent is the unit of work; a flow defines the relationships between agents. They are decoupled: changing the flow does not change the agents, and vice versa.

1. The Agent Layer

1.1 What AgentBase Provides

flowchart LR
    Base[AgentBase] --> RunStream[run_stream<br/>uniform async stream interface]
    Base --> Model[holds model + model_config]
    Base --> SafeCall[LLM call: retry + fallback + sanitize]
    Base --> PromptCache[prompt cache control]
    Base --> Schema[tool schema conversion]
    Base --> Stream[streaming chunk assembly]
    Base --> Live[read Session/SessionContext]

Agents own no per-conversation state — that lives in SessionContext — so agents can be reused concurrently.

1.2 Specialized Agents

flowchart TB
    subgraph Analysis
        Analysis[task_analysis]
    end

    subgraph Recall & suggestion
        Recall[memory_recall]
        ToolSug[tool_suggestion]
        WfSel[workflow_select]
        QSug[query_suggest]
    end

    subgraph Simple mode
        Plan[plan]
        Simple[simple]
    end

    subgraph Multi-agent mode
        TPlan[task_planning]
        TDecomp[task_decompose]
        TExec[task_executor]
        TObs[task_observation]
        TJudge[task_completion_judge]
        TSum[task_summary]
    end

    subgraph Fibre mode
        Fibre[fibre]
        SelfCheck[self_check]
    end

Each agent’s agent_key is how flow nodes reference it (e.g. simple, task_planning).

1.3 FibreAgent and Sub-agent Orchestration

flowchart TB
    Main[Main Session] --> FA[FibreAgent.run_stream]
    FA --> Orch[FibreOrchestrator]
    Orch --> Defs[(AgentDefinition registry<br/>sub-agent configs)]
    Orch --> Backend[FibreBackendClient<br/>fetch/manage sub-agents)]
    Orch --> SubMgr[shared SessionManager]
    Orch -->|delegated tool call| Sub[Sub-session 1..N]
    Sub -->|result| Orch
    Orch -->|aggregate| FA

In short: in fibre mode, the main agent delegates work to sub-agents via tool calls; each sub-agent runs in its own sub-session and the result flows back to the main session.

2. The Flow Layer

2.1 Node Types

flowchart LR
    Flow[AgentFlow.root] --> N1[AgentNode<br/>run a single agent]
    Flow --> N2[SequenceNode<br/>run steps in order]
    Flow --> N3[ParallelNode<br/>concurrent + max_concurrency]
    Flow --> N4[LoopNode<br/>condition + max_loops circuit breaker]
    Flow --> N5[IfNode<br/>true_body / false_body]
    Flow --> N6[SwitchNode<br/>variable + cases + default]

The whole flow is wrapped by AgentFlow(name, root). run_stream also accepts custom_flow to fully replace the default orchestration.

2.2 Condition Registry flow/conditions.py

flowchart LR
    Reg[ConditionRegistry] --> C1[is_deep_thinking]
    Reg --> C2[enable_more_suggest]
    Reg --> C3[enable_plan]
    Reg --> C4[plan_should_start_execution]
    Reg --> C5[task_not_completed]
    Reg --> C6[self_check_should_retry]
    Reg --> C7[need_summary]

    Reg -.referenced by.-> If[IfNode.condition]
    Reg -.referenced by.-> Loop[LoopNode.condition]

Callers can register custom conditions and reference them from custom_flow (see end of this page).

2.3 Executor flow/executor.py

flowchart TB
    Start([execute AgentFlow.root]) --> Dispatch{node type?}

    Dispatch -->|AgentNode| Run[get agent → run_stream<br/>pass chunks through]
    Dispatch -->|SequenceNode| Seq[run steps in order]
    Dispatch -->|ParallelNode| Par[asyncio gather + semaphore]
    Dispatch -->|LoopNode| L1[check condition + max_loops]
    Dispatch -->|IfNode| If1[check condition]
    Dispatch -->|SwitchNode| Sw1[read variable → cases]

    Run --> Check{Session status?}
    Seq --> Check
    Par --> Check
    L1 --> Check
    If1 --> Check
    Sw1 --> Check

    Check -->|RUNNING| NextNode[next node]
    Check -->|INTERRUPTED / ERROR| Stop([stop])

    NextNode --> Dispatch

The executor only decides “how to walk the graph”; what happens inside an agent is the agent’s own concern.

3. Default Flow: simple / multi / fibre

What SAgent._build_default_flow(agent_mode, max_loop_count) actually builds:

flowchart TB
    Start([input messages]) --> DT{is_deep_thinking?}
    DT -->|yes| Ana[task_analysis] --> Sw
    DT -->|no| Sw
    Sw{Switch agent_mode}
    Sw -->|simple| SimpleBody
    Sw -->|multi| MultiBody
    Sw -->|fibre| FibreBody
    SimpleBody --> More
    MultiBody --> More
    FibreBody --> More
    More{enable_more_suggest?}
    More -->|yes| QS[query_suggest]
    More -->|no| End
    QS --> End([done])

simple_agent_body

flowchart TB
    S0([enter simple]) --> Par1{parallel}
    Par1 --> ToolSug1[tool_suggestion]
    Par1 --> Recall1[memory_recall]
    ToolSug1 --> If1
    Recall1 --> If1
    If1{enable_plan?}
    If1 -->|yes| Plan1[plan]
    Plan1 --> If2{plan_should_start_execution?}
    If2 -->|yes| SimpleAgent1[simple]
    If2 -->|no| Need
    If1 -->|no| SimpleAgent2[simple]
    SimpleAgent1 --> Need
    SimpleAgent2 --> Need
    Need{need_summary?}
    Need -->|yes| Sum[task_summary] --> EndS
    Need -->|no| EndS([end simple])

multi_agent_full

flowchart TB
    M0([enter multi]) --> Recall2[memory_recall]
    Recall2 --> Loop[Loop: task_not_completed<br/>up to max_loop_count]
    Loop --> P[task_planning]
    P --> TS[tool_suggestion]
    TS --> E[task_executor]
    E --> O[task_observation]
    O --> J[task_completion_judge]
    J -.task_not_completed.-> Loop
    J -->|done| Sum2[task_summary]
    Sum2 --> EndM([end multi])

fib_agent_body

flowchart TB
    F0([enter fibre]) --> Loop2[Loop: self_check_should_retry<br/>up to 3]
    Loop2 --> Core[fibre core]
    Core --> Par2{parallel}
    Par2 --> TS2[tool_suggestion]
    Par2 --> R2[memory_recall]
    TS2 --> If3
    R2 --> If3
    If3{enable_plan?}
    If3 -->|yes| Plan2[plan] --> If4{plan_should_start_execution?}
    If4 -->|yes| FibA[fibre]
    If4 -->|no| SC
    If3 -->|no| FibB[fibre]
    FibA --> SC
    FibB --> SC
    SC[self_check]
    SC -.should_retry.-> Loop2
    SC -->|stop| EndF([end fibre])

4. Extending: Custom Flow & Sub-agents

The runtime exposes three extension points that, combined, can build arbitrary orchestration without touching sagents source.

4.1 Register a custom condition

from sagents.flow.conditions import ConditionRegistry

@ConditionRegistry.register("user_paid")
def _user_paid(session_context, session=None) -> bool:
    return session_context.system_context.get("plan") == "pro"

4.2 Custom Flow

from sagents.flow.schema import (
    AgentFlow, SequenceNode, AgentNode, IfNode, LoopNode,
)

custom_flow = AgentFlow(
    name="My Pipeline",
    root=SequenceNode(steps=[
        IfNode(
            condition="user_paid",
            true_body=LoopNode(
                condition="task_not_completed",
                max_loops=10,
                body=SequenceNode(steps=[
                    AgentNode(agent_key="task_planning"),
                    AgentNode(agent_key="task_executor"),
                    AgentNode(agent_key="task_completion_judge"),
                ]),
            ),
            false_body=AgentNode(agent_key="simple"),
        ),
        AgentNode(agent_key="task_summary"),
    ]),
)

async for chunks in agent.run_stream(
    ...,
    custom_flow=custom_flow,
):
    ...

When custom_flow is provided, agent_mode / _build_default_flow are bypassed.

4.3 Custom sub-agents (fibre)

custom_sub_agents=[...] injects extra sub-agent definitions without rewriting the flow, primarily for fibre orchestration:

async for chunks in agent.run_stream(
    ...,
    agent_mode="fibre",
    custom_sub_agents=[
        {
            "agent_key": "data_fetcher",
            "description": "Pulls data from internal BI",
            "system_prompt": "...",
            "available_tools": ["http_fetcher", "sql_runner"],
        },
        # more sub-agent definitions...
    ],
):
    ...

The main fibre agent recognizes these as valid delegation targets.


Sage documentation for the current repository layout. Source available under the MIT license.

This site uses Just the Docs, a documentation theme for Jekyll.