Checkpointing and Time-Travel
The AgentOS Unified Orchestration Layer has built-in support for checkpoints, resume after failure, and time-travel debugging via the ICheckpointStore interface.
Status note:
InMemoryCheckpointStore is implemented and used by default. SqliteCheckpointStore is still planned and has not shipped in this package yet.
ICheckpointStore
All checkpoint persistence is done through this interface. Swap implementations without changing any graph code.
import type { ICheckpointStore, Checkpoint, CheckpointMetadata } from '@framers/agentos/orchestration/checkpoint';
interface ICheckpointStore {
  save(checkpoint: Checkpoint): Promise<void>;
  get(checkpointId: string): Promise<Checkpoint | null>;
  load(runId: string, nodeId?: string): Promise<Checkpoint | null>;
  latest(runId: string): Promise<Checkpoint | null>;
  list(graphId: string, options?: { limit?: number; runId?: string }): Promise<CheckpointMetadata[]>;
  delete(checkpointId: string): Promise<void>;
  // Time-travel
  fork(checkpointId: string, patchState?: Partial<GraphState>): Promise<string>;
}
Implementations
| Store | Import path | Use case |
|---|---|---|
| InMemoryCheckpointStore | @framers/agentos/orchestration/checkpoint | Development, testing, ephemeral runs |
| Custom | Implement ICheckpointStore | Postgres, Redis, object storage, or any durable backend |
import {
  InMemoryCheckpointStore,
} from '@framers/agentos/orchestration/checkpoint';
// In-memory (default when no store is specified)
const graph = new AgentGraph(...).compile();
What Gets Saved
Each checkpoint is a full, serialisable snapshot taken at a node boundary:
interface Checkpoint {
  id: string; // UUIDv4 assigned by the runtime
  graphId: string; // CompiledExecutionGraph id
  runId: string; // Graph run id
  nodeId: string; // The node at whose boundary this was captured
  timestamp: number;
  // GraphState partitions
  state: {
    input: unknown; // Original user input (frozen)
    scratch: unknown; // Node-to-node communication bag
    artifacts: unknown; // Accumulated outputs
    diagnostics: DiagnosticsView;
  };
  // Optional: memory subsystem snapshot
  memorySnapshot?: {
    reads: Array<{ traceId: string; content: string; strength: number }>;
    pendingWrites: Array<{ type: string; content: string; scope: string }>;
  };
  // Node results for non-idempotent replay
  nodeResults: Record<string, {
    effectClass: EffectClass;
    output: unknown;
    durationMs: number;
  }>;
  visitedNodes: string[]; // Nodes completed at checkpoint time
  skippedNodes?: string[]; // Branches bypassed by routing decisions
  pendingEdges: string[]; // Edges emitted but not yet executed
}
The memory partition is excluded from state. On resume it is rehydrated fresh from the memory store, unless a memorySnapshot is present, in which case the exact in-flight memory state is restored instead.
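Since a checkpoint is described as fully serialisable, anything placed in scratch or artifacts should survive a JSON round trip. The following is a minimal sketch of such a check, not part of the AgentOS API: `MiniCheckpoint`, `deepEqual`, and `isJsonSafe` are hypothetical names, and `MiniCheckpoint` is a trimmed-down local stand-in for the Checkpoint shape above.

```typescript
// Hypothetical, trimmed-down stand-in for the Checkpoint shape above.
interface MiniCheckpoint {
  id: string;
  runId: string;
  nodeId: string;
  timestamp: number;
  state: { input: unknown; scratch: unknown; artifacts: unknown };
  visitedNodes: string[];
  pendingEdges: string[];
}

// Structural equality for plain objects and arrays. Values with a different
// prototype (Date, Map, class instances) never compare equal to plain data.
function deepEqual(a: unknown, b: unknown): boolean {
  if (a === b) return true;
  if (typeof a !== 'object' || typeof b !== 'object' || a === null || b === null) return false;
  if (Object.getPrototypeOf(a) !== Object.getPrototypeOf(b)) return false;
  const keysA = Object.keys(a);
  const keysB = Object.keys(b);
  if (keysA.length !== keysB.length) return false;
  return keysA.every((key) =>
    deepEqual((a as Record<string, unknown>)[key], (b as Record<string, unknown>)[key]),
  );
}

// True when the value comes back unchanged from JSON.stringify + JSON.parse.
function isJsonSafe(value: unknown): boolean {
  try {
    const text = JSON.stringify(value);
    if (text === undefined) return false;
    return deepEqual(JSON.parse(text), value);
  } catch {
    return false; // e.g. circular references
  }
}

const cleanCheckpoint: MiniCheckpoint = {
  id: 'cp-1',
  runId: 'run-abc',
  nodeId: 'plan',
  timestamp: 1700000000000,
  state: { input: { question: 'q' }, scratch: { attempts: 1 }, artifacts: {} },
  visitedNodes: ['plan'],
  pendingEdges: ['plan->search'],
};

// A Date in scratch turns into a string on the way back, so it is flagged.
const leakyCheckpoint: MiniCheckpoint = {
  ...cleanCheckpoint,
  state: { ...cleanCheckpoint.state, scratch: { startedAt: new Date(0) } },
};
```

A check like this could run in a test suite or in a custom store's save() during development to catch non-serialisable state early.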
Checkpoint Policies
Control when checkpoints are persisted:
| Policy | Description |
|---|---|
| every_node | Persist after every node completes. Maximum durability. Used by workflow() by default. |
| explicit | Persist only for nodes with checkpoint: 'before', 'after', or 'both'. |
| none | Never persist. Lowest overhead. Used by AgentGraph by default. |
// Graph-wide policy
const graph = new AgentGraph(state, { checkpointPolicy: 'every_node' }).compile();
// Per-node override
gmiNode(
  { instructions: '...' },
  { checkpoint: 'after' } // 'before' | 'after' | 'both' | 'none'
)
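To make the interaction between the graph-wide policy and the per-node setting concrete, here is one way the decision could be modelled. This is a sketch under assumptions, not the runtime's actual logic: `shouldPersist` is a hypothetical helper, and it assumes that `none` always wins, that `every_node` behaves like a default of `'after'`, and that a per-node value overrides that default. The source does not spell out the precedence rules.

```typescript
type CheckpointPolicy = 'every_node' | 'explicit' | 'none';
type NodeCheckpoint = 'before' | 'after' | 'both' | 'none';
type Phase = 'before' | 'after';

// Hypothetical helper: should a checkpoint be persisted at this node boundary?
function shouldPersist(
  policy: CheckpointPolicy,
  phase: Phase,
  nodeSetting?: NodeCheckpoint,
): boolean {
  // Assumption: a graph-wide 'none' disables persistence entirely.
  if (policy === 'none') return false;

  // Assumption: without a per-node value, 'every_node' means "after each node"
  // and 'explicit' means "nothing unless the node asks for it".
  const effective: NodeCheckpoint =
    nodeSetting ?? (policy === 'every_node' ? 'after' : 'none');

  return effective === 'both' || effective === phase;
}

// A few illustrative calls:
const afterEveryNode = shouldPersist('every_node', 'after');       // persists
const explicitUnmarked = shouldPersist('explicit', 'after');        // skipped
const explicitBoth = shouldPersist('explicit', 'before', 'both');   // persists
```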
Resume Semantics
When a run is resumed from a checkpoint, the runtime replays or re-executes nodes depending on their effectClass:
| effectClass | Resume behavior | Rationale |
|---|---|---|
| pure | Re-execute | Deterministic; safe to run again |
| read | Re-execute | Idempotent; may return fresher data |
| write | Replay recorded output from nodeResults | Not idempotent; would duplicate DB writes |
| external | Replay recorded output from nodeResults | Not idempotent; would duplicate API calls |
| human | Replay recorded output from nodeResults | Cannot ask a human the same question again |
This means you should always declare effectClass accurately on tool nodes:
// web_search makes external calls — declare it so resume replays the result
toolNode('web_search', {}, { effectClass: 'external' })
// A pure transform — safe to re-run
toolNode('json_formatter', {}, { effectClass: 'pure' })
// A database insert: mark as write so resume reuses the recorded output instead of inserting again
toolNode('create_record', {}, { effectClass: 'write' })
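The table above boils down to a small mapping from effectClass to a resume action. The sketch below illustrates that mapping over a nodeResults record; `resumeAction` and `planResume` are hypothetical names used for illustration, and the types are local stand-ins rather than imports from the package.

```typescript
type EffectClass = 'pure' | 'read' | 'write' | 'external' | 'human';
type ResumeAction = 're-execute' | 'replay';

// Local stand-in for an entry of Checkpoint.nodeResults.
interface RecordedResult {
  effectClass: EffectClass;
  output: unknown;
  durationMs: number;
}

// pure and read nodes are safe to run again; everything else is replayed
// from the recorded output to avoid duplicate side effects.
function resumeAction(effectClass: EffectClass): ResumeAction {
  return effectClass === 'pure' || effectClass === 'read' ? 're-execute' : 'replay';
}

// Hypothetical helper: decide the action for every node with a recorded result.
function planResume(nodeResults: Record<string, RecordedResult>): Record<string, ResumeAction> {
  const actions: Record<string, ResumeAction> = {};
  for (const [nodeId, result] of Object.entries(nodeResults)) {
    actions[nodeId] = resumeAction(result.effectClass);
  }
  return actions;
}

const recorded: Record<string, RecordedResult> = {
  web_search: { effectClass: 'external', output: { hits: 3 }, durationMs: 420 },
  json_formatter: { effectClass: 'pure', output: '{}', durationMs: 2 },
  create_record: { effectClass: 'write', output: { rowId: 7 }, durationMs: 35 },
};

const resumePlan = planResume(recorded);
```

If a node is mislabelled, for example a database insert declared as `pure`, a plan like this would re-execute it, which is why accurate effectClass declarations matter.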
Resuming a Run
// With AgentGraph
const graph = new AgentGraph(...).compile({
  checkpointStore: new InMemoryCheckpointStore(),
});
// Capture the latest checkpoint id during streaming
let lastCheckpointId: string | undefined;
for await (const event of graph.stream(input)) {
  if (event.type === 'checkpoint_saved') {
    lastCheckpointId = event.checkpointId; // present when a checkpoint was saved
  }
}
// Resume after crash / human approval / timeout.
// You can pass either the run id or an exact checkpoint id.
const result = await graph.resume(lastCheckpointId!);
The same API applies to workflow() and mission():
const workflowResult = await workflow.resume(checkpointId);
const missionResult = await missionCompiled.resume(checkpointId);
Time-Travel with fork()
fork() creates a new run branching from any past checkpoint, with optional state overrides. The original run is untouched.
// In practice, reuse the store instance that was passed to compile({ checkpointStore })
const store = new InMemoryCheckpointStore();
// List checkpoints for a graph to find the right branch point
const checkpoints = await store.list('my-graph-id', { runId: 'run-abc' });
// checkpoints: CheckpointMetadata[], sorted by timestamp descending
// Fork from checkpoint with patched state
const newRunId = await store.fork(checkpoints[2].id, {
  scratch: { confidence: 0.95 }, // override confidence so the loop exits
});
// Resume the forked run
const result = await graph.resume(newRunId);
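Picking a branch point by array index works for a quick experiment, but selecting by node is usually less brittle. The sketch below assumes the metadata fields used elsewhere on this page (id, runId, graphId, nodeId, timestamp, stateSize, hasMemorySnapshot); the interface is a local stand-in and `latestAtNode` is a hypothetical helper, not part of the store API.

```typescript
// Local stand-in for CheckpointMetadata, limited to fields shown on this page.
interface CheckpointMetadata {
  id: string;
  runId: string;
  graphId: string;
  nodeId: string;
  timestamp: number;
  stateSize: number;
  hasMemorySnapshot: boolean;
}

// Hypothetical helper: newest checkpoint captured at the given node.
// It does not rely on the input being sorted.
function latestAtNode(
  items: CheckpointMetadata[],
  nodeId: string,
): CheckpointMetadata | undefined {
  let best: CheckpointMetadata | undefined;
  for (const item of items) {
    if (item.nodeId !== nodeId) continue;
    if (!best || item.timestamp > best.timestamp) best = item;
  }
  return best;
}

const base = { runId: 'run-abc', graphId: 'my-graph-id', stateSize: 128, hasMemorySnapshot: false };
const sampleList: CheckpointMetadata[] = [
  { ...base, id: 'cp-3', nodeId: 'evaluate', timestamp: 300 },
  { ...base, id: 'cp-2', nodeId: 'search', timestamp: 200 },
  { ...base, id: 'cp-1', nodeId: 'evaluate', timestamp: 100 },
];

// The id found here is what would be handed to store.fork(...).
const branchPoint = latestAtNode(sampleList, 'evaluate');
```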
The fork() operation:
- Deep-clones the source checkpoint
- Assigns a fresh runId and checkpoint id
- Applies patchState overrides
- Persists the new checkpoint
- Returns the new runId
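The first three steps above can be sketched as a pure function, with persistence and returning the runId left to the store. This is an illustration under assumptions: `GraphStateLike`, `CheckpointLike`, and `forkCheckpoint` are hypothetical names, the id generator is injected to keep the example deterministic, and the patch is applied as a shallow merge per partition. Whether the real runtime merges more deeply is not stated in the source.

```typescript
// Hypothetical, simplified stand-ins for GraphState and Checkpoint.
interface GraphStateLike {
  input: unknown;
  scratch: unknown;
  artifacts: unknown;
}

interface CheckpointLike {
  id: string;
  runId: string;
  nodeId: string;
  timestamp: number;
  state: GraphStateLike;
}

// Builds the forked checkpoint in memory; the caller's store would then
// persist it and return its runId.
function forkCheckpoint(
  source: CheckpointLike,
  newId: () => string,
  patchState: Partial<GraphStateLike> = {},
): CheckpointLike {
  const clone = structuredClone(source); // deep clone, the source stays untouched
  return {
    ...clone,
    id: newId(),
    runId: newId(),
    timestamp: Date.now(),
    // Shallow merge: a patched partition replaces the cloned one wholesale.
    state: { ...clone.state, ...structuredClone(patchState) },
  };
}

// Usage with a deterministic id generator (illustrative only).
let counter = 0;
const nextId = () => `id-${++counter}`;

const original: CheckpointLike = {
  id: 'cp-1',
  runId: 'run-abc',
  nodeId: 'evaluate',
  timestamp: 1,
  state: { input: { question: 'q' }, scratch: { confidence: 0.2 }, artifacts: {} },
};

const forked = forkCheckpoint(original, nextId, { scratch: { confidence: 0.95 } });
```

Because the merge is shallow, patching scratch replaces the whole scratch bag; include any keys that should carry over from the source checkpoint in the patch.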
Common uses:
- Debug a failed run by patching the state that caused the failure
- Test alternative routing decisions from a shared starting point
- Replay a human-gated step with a different human response
Memory Consistency and Checkpointing
The MemoryConsistencyMode interacts with checkpointing:
| Mode | Memory snapshot saved? | On resume |
|---|---|---|
| live | No | Memory is read fresh from the store |
| snapshot | Yes (reads only) | Restores the in-flight reads; writes re-queued |
| journaled | Yes (reads + pending writes) | Journal replayed atomically |
Set the mode per-node or graph-wide:
// Graph-wide
new AgentGraph(state, { memoryConsistency: 'snapshot' })
// Per-node via MemoryPolicy
gmiNode({ instructions: '...' }, {
  memory: { consistency: 'journaled' },
})
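As a rough illustration of the "Memory snapshot saved?" column, the sketch below shows what each mode would contribute to a checkpoint's memorySnapshot field. `buildMemorySnapshot` is a hypothetical helper and the types are local stand-ins; the actual capture logic inside the runtime is not described in the source.

```typescript
type MemoryConsistencyMode = 'live' | 'snapshot' | 'journaled';

interface MemoryRead { traceId: string; content: string; strength: number }
interface PendingWrite { type: string; content: string; scope: string }
interface MemorySnapshot { reads: MemoryRead[]; pendingWrites: PendingWrite[] }

// Hypothetical helper: what gets stored in memorySnapshot for each mode.
function buildMemorySnapshot(
  mode: MemoryConsistencyMode,
  reads: MemoryRead[],
  pendingWrites: PendingWrite[],
): MemorySnapshot | undefined {
  switch (mode) {
    case 'live':
      return undefined; // nothing saved; memory is read fresh on resume
    case 'snapshot':
      return { reads: [...reads], pendingWrites: [] }; // reads only
    case 'journaled':
      return { reads: [...reads], pendingWrites: [...pendingWrites] }; // reads + pending writes
  }
}

const sampleReads: MemoryRead[] = [{ traceId: 't-1', content: 'user prefers metric units', strength: 0.8 }];
const sampleWrites: PendingWrite[] = [{ type: 'episodic', content: 'asked about weather', scope: 'user' }];

const liveSnapshot = buildMemorySnapshot('live', sampleReads, sampleWrites);
const readsOnly = buildMemorySnapshot('snapshot', sampleReads, sampleWrites);
const journaled = buildMemorySnapshot('journaled', sampleReads, sampleWrites);
```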
Custom Backend
To use Postgres, Redis, or any other store, implement ICheckpointStore:
import { randomUUID } from 'node:crypto';
import type { Pool } from 'pg';
import type { ICheckpointStore, Checkpoint, CheckpointMetadata } from '@framers/agentos/orchestration/checkpoint';
import type { GraphState } from '@framers/agentos/orchestration';
// Assumes a checkpoints table keyed by id, with payload stored as TEXT (serialised JSON).
class PostgresCheckpointStore implements ICheckpointStore {
  constructor(private readonly pool: Pool) {}
  // Upsert by id so re-saving a checkpoint overwrites its payload.
  async save(checkpoint: Checkpoint): Promise<void> {
    await this.pool.query(
      'INSERT INTO checkpoints (id, run_id, graph_id, node_id, timestamp, payload) VALUES ($1,$2,$3,$4,$5,$6) ON CONFLICT (id) DO UPDATE SET payload = $6',
      [checkpoint.id, checkpoint.runId, checkpoint.graphId, checkpoint.nodeId, checkpoint.timestamp, JSON.stringify(checkpoint)]
    );
  }
  // Look up a single checkpoint by its own id.
  async get(checkpointId: string): Promise<Checkpoint | null> {
    const { rows } = await this.pool.query('SELECT payload FROM checkpoints WHERE id = $1', [checkpointId]);
    return rows[0] ? JSON.parse(rows[0].payload) : null;
  }
  // Most recent checkpoint of a run, optionally restricted to one node.
  async load(runId: string, nodeId?: string): Promise<Checkpoint | null> {
    const query = nodeId
      ? 'SELECT payload FROM checkpoints WHERE run_id = $1 AND node_id = $2 ORDER BY timestamp DESC LIMIT 1'
      : 'SELECT payload FROM checkpoints WHERE run_id = $1 ORDER BY timestamp DESC LIMIT 1';
    const { rows } = await this.pool.query(query, nodeId ? [runId, nodeId] : [runId]);
    return rows[0] ? JSON.parse(rows[0].payload) : null;
  }
  async latest(runId: string): Promise<Checkpoint | null> {
    return this.load(runId);
  }
  async list(graphId: string, options?: { limit?: number; runId?: string }): Promise<CheckpointMetadata[]> {
    // Return lightweight metadata, not full payloads; runId is an optional filter.
    const params: unknown[] = [graphId];
    let where = 'graph_id = $1';
    if (options?.runId) {
      params.push(options.runId);
      where += ' AND run_id = $2';
    }
    params.push(options?.limit ?? 100);
    const { rows } = await this.pool.query(
      `SELECT id, run_id, graph_id, node_id, timestamp, length(payload) AS state_size FROM checkpoints WHERE ${where} ORDER BY timestamp DESC LIMIT $${params.length}`,
      params
    );
    // node-postgres returns BIGINT columns as strings, hence Number(...).
    // hasMemorySnapshot is hardcoded here for brevity; derive it from the payload if you use memory snapshots.
    return rows.map(r => ({ id: r.id, runId: r.run_id, graphId: r.graph_id, nodeId: r.node_id, timestamp: Number(r.timestamp), stateSize: r.state_size, hasMemorySnapshot: false }));
  }
  async delete(checkpointId: string): Promise<void> {
    await this.pool.query('DELETE FROM checkpoints WHERE id = $1', [checkpointId]);
  }
  async fork(checkpointId: string, patchState?: Partial<GraphState>): Promise<string> {
    // Look up by checkpoint id; load() expects a run id.
    const checkpoint = await this.get(checkpointId);
    if (!checkpoint) throw new Error(`Checkpoint ${checkpointId} not found`);
    const clone = structuredClone(checkpoint);
    const newCheckpoint: Checkpoint = {
      ...clone,
      id: randomUUID(),
      runId: randomUUID(),
      timestamp: Date.now(),
      state: patchState ? { ...clone.state, ...patchState } : clone.state,
    };
    await this.save(newCheckpoint);
    return newCheckpoint.runId;
  }
}
Then pass it to any graph:
const graph = new AgentGraph(...).compile({
  checkpointStore: new PostgresCheckpointStore(pool),
});
See Also
- AgentGraph: per-node checkpoint config, compile options
- workflow() DSL: every_node default policy
- Unified Orchestration: architecture overview