AgentGraph
AgentGraph is the lowest-level authoring API in the Unified Orchestration Layer. It exposes explicit node and edge management, supports cycles, subgraph composition, and four edge types including the AgentOS-exclusive discovery and personality edges.
Current runtime note:
AgentGraphcompilation is complete, but some advanced execution paths are still bridge-dependent today. The base runtime executestool,router,guardrail, andhumannodes directly.gmi,extension, andsubgraphexecution require a higher-level runtime bridge, and discovery/personality edges are still partial unless those integrations are wired.
Use AgentGraph when you need full control: complex conditional routing, agent loops that cycle back, memory-driven state machines, or subgraph composition. For linear pipelines, see workflow(). For goal-driven orchestration, see mission().
Quick Start
import {
AgentGraph, START, END,
gmiNode, toolNode,
} from '@framers/agentos/orchestration';
import { z } from 'zod';
const graph = new AgentGraph(
{
input: z.object({ topic: z.string() }),
scratch: z.object({ sources: z.array(z.string()).default([]) }),
artifacts: z.object({ summary: z.string() }),
},
{ reducers: { 'scratch.sources': 'concat' } }
)
.addNode('search', toolNode('web_search'))
.addNode('summarize', gmiNode({ instructions: 'Summarize the search results.' }))
.addEdge(START, 'search')
.addEdge('search', 'summarize')
.addEdge('summarize', END)
.compile();
const result = await graph.invoke({ topic: 'quantum computing' });
Constructor
new AgentGraph(stateSchema, config?)
| Parameter | Type | Description |
|---|---|---|
stateSchema.input | Zod schema | Shape of the frozen user input |
stateSchema.scratch | Zod schema | Shape of the mutable node-to-node communication bag |
stateSchema.artifacts | Zod schema | Shape of the accumulated outputs returned to the caller |
config.reducers | StateReducers | Field-level merge strategies for parallel branches |
config.memoryConsistency | MemoryConsistencyMode | Graph-wide memory isolation (default: 'snapshot') |
config.checkpointPolicy | 'every_node' | 'explicit' | 'none' | When to persist checkpoints (default: 'none') |
Node Builders
All nodes are created with typed factory functions. Each accepts an optional policies object for memory, discovery, guardrail, and persona configuration.
gmiNode
A General Model Invocation node that calls an LLM. The default executionMode is react_bounded — an internal ReAct tool-use loop capped by maxInternalIterations.
import { gmiNode } from '@framers/agentos/orchestration';
gmiNode(
{
instructions: 'Research the topic thoroughly.',
executionMode: 'react_bounded', // default — bounded ReAct loop
maxInternalIterations: 5, // default
parallelTools: false,
temperature: 0.7,
maxTokens: 2048,
},
{
memory: {
consistency: 'snapshot',
read: { types: ['semantic'], semanticQuery: '{input.topic}', maxTraces: 10 },
write: { autoEncode: true, type: 'episodic', scope: 'session' },
},
discovery: { enabled: true, kind: 'tool', maxResults: 5 },
guardrails: { output: ['content-safety'], onViolation: 'block' },
checkpoint: 'after',
}
)
Execution modes:
| Mode | Description | Default for |
|---|---|---|
single_turn | One LLM call, no internal tool loop | workflow() steps |
react_bounded | ReAct loop up to maxInternalIterations | AgentGraph gmi nodes |
planner_controlled | PlanningEngine controls the loop | mission() steps |
toolNode
Invokes a registered ITool by name. The tool name must match a key in the tool catalogue.
import { toolNode } from '@framers/agentos/orchestration';
toolNode(
'web_search',
{
timeout: 10_000,
// Accepted by the IR today; shared-runtime retries are still being wired.
retryPolicy: { maxAttempts: 3, backoff: 'exponential', backoffMs: 500 },
},
{
effectClass: 'external',
guardrails: { output: ['pii-redaction'], onViolation: 'sanitize' },
}
)
humanNode
Suspends execution and surfaces a prompt to a human operator. The run can be resumed with .resume(checkpointId) after the human responds.
import { humanNode } from '@framers/agentos/orchestration';
humanNode({ prompt: 'Does this summary look accurate? (yes/no)', timeout: 86_400_000 })
routerNode
A pure routing node with no LLM call and no output. Evaluates a condition and emits edges to the appropriate next node. Use this as the source of addConditionalEdge() calls when you need a dedicated branching point.
import { routerNode } from '@framers/agentos/orchestration';
// In-process function (not serializable)
routerNode((state) => state.scratch.confidence > 0.8 ? 'summarize' : 'search')
// Expression string (serializable to JSON/YAML)
routerNode("scratch.confidence > 0.8 ? 'summarize' : 'search'")
guardrailNode
Runs guardrails as an explicit step in the graph, not just on the edge. Use this for pre-flight checks or to gate progress through critical stages.
import { guardrailNode } from '@framers/agentos/orchestration';
guardrailNode(['pii-redaction', 'content-safety'], {
onViolation: 'reroute',
rerouteTarget: 'sanitize-output',
})
subgraphNode
Embeds a previously compiled CompiledExecutionGraph as a single node. Input and output fields are mapped between the parent and child graphs.
At the moment, this is a compile-time authoring primitive. Executing subgraphs requires a runtime bridge that knows how to resolve and invoke nested graphs.
import { subgraphNode } from '@framers/agentos/orchestration';
subgraphNode(compiledSubgraph, {
inputMapping: { 'scratch.query': 'input.topic' }, // parent scratch → child input
outputMapping: { 'artifacts.summary': 'scratch.result' }, // child artifacts → parent scratch
})
Edge Types
Static Edge
Always followed. The most common edge type.
graph.addEdge(START, 'fetch');
graph.addEdge('fetch', 'process');
graph.addEdge('process', END);
Conditional Edge
Target is resolved at runtime by a function receiving the current GraphState.
graph.addConditionalEdge('evaluate', (state) =>
state.scratch.confidence > 0.8 ? 'summarize' : 'search'
);
The function must return a valid node id. The returned id is not validated at compile time.
Discovery Edge
Target is resolved at runtime via semantic search over the capability registry. In the shared runtime today, this remains partial: when discovery is not wired, execution follows the declared fallback target.
graph.addDiscoveryEdge('plan', {
query: 'find a tool that can search academic papers',
kind: 'tool', // restrict to tools only
fallbackTarget: 'web-search', // use this node if discovery returns nothing
});
Runtime semantics (target state):
CapabilityDiscoveryEngine.discover(query, { kind })is called- The top-1 result is selected
- A transient executable node is instantiated
- Execution continues through the resolved target
- If no results: route to
fallbackTarget, or emitDISCOVERY_NO_RESULTS
Personality Edge
Routes based on the agent's current HEXACO/PAD trait value. No conditional logic required in your code once a personality source is wired into the runtime.
graph.addPersonalityEdge('draft', {
trait: 'conscientiousness', // HEXACO trait name
threshold: 0.7, // decision boundary (0–1)
above: 'human-review', // route when trait >= threshold
below: END, // route when trait < threshold
});
Available HEXACO traits: honesty_humility, emotionality, extraversion, agreeableness, conscientiousness, openness.
State Management
GraphState has three partitions you control and two managed by the runtime:
interface GraphState<TInput, TScratch, TArtifacts> {
input: Readonly<TInput>; // Frozen at graph start — nodes cannot write
scratch: TScratch; // Mutable node-to-node bag
artifacts: TArtifacts; // Accumulated outputs returned to caller
// Runtime-managed:
memory: MemoryView; // Read-only memory traces (populated by MemoryPolicy)
diagnostics: DiagnosticsView; // Token usage, latency, discovery results
currentNodeId: string;
visitedNodes: string[];
iteration: number;
checkpointId?: string;
}
State Reducers
When parallel branches or a loop writes to the same field, you need a reducer to define the merge strategy:
const graph = new AgentGraph(stateSchema, {
reducers: {
'scratch.sources': 'concat', // Arrays: [...existing, ...incoming]
'scratch.confidence': 'max', // Numbers: Math.max(existing, incoming)
'artifacts.summary': 'last', // Any: last-write-wins (default)
'artifacts.score': (a, b) => (Number(a) + Number(b)) / 2, // Custom
},
});
Built-in reducers: concat, merge, max, min, avg, sum, last, first, longest
Compilation
const compiled = graph.compile({
checkpointStore: new SqliteCheckpointStore('./runs.db'),
validate: true, // default — throws on unreachable nodes or structural errors
});
AgentGraph allows cycles (validate: false is only needed for intentional orphan nodes).
Execution
// Run to completion
const result = await compiled.invoke({ topic: 'quantum computing' });
// Stream events
for await (const event of compiled.stream({ topic: 'quantum computing' })) {
console.log(event.type, event.nodeId);
// event.type: 'run_start' | 'node_start' | 'node_end' | 'edge_transition' | 'run_end'
}
// Resume from checkpoint after interruption
const result = await compiled.resume(checkpointId);
// Export IR for debugging or visualization
const ir = compiled.toIR();
Subgraph Composition
Build modular graphs by nesting compiled graphs as single nodes:
// Build the inner graph
const fetchGraph = new AgentGraph(fetchState)
.addNode('fetch', toolNode('web_fetch'))
.addNode('parse', toolNode('html_parser'))
.addEdge(START, 'fetch')
.addEdge('fetch', 'parse')
.addEdge('parse', END)
.compile();
// Embed it in the outer graph
const outerGraph = new AgentGraph(outerState)
.addNode('gather', subgraphNode(fetchGraph.toIR(), {
inputMapping: { 'input.url': 'input.url' },
outputMapping: { 'artifacts.text': 'scratch.rawText' },
}))
.addNode('analyze', gmiNode({ instructions: 'Analyze the text.' }))
.addEdge(START, 'gather')
.addEdge('gather', 'analyze')
.addEdge('analyze', END)
.compile();
Complete Example — Research Graph
import {
AgentGraph, START, END,
gmiNode, toolNode, humanNode,
} from '@framers/agentos/orchestration';
import { SqliteCheckpointStore } from '@framers/agentos/orchestration/checkpoint';
import { z } from 'zod';
const ResearchState = {
input: z.object({ topic: z.string() }),
scratch: z.object({
sources: z.array(z.string()).default([]),
confidence: z.number().default(0),
}),
artifacts: z.object({
summary: z.string(),
sources: z.array(z.string()),
}),
};
const graph = new AgentGraph(ResearchState, {
reducers: { 'scratch.sources': 'concat' },
memoryConsistency: 'snapshot',
checkpointPolicy: 'every_node',
})
.addNode('plan', gmiNode(
{
instructions: 'Break this research topic into sub-questions.',
executionMode: 'single_turn',
},
{
memory: {
consistency: 'snapshot',
read: { types: ['semantic'], semanticQuery: '{input.topic}', maxTraces: 10 },
},
discovery: { enabled: true, kind: 'tool' },
checkpoint: 'after',
}
))
.addNode('search', toolNode(
'web_search',
{ timeout: 10_000 },
{
effectClass: 'external',
guardrails: { output: ['pii-redaction'], onViolation: 'sanitize' },
}
))
.addNode('evaluate', gmiNode(
{
instructions: 'Evaluate source quality and assign a confidence score (0–1).',
executionMode: 'single_turn',
},
{
memory: {
consistency: 'snapshot',
write: { autoEncode: true, type: 'episodic', scope: 'session' },
},
}
))
.addNode('summarize', gmiNode(
{
instructions: 'Write a final summary from gathered sources.',
executionMode: 'single_turn',
},
{
guardrails: {
output: ['grounding-guard'],
onViolation: 'reroute',
rerouteTarget: 'search',
},
}
))
.addNode('review', humanNode({ prompt: 'Does this summary look accurate?' }))
.addEdge(START, 'plan')
.addEdge('plan', 'search')
.addEdge('search', 'evaluate')
.addConditionalEdge('evaluate', (state) =>
state.scratch.confidence > 0.8 ? 'summarize' : 'search'
)
.addPersonalityEdge('summarize', {
trait: 'conscientiousness',
threshold: 0.7,
above: 'review',
below: END,
})
.addEdge('review', END)
.compile({
checkpointStore: new SqliteCheckpointStore('./research-checkpoints.db'),
});
// Run
const result = await graph.invoke({ topic: 'quantum computing' });
// Stream with progress
for await (const event of graph.stream({ topic: 'quantum computing' })) {
if (event.type === 'node_start') console.log(`Starting: ${event.nodeId}`);
if (event.type === 'node_end') console.log(`Done: ${event.nodeId}`);
}
// Resume after interruption at human-review step
const result2 = await graph.resume(savedCheckpointId);
See Also
- workflow() DSL — simpler API for DAG pipelines
- Checkpointing — ICheckpointStore, resume, time-travel
- Unified Orchestration — architecture overview