Skip to main content

Orchestration Guide

Hands-on walkthrough of AgentOS orchestration — from single-node graphs to multi-agent missions with voice, memory, and checkpointing.

All three APIs (AgentGraph, workflow(), mission()) compile to the same CompiledExecutionGraph IR and run on the same GraphRuntime. You can compose them freely — a mission can embed a workflow as a subgraph step; a graph can invoke a compiled workflow as a node.


AgentGraph — Full Graph Builder

Use AgentGraph when you need cycles, complex conditional routing, subgraph composition, or fine-grained control over graph topology.

Minimal Example

A two-node graph: search the web, then summarize results.

import { AgentGraph, START, END, gmiNode, toolNode } from '@framers/agentos/orchestration';
import { z } from 'zod';

const graph = new AgentGraph(
{
input: z.object({ topic: z.string() }),
scratch: z.object({ sources: z.array(z.string()).default([]) }),
artifacts: z.object({ summary: z.string().default('') }),
},
{ reducers: { 'scratch.sources': 'concat' } },
)
.addNode('search', toolNode('web_search'))
.addNode('summarize', gmiNode({ instructions: 'Summarize the search results in 3 sentences.' }))
.addEdge(START, 'search')
.addEdge('search', 'summarize')
.addEdge('summarize', END)
.compile();

const result = await graph.invoke({ topic: 'quantum computing' });
console.log(result.artifacts.summary);

State schema: The graph declares input (immutable after start), scratch (mutable working state), and artifacts (final output). Reducers control how concurrent node writes merge — concat appends arrays, merge deep-merges objects, replace overwrites.

Conditional Routing

Route execution based on intermediate results. The classifier determines intent, then a routing function sends the query to the appropriate handler.

import { AgentGraph, START, END, gmiNode, toolNode } from '@framers/agentos/orchestration';
import { z } from 'zod';

const graph = new AgentGraph({
input: z.object({ query: z.string() }),
scratch: z.object({ intent: z.string().default('') }),
artifacts: z.object({ answer: z.string().default('') }),
})
.addNode('classify', gmiNode({
instructions: 'Classify the query as "factual", "creative", or "code". Reply with only the label.',
}))
.addNode('factual', toolNode('web_search'))
.addNode('creative', gmiNode({ instructions: 'Write a creative, engaging response.' }))
.addNode('code', gmiNode({ instructions: 'Write clean, documented code with an explanation.' }))
.addEdge(START, 'classify')
.addConditionalEdge('classify', (state) => state.scratch.intent, {
factual: 'factual',
creative: 'creative',
code: 'code',
})
.addEdge('factual', END)
.addEdge('creative', END)
.addEdge('code', END)
.compile();

Agent Loop with Cycle

A research loop that searches, evaluates whether it has enough information, and cycles back if not. The maxIterations guard prevents runaway loops.

const graph = new AgentGraph({
input: z.object({ question: z.string() }),
scratch: z.object({ iterations: z.number().default(0), found: z.boolean().default(false) }),
artifacts: z.object({ answer: z.string().default('') }),
})
.addNode('search', toolNode('web_search'))
.addNode('evaluate', gmiNode({
instructions: 'Evaluate whether the search results contain enough information to answer the question. Set found=true if yes.',
}))
.addNode('answer', gmiNode({
instructions: 'Synthesize the research into a comprehensive answer with citations.',
}))
.addEdge(START, 'search')
.addEdge('search', 'evaluate')
.addConditionalEdge('evaluate', (state) => {
if (state.scratch.found || state.scratch.iterations >= 3) return 'answer';
return 'search'; // cycle back for more research
})
.addEdge('answer', END)
.compile();

Subgraph Composition

Embed a compiled graph as a node inside another graph. The inner graph runs to completion and its artifacts merge into the outer state.

const researchGraph = new AgentGraph({ /* ... research loop above ... */ }).compile();

const productionGraph = new AgentGraph({
input: z.object({ topic: z.string() }),
scratch: z.object({}),
artifacts: z.object({ post: z.string().default(''), image: z.string().default('') }),
})
.addNode('research', subgraphNode(researchGraph))
.addNode('write', gmiNode({ instructions: 'Write a blog post from the research.' }))
.addNode('illustrate', toolNode('image_generate'))
.addEdge(START, 'research')
.addEdge('research', 'write')
.addEdge('write', 'illustrate')
.addEdge('illustrate', END)
.compile();

Node Configuration

Every node builder accepts an optional policies object controlling memory, guardrails, checkpointing, and execution mode:

gmiNode(
{
instructions: 'Summarize the document.',
executionMode: 'react_bounded',
maxInternalIterations: 5,
maxTokens: 2048,
temperature: 0.3,
},
{
memory: {
consistency: 'snapshot',
read: { types: ['semantic', 'episodic'], semanticQuery: '{input.topic}', maxTraces: 10 },
write: { autoEncode: true, type: 'episodic', scope: 'session' },
},
guardrails: { output: ['content-safety', 'pii-redaction'], onViolation: 'sanitize' },
checkpoint: 'after',
}
)

Execution modes for gmiNode:

ModeBehavior
single_turnOne LLM call, no internal tool loop. Default in workflow() for cost-bounded execution.
react_boundedReAct loop up to maxInternalIterations. Default in AgentGraph. Agent can call tools, observe results, and reason across multiple turns.
planner_controlledPlanningEngine drives the loop. Default in mission(). The planner decides when to stop based on goal satisfaction.

WorkflowBuilder — Sequential Pipelines

Use workflow() for deterministic pipelines where steps are known upfront. Cycles are rejected at compile time — if you need them, use AgentGraph.

Quick Start

import { workflow } from '@framers/agentos/orchestration';
import { z } from 'zod';

const pipeline = workflow('content-pipeline')
.input(z.object({ url: z.string() }))
.returns(z.object({ summary: z.string(), tags: z.array(z.string()) }))
.step('fetch', { tool: 'web_fetch', effectClass: 'external' })
.step('summarize', { gmi: { instructions: 'Summarize in 3 sentences.' } })
.step('tag', { gmi: { instructions: 'Extract 5 topic tags as a JSON array.' } })
.compile();

const result = await pipeline.invoke({ url: 'https://example.com/article' });
console.log(result.summary);
console.log(result.tags);

Branching

Route to different processing paths based on classification:

workflow('triage')
.input(z.object({ ticket: z.string() }))
.returns(z.object({ response: z.string() }))
.step('classify', { gmi: { instructions: 'Classify as "billing", "technical", or "general".' } })
.branch(
(state) => state.scratch.classification,
{
billing: (wf) => wf.step('billing-agent', { gmi: { instructions: 'Handle billing issue. Check account status, explain charges, process refunds.' } }),
technical: (wf) => wf.step('technical-agent', { gmi: { instructions: 'Diagnose and resolve the technical issue. Check logs, suggest fixes.' } }),
general: (wf) => wf.step('general-agent', { gmi: { instructions: 'Handle general inquiry with helpful, clear responses.' } }),
}
)
.compile();

Parallel Steps

Execute independent steps concurrently with configurable reducers to merge results:

workflow('multi-source-research')
.input(z.object({ query: z.string() }))
.returns(z.object({ report: z.string() }))
.parallel(
{ reducers: { 'scratch.results': 'concat' } },
(wf) => wf.step('web', { tool: 'web_search' }),
(wf) => wf.step('news', { tool: 'news_search' }),
(wf) => wf.step('papers', { tool: 'arxiv_search' }),
)
.step('synthesize', {
gmi: { instructions: 'Synthesize all sources into a coherent research report with citations.' },
memory: { read: { types: ['semantic'], maxTraces: 5 } },
})
.compile();

Human-in-the-Loop Step

Suspend execution and wait for human approval before proceeding:

workflow('content-approval')
.input(z.object({ brief: z.string() }))
.returns(z.object({ publishedPost: z.string() }))
.step('draft', { gmi: { instructions: 'Write a blog post draft based on the brief.' } })
.step('approve', { human: { prompt: 'Review the draft. Approve or request changes.' } })
.step('publish', { tool: 'cms_publish', effectClass: 'external' })
.compile();

The human step emits a human_input_required event on the stream and pauses execution. The host application presents the prompt to the user, collects their response, and calls graph.resumeWithHumanInput(runId, response) to continue.

Memory-Aware Steps

Steps can read from and write to cognitive memory:

workflow('personalized-response')
.input(z.object({ userId: z.string(), question: z.string() }))
.returns(z.object({ answer: z.string() }))
.step('recall', {
gmi: { instructions: 'Answer the question using past interaction context.' },
memory: {
read: { types: ['episodic', 'semantic'], semanticQuery: '{input.question}', maxTraces: 10 },
write: { autoEncode: true, type: 'episodic', scope: 'user' },
},
})
.compile();

MissionBuilder — Goal-Oriented Execution

Use mission() when you want to describe what the agent should achieve rather than how to achieve it. The mission compiler uses Tree of Thought planning to decompose the goal into an execution graph.

Quick Start

import { mission } from '@framers/agentos/orchestration';
import { z } from 'zod';

const researchMission = mission('research')
.input(z.object({ topic: z.string() }))
.goal('Research {{topic}} and produce a concise 3-paragraph summary with citations.')
.returns(z.object({ summary: z.string(), citations: z.array(z.string()) }))
.planner({ strategy: 'tree_of_thought', branches: 3, maxSteps: 8 })
.autonomy('guardrailed')
.providerStrategy('balanced')
.costCap(5.00)
.compile();

const result = await researchMission.invoke({ topic: 'quantum error correction' });
console.log(result.summary);

Planner Strategies

The planner decomposes the goal into a graph structure. Three strategies are available:

StrategyBehavior
linearSequential decomposition — each step feeds the next. Fastest planning, simplest graph.
tree_of_thoughtGenerates N candidate decompositions, evaluates each on feasibility/cost/latency/robustness, selects the best or synthesizes a hybrid. Based on Yao et al. 2023.
reactReAct-style observe-think-act loop — the planner observes intermediate results and decides the next step dynamically.
// Tree of Thought — explores 3 candidate plans, picks the best
.planner({ strategy: 'tree_of_thought', branches: 3, maxSteps: 12 })

// Linear — fast, deterministic decomposition
.planner({ strategy: 'linear', maxSteps: 8 })

// ReAct — adaptive, observes results between steps
.planner({ strategy: 'react', maxIterations: 5 })

Autonomy Modes

Control how much the mission can self-expand during execution:

ModeBehavior
autonomousAll expansion requests auto-approve. Only stops at cost/agent caps.
guidedEvery expansion proposal pauses for human approval.
guardrailedAuto-approves below thresholds (cost, agent count, tool forges). Above thresholds, pauses for approval.
mission('deep-research')
.goal('...')
.autonomy('guardrailed')
.costCap(10.00)
.maxAgents(8)
.compile();

Provider Strategy

Assign LLM providers per node based on task complexity:

// Balanced — expensive models for complex reasoning, cheap for routing
.providerStrategy('balanced')

// Explicit — assign providers per role
.providerStrategy('explicit', {
researcher: { provider: 'anthropic', model: 'claude-sonnet-4-6' },
writer: { provider: 'openai', model: 'gpt-4o' },
_default: { provider: 'openai', model: 'gpt-4o-mini' },
})

// Cheapest — minimize cost across all nodes
.providerStrategy('cheapest')

Anchor Nodes

Inject fixed logic at specific positions in the planner's output — anchors persist regardless of what the planner generates:

mission('audited-research')
.input(z.object({ topic: z.string() }))
.goal('Research {{topic}} thoroughly.')
.returns(z.object({ report: z.string() }))
.anchor({ position: 'before_first', node: humanNode({ prompt: 'Approve the research topic?' }) })
.anchor({ position: 'after_last', node: toolNode('report_publisher') })
.compile();

Voice Nodes in Graphs

Embed full voice pipeline turns directly inside any graph — STT, LLM reasoning, and TTS as a single node:

import { AgentGraph, START, END, voiceNode, gmiNode } from '@framers/agentos/orchestration';
import { z } from 'zod';

const callGraph = new AgentGraph({
input: z.object({ callerId: z.string() }),
scratch: z.object({ transcript: z.string().default('') }),
artifacts: z.object({ resolution: z.string().default('') }),
})
.addNode(
'listen',
voiceNode('listen', {
mode: 'conversation',
maxTurns: 10,
sttProvider: 'deepgram',
ttsProvider: 'elevenlabs',
bargeIn: true,
})
.on('completed', 'resolve')
.on('interrupted', 'listen')
.on('hangup', 'cleanup')
.build()
)
.addNode('resolve', gmiNode({ instructions: 'Determine the resolution based on the conversation transcript.' }))
.addNode('cleanup', toolNode('close_ticket'))
.addEdge(START, 'listen')
.addEdge('resolve', END)
.addEdge('cleanup', END)
.compile();

Voice node options:

OptionTypeDefaultDescription
mode'single_turn' | 'conversation''single_turn'One exchange vs multi-turn dialogue
maxTurnsnumber5Hard cap on dialogue turns
sttProviderstringglobal configOverride STT provider for this node
ttsProviderstringglobal configOverride TTS provider for this node
bargeInbooleanfalseAllow user to interrupt TTS playback
vadSensitivitynumber0.5Voice activity detection threshold (0-1)

Checkpointing and Resume

Any compiled graph supports durable checkpoints. Swap in a persistent store for production — the interface is the same.

import { InMemoryCheckpointStore } from '@framers/agentos/orchestration/checkpoint';

const store = new InMemoryCheckpointStore('./runs.db');

const graph = new AgentGraph({ /* ... */ })
.compile({ checkpointStore: store, checkpointPolicy: 'every_node' });

// First run
const runId = 'run-abc-123';
try {
await graph.invoke({ topic: 'fusion energy' }, { runId });
} catch (err) {
console.error('Run failed mid-way, will resume later.');
}

// Resume from the last completed node
const resumed = await graph.resume(runId);
console.log(resumed.artifacts);

Checkpoint policies:

PolicyBehavior
'none'No checkpoints. Default.
'every_node'Checkpoint after each node completes. Full recoverability, higher storage cost.
'explicit'Checkpoint only at nodes with checkpoint: 'after' in their policy. Selective.

Time-Travel and Forking

Fork from a historical checkpoint to explore alternative execution paths with modified state:

// Fork from checkpoint, patch the state, run from that point
const forkedRunId = await store.fork(checkpointId, {
scratch: { iterations: 0, confidence: 0.9 },
});
const altResult = await graph.resume(forkedRunId);

// List all checkpoints for a run
const checkpoints = await store.list(runId);
// → [{ id, nodeId, timestamp, stateSnapshot }, ...]

GraphEvent Streaming

All graph executions emit a unified event stream. Subscribe via for await...of for real-time UI updates, logging, and debugging:

const stream = graph.stream({ topic: 'AI safety' });

for await (const event of stream) {
switch (event.type) {
case 'node_started':
console.log(`${event.nodeId} started`);
break;
case 'node_completed':
console.log(`${event.nodeId} completed in ${event.durationMs}ms`);
break;
case 'text_delta':
process.stdout.write(event.delta);
break;
case 'tool_call':
console.log(` tool: ${event.toolName}(${JSON.stringify(event.args)})`);
break;
case 'tool_result':
console.log(` result: ${JSON.stringify(event.result).slice(0, 100)}`);
break;
case 'human_input_required':
console.log(` PAUSED: ${event.prompt}`);
break;
case 'checkpoint_saved':
console.log(` checkpoint: ${event.checkpointId}`);
break;
case 'guardrail_violation':
console.log(` violation: ${event.guardrailId}${event.action}`);
break;
case 'graph_completed':
console.log('\nDone.', event.artifacts);
break;
case 'graph_error':
console.error('Failed:', event.error);
break;
}
}

Full event type reference:

EventPayloadWhen
node_startednodeId, nodeTypeNode execution begins
node_completednodeId, durationMs, outputNode execution completes
text_deltadelta, nodeIdStreaming LLM text chunk
tool_calltoolName, args, nodeIdTool invocation
tool_resulttoolName, result, nodeIdTool execution result
human_input_requiredprompt, nodeId, runIdHuman-in-the-loop pause
checkpoint_savedcheckpointId, runId, nodeIdCheckpoint persisted
state_updatepath, value, reducerGraph state mutation
guardrail_checkguardrailId, nodeId, passedGuardrail evaluation
guardrail_violationguardrailId, action, detailsGuardrail triggered
memory_readtypes, traceCount, nodeIdMemory traces retrieved
memory_writetype, scope, nodeIdMemory trace encoded
discovery_matchquery, matchedCapability, confidenceCapability discovery resolved
graph_completedartifacts, durationMs, tokenCountGraph execution finished
graph_errorerror, nodeId?Graph execution failed

YAML Workflow Authoring

For non-TypeScript environments or declarative pipeline definitions, workflows can be authored as YAML:

# workflows/summarize.yaml
name: summarize-article
input:
schema:
type: object
properties:
url: { type: string }
required: [url]
returns:
schema:
type: object
properties:
summary: { type: string }
tags: { type: array, items: { type: string } }

steps:
- id: fetch
tool: web_fetch
effectClass: external

- id: summarize
gmi:
instructions: Summarize the article in 3 sentences.
memory:
read:
types: [semantic]
maxTraces: 5

- id: tag
gmi:
instructions: Extract 5 topic tags as a JSON array.

- id: review
human:
prompt: Review the summary and tags. Approve or request changes.

Load and execute:

import { loadWorkflowFromYaml } from '@framers/agentos/orchestration';
import { readFileSync } from 'fs';

const yaml = readFileSync('./workflows/summarize.yaml', 'utf8');
const wf = await loadWorkflowFromYaml(yaml);
const result = await wf.invoke({ url: 'https://example.com/article' });

YAML workflows support the full feature set — branching, parallelism, memory, guardrails, and human-in-the-loop steps.


Choosing the Right API

If you need...UseWhy
Known steps in a fixed orderworkflow()Compile-time DAG validation, deterministic cost
Conditional branching or cyclesAgentGraphFull graph model, arbitrary routing
The agent to figure out its own stepsmission()Tree of Thought planning, self-expansion
Multiple specialized agents coordinatingagency()Strategy-based multi-agent (debate, pipeline, supervisor)
One-off LLM callgenerateText() / streamText()No graph overhead, direct provider call
Voice conversation flowAgentGraph + voiceNodeFull IVR support with barge-in and hangup handling
Cost-bounded pipelineworkflow()Single-turn GMI, no runaway loops
Prototype → productionmission()AgentGraphStart with a goal, extract the generated IR, hand-tune