
workflow() DSL

The right way to ship a multi-step agent is rarely the most flexible way. A graph that can loop and replan is great when you don't know what shape the work takes; it's overkill — and harder to operate — when you do know. workflow() exists for the second case. You declare the steps in the order they run, the compiler builds the execution graph, and a static cycle check rejects anything that would loop. The output is the same CompiledExecutionGraph the cyclic builders produce, so you can swap a workflow() for an AgentGraph later without re-architecting the runtime around it.

Use workflow() when the steps are known and ordered. Use AgentGraph when you need cycles, conditional branches, or fan-out/fan-in patterns the linear then() chain can't express. Use mission() when you want to declare intent first and let the planner decide the steps.

Quick Start

import { workflow } from '@framers/agentos/orchestration';
import { z } from 'zod';

const wf = workflow('summarize-and-tag')
  .input(z.object({ url: z.string() }))
  .returns(z.object({ summary: z.string(), tags: z.array(z.string()) }))
  .step('fetch', { tool: 'web_fetch', effectClass: 'external' })
  .step('summarize', { gmi: { instructions: 'Summarize the document in 3 sentences.' } })
  .step('tag', { gmi: { instructions: 'Extract 5 topic tags.' } })
  .compile();

const result = await wf.invoke({ url: 'https://example.com/article' });

Factory Function

workflow(name: string): WorkflowBuilder

Returns a new WorkflowBuilder. The name is used as the compiled graph's display name and as a prefix for run ids and checkpoint keys.

Schema Declaration

Both .input() and .returns() are required. Compilation throws if either is missing.

workflow('my-pipeline')
  .input(z.object({ query: z.string(), limit: z.number().default(10) }))
  .returns(z.object({ results: z.array(z.string()), count: z.number() }))

Both accept Zod schemas or plain JSON Schema objects.
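For readers who prefer plain JSON Schema, the Zod schemas above could be written roughly as follows. This is only a sketch of the object shapes: the constant names are illustrative, and how the compiler treats `default` and `required` in JSON Schema input is an assumption not confirmed here.

```typescript
// Plain JSON Schema counterparts of the Zod schemas shown above.
// Assumption: the names `inputSchema` / `returnsSchema` are illustrative only.
const inputSchema = {
  type: 'object',
  properties: {
    query: { type: 'string' },
    // Mirrors z.number().default(10); whether the runtime applies this
    // default for JSON Schema input is not stated in this document.
    limit: { type: 'number', default: 10 },
  },
  // `limit` carries a default, so only `query` is listed as required.
  required: ['query'],
};

const returnsSchema = {
  type: 'object',
  properties: {
    results: { type: 'array', items: { type: 'string' } },
    count: { type: 'number' },
  },
  required: ['results', 'count'],
};
```

These objects would be passed in the same positions as the Zod schemas, that is, to .input() and .returns().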

Step Primitives

step() / then()

Appends a single named step. then() is an alias for step() — it reads more naturally when chaining.

wf.step('fetch', { tool: 'web_search' })
  .then('extract', { gmi: { instructions: 'Extract the key facts.' } })
  .then('approve', { human: { prompt: 'Approve these facts?' } })

StepConfig — execution strategies (pick one):

| Field | Description |
| --- | --- |
| tool | Name of a registered ITool to invoke |
| gmi | LLM call with instructions string. Always runs as single_turn |
| human | Suspend and surface prompt to a human operator |
| extension | Call method on registered extensionId |
| subgraph | Delegate to a CompiledExecutionGraph |
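The "pick one" rule can be made concrete with a small check. The sketch below is not the library's validator: `pickStrategy` and `STRATEGY_KEYS` are hypothetical names, and whether the real compiler rejects a config that sets two strategies is an assumption.

```typescript
// Hypothetical helper: the five strategy field names come from the table above,
// everything else here is illustrative.
const STRATEGY_KEYS = ['tool', 'gmi', 'human', 'extension', 'subgraph'] as const;
type StrategyKey = (typeof STRATEGY_KEYS)[number];

// Returns the single execution strategy a step config declares.
// Throws when the config declares none, or more than one.
function pickStrategy(config: Record<string, unknown>): StrategyKey {
  const present = STRATEGY_KEYS.filter((key) => config[key] !== undefined);
  if (present.length !== 1) {
    throw new Error(
      `Expected exactly one execution strategy, found ${present.length}: [${present.join(', ')}]`,
    );
  }
  return present[0] as StrategyKey;
}

// Policy fields such as effectClass do not count as strategies.
const chosen = pickStrategy({ tool: 'web_search', effectClass: 'external' });
console.log(chosen);
```

A check like this is cheap to run at declaration time, which keeps an ambiguous step from reaching the runtime.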

StepConfig — optional policies:

{
  tool: 'web_search',

  // Side-effect classification
  effectClass: 'external', // 'pure' | 'read' | 'write' | 'external' | 'human'

  // Memory
  memory: {
    consistency: 'snapshot',
    read: { types: ['episodic'], maxTraces: 5 },
    write: { autoEncode: true, type: 'episodic', scope: 'session' },
  },

  // Guardrails
  guardrails: {
    output: ['pii-redaction'],
    onViolation: 'sanitize',
  },

  // Failure handling
  // Note: `retryPolicy` is part of the compiled IR today, but automatic retry
  // execution is still being wired into the shared runtime.
  onFailure: 'retry',
  retryPolicy: { maxAttempts: 3, backoff: 'exponential', backoffMs: 500 },

  // Timeout
  timeout: 30_000,

  // Human approval gate before proceeding
  requiresApproval: true,
}
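Since automatic retries are not yet executed by the runtime, it can help to see what a retryPolicy like the one above would plausibly mean. The sketch below computes wait times under one common reading of exponential backoff. The doubling rule, the idea that maxAttempts counts the initial call, and the `retryDelays` helper are all assumptions, not documented behaviour.

```typescript
// Shape copied from the example above; only 'exponential' appears in this document.
interface RetryPolicy {
  maxAttempts: number;
  backoff: 'exponential';
  backoffMs: number;
}

// Hypothetical helper: delay in milliseconds before each retry.
// Assumes attempt 1 is the initial call, so maxAttempts = 3 leaves two retries,
// and assumes the delay doubles after every failed attempt.
function retryDelays(policy: RetryPolicy): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < policy.maxAttempts - 1; retry++) {
    delays.push(policy.backoffMs * 2 ** retry);
  }
  return delays;
}

const delays = retryDelays({ maxAttempts: 3, backoff: 'exponential', backoffMs: 500 });
console.log(delays); // [ 500, 1000 ]
```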

branch()

Appends a conditional fan-out. The condition function evaluates GraphState at runtime and returns a route key. Each route key maps to a step config. All branches become the new tail — the next declared step connects from all of them.

wf.step('classify', { tool: 'classifier' })
  .branch(
    (state) => state.scratch.category, // returns a route key
    {
      premium: { gmi: { instructions: 'Generate premium-tier response.' } },
      standard: { gmi: { instructions: 'Generate standard response.' } },
      rejected: { tool: 'log_rejection' },
    }
  )
  .step('send', { tool: 'send_email' }) // connects from all three branches

The condition function must return one of the route keys. Returning an unrecognised key causes a BRANCH_ROUTE_NOT_FOUND runtime error.
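The "all branches become the new tail" rule and the route lookup can be modelled in a few lines. This is a simplified sketch, not the builder's implementation: `TailTracker`, `resolveRoute`, the `START` node name, and the use of route keys as node ids are all hypothetical.

```typescript
type Edge = [string, string]; // [from, to]

// Hypothetical model of how a linear builder could track its tail nodes.
class TailTracker {
  readonly edges: Edge[] = [];
  private tails: string[] = ['START']; // assumed entry node name

  // A single step connects from every current tail, then becomes the only tail.
  step(id: string): this {
    for (const tail of this.tails) this.edges.push([tail, id]);
    this.tails = [id];
    return this;
  }

  // Every branch target connects from every current tail; all targets become tails.
  branch(ids: string[]): this {
    for (const id of ids) {
      for (const tail of this.tails) this.edges.push([tail, id]);
    }
    this.tails = [...ids];
    return this;
  }
}

// Route lookup: an unknown key is an error rather than a silent skip.
function resolveRoute<T>(key: string, routes: Record<string, T>): T {
  if (!Object.prototype.hasOwnProperty.call(routes, key)) {
    throw new Error(
      `BRANCH_ROUTE_NOT_FOUND: "${key}" (known routes: ${Object.keys(routes).join(', ')})`,
    );
  }
  return routes[key] as T;
}

const sketch = new TailTracker()
  .step('classify')
  .branch(['premium', 'standard', 'rejected'])
  .step('send');
// Edges into 'send' come from all three branch targets.
console.log(sketch.edges.filter(([, to]) => to === 'send'));
```

Failing loudly on an unknown key means a typo in a category value surfaces at the branch rather than as a workflow that silently ends early.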

parallel()

Appends a concurrent fan-out. All steps execute simultaneously (subject to runtime scheduling). After all complete, their outputs are merged using the join.merge reducers.

wf.step('fetch', { tool: 'web_fetch' })
  .parallel(
    [
      { gmi: { instructions: 'Summarize in English.' } },
      { gmi: { instructions: 'Summarize in French.' } },
      { gmi: { instructions: 'Summarize in German.' } },
    ],
    {
      strategy: 'all', // 'all' | 'any' | 'quorum'
      quorumCount: 2, // only used when strategy is 'quorum'
      merge: {
        'scratch.summaries': 'concat', // arrays get concatenated
      },
      timeout: 60_000,
    }
  )
  .step('combine', { gmi: { instructions: 'Pick the best summary.' } })

Join strategies:

| Strategy | Description |
| --- | --- |
| all | Wait for all parallel branches to complete |
| any | Continue as soon as the first branch completes |
| quorum | Wait for quorumCount branches to complete |
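The three join strategies differ only in how many branches must finish before the join releases, and the concat reducer then gathers the branch outputs. The sketch below models both ideas in isolation. `requiredCompletions` and `concatReducer` are hypothetical helpers, the quorum bounds check is an assumption, and the reducer uses a flat key rather than a dotted path such as 'scratch.summaries'.

```typescript
type JoinStrategy = 'all' | 'any' | 'quorum';

// Hypothetical helper: number of branches that must complete before the join releases.
function requiredCompletions(
  strategy: JoinStrategy,
  branchCount: number,
  quorumCount?: number,
): number {
  switch (strategy) {
    case 'all':
      return branchCount;
    case 'any':
      return 1;
    case 'quorum': {
      // Assumption: a quorum outside 1..branchCount is treated as a configuration error.
      if (quorumCount === undefined || quorumCount < 1 || quorumCount > branchCount) {
        throw new Error(`quorumCount must be between 1 and ${branchCount}`);
      }
      return quorumCount;
    }
  }
}

// Hypothetical 'concat' reducer: collects one key from each branch output into a flat array.
function concatReducer(outputs: Array<Record<string, unknown>>, key: string): unknown[] {
  const merged: unknown[] = [];
  for (const output of outputs) {
    const value = output[key];
    if (value === undefined) continue; // branch produced nothing for this key
    if (Array.isArray(value)) merged.push(...value);
    else merged.push(value);
  }
  return merged;
}

const summaries = concatReducer(
  [{ summaries: ['en'] }, { summaries: ['fr'] }, { summaries: ['de'] }],
  'summaries',
);
console.log(requiredCompletions('quorum', 3, 2), summaries);
```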

DAG Enforcement

workflow() enforces acyclicity at compile time. Any attempt to create a cycle (even via subgraphs) throws a WorkflowValidationError:

Workflow validation failed: cycle detected involving nodes: fetch → process → fetch

If you need agent loops, use AgentGraph instead.
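A static cycle check of this kind is typically a depth-first search that looks for a back edge. The sketch below shows one way to do it over a plain adjacency list and produce a message in the format shown above. It is not the compiler's code: `findCycle` and `assertAcyclic` are hypothetical, and a plain Error stands in for WorkflowValidationError.

```typescript
type Graph = Record<string, string[]>; // node id -> ids of the nodes it points to

// Returns the first cycle found as a path (e.g. ['fetch', 'process', 'fetch']),
// or null when the graph is acyclic.
function findCycle(graph: Graph): string[] | null {
  const marks = new Map<string, 'visiting' | 'done'>();
  const stack: string[] = [];

  const visit = (node: string): string[] | null => {
    marks.set(node, 'visiting');
    stack.push(node);
    for (const next of graph[node] ?? []) {
      if (marks.get(next) === 'visiting') {
        // Back edge: the cycle is the stack from `next` onward, closed with `next`.
        return [...stack.slice(stack.indexOf(next)), next];
      }
      if (!marks.has(next)) {
        const found = visit(next);
        if (found) return found;
      }
    }
    stack.pop();
    marks.set(node, 'done');
    return null;
  };

  for (const node of Object.keys(graph)) {
    if (!marks.has(node)) {
      const found = visit(node);
      if (found) return found;
    }
  }
  return null;
}

// Stand-in for the compile-time check; the real error class is WorkflowValidationError.
function assertAcyclic(graph: Graph): void {
  const cycle = findCycle(graph);
  if (cycle) {
    throw new Error(
      `Workflow validation failed: cycle detected involving nodes: ${cycle.join(' → ')}`,
    );
  }
}

assertAcyclic({ fetch: ['summarize'], summarize: ['tag'], tag: [] }); // passes silently
```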

GMI steps inside workflow() always run in single_turn mode — the executionMode field in gmi config is accepted in the type but ignored at runtime. This is a deliberate design choice: workflow steps must be cost-bounded and deterministic.

Compilation

const compiled = wf.compile({
  checkpointStore: new InMemoryCheckpointStore('./runs.db'), // optional
});

The workflow compiler automatically sets checkpointPolicy: 'every_node' — every step is checkpointed by default, making long-running workflows resumable after any failure.
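To illustrate why checkpointing after every node makes a run resumable, here is a toy, synchronous model. It makes no claim about the real runtime: `runWithCheckpoints`, the `Checkpoint` shape, and the Map-based store are hypothetical stand-ins.

```typescript
type State = Record<string, unknown>;
type Step = { id: string; run: (state: State) => State };
interface Checkpoint {
  nextIndex: number; // index of the first step that has not completed yet
  state: State;
}

// Runs steps in order and saves a checkpoint after every node.
// If a checkpoint already exists for runId, completed steps are skipped.
function runWithCheckpoints(
  steps: Step[],
  input: State,
  store: Map<string, Checkpoint>,
  runId: string,
): State {
  const saved = store.get(runId);
  const start = saved ? saved.nextIndex : 0;
  let state = saved ? saved.state : input;
  steps.slice(start).forEach((step, offset) => {
    state = step.run(state); // a throw here leaves the previous checkpoint intact
    store.set(runId, { nextIndex: start + offset + 1, state });
  });
  return state;
}

// Usage: 'summarize' fails once, then the second call resumes without re-running 'fetch'.
const calls: string[] = [];
let failOnce = true;
const demoSteps: Step[] = [
  { id: 'fetch', run: (s) => { calls.push('fetch'); return { ...s, fetched: true }; } },
  {
    id: 'summarize',
    run: (s) => {
      calls.push('summarize');
      if (failOnce) { failOnce = false; throw new Error('transient failure'); }
      return { ...s, summary: 'ok' };
    },
  },
];
const checkpoints = new Map<string, Checkpoint>();
try {
  runWithCheckpoints(demoSteps, { url: 'https://example.com' }, checkpoints, 'run-1');
} catch {
  // First attempt fails after 'fetch' has been checkpointed.
}
const finalState = runWithCheckpoints(demoSteps, { url: 'https://example.com' }, checkpoints, 'run-1');
console.log(calls, finalState);
```

The cost of this policy is one store write per step, which is usually a fair trade for not repeating external or write-class side effects on resume.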

Execution

// Run to completion
const result = await compiled.invoke({ url: 'https://example.com' });

// Stream events
for await (const event of compiled.stream({ url: 'https://example.com' })) {
  console.log(event.type, event.nodeId);
}

// Resume after failure (checkpointId from GraphState.checkpointId)
const resumed = await compiled.resume(checkpointId);

// Export IR for subgraph embedding
const ir = compiled.toIR();

Complete Example — Onboarding Workflow

import { workflow } from '@framers/agentos/orchestration';
import { InMemoryCheckpointStore } from '@framers/agentos/orchestration/checkpoint';
import { z } from 'zod';

const onboarding = workflow('user-onboarding')
  .input(z.object({
    userId: z.string(),
    plan: z.enum(['free', 'pro', 'enterprise']),
  }))
  .returns(z.object({
    welcomed: z.boolean(),
    resourcesCreated: z.array(z.string()),
  }))

  // Step 1: fetch the new user profile
  .step('fetch-user', {
    tool: 'get_user',
    effectClass: 'read',
  })

  // Step 2: branch on plan type
  .branch(
    (state) => state.scratch.plan,
    {
      free: {
        tool: 'provision_free_tier',
        effectClass: 'write',
      },
      pro: {
        tool: 'provision_pro_tier',
        effectClass: 'write',
        // Declared now, runtime-managed retries still pending.
        retryPolicy: { maxAttempts: 3, backoff: 'exponential', backoffMs: 1000 },
      },
      enterprise: {
        human: { prompt: 'Manually configure enterprise account for this user.' },
        requiresApproval: true,
      },
    }
  )

  // Step 3: parallel welcome actions
  .parallel(
    [
      { tool: 'send_welcome_email', effectClass: 'external' },
      { tool: 'create_default_workspace', effectClass: 'write' },
      { gmi: { instructions: 'Generate a personalised getting-started checklist.' } },
    ],
    {
      strategy: 'all',
      merge: {
        'scratch.provisioned': 'concat',
      },
    }
  )

  // Step 4: confirm
  .step('confirm', {
    gmi: {
      instructions: 'Confirm onboarding is complete and summarise what was created.',
      maxTokens: 256,
    },
  })

  .compile({
    checkpointStore: new InMemoryCheckpointStore('./onboarding.db'),
  });

// Run
const result = await onboarding.invoke({ userId: 'u_123', plan: 'pro' });

// Stream with progress UI
for await (const event of onboarding.stream({ userId: 'u_456', plan: 'free' })) {
  if (event.type === 'node_start') updateProgressBar(event.nodeId);
}

// Resume a human-gated enterprise onboarding after approval
const result2 = await onboarding.resume(savedCheckpointId);

References

DAG workflow engines

  • Apache Airflow contributors. Apache Airflow: Programmatically author, schedule and monitor workflows. — Reference DAG-execution semantics that informed workflow()'s topological-sort + tier-execution model. airflow.apache.org
  • Prefect contributors. Prefect: The new standard in dataflow automation. — Modern Python workflow engine with similar fail-fast and resume semantics. prefect.io
  • Temporal contributors. Temporal: Microservices orchestration platform. — Durable-execution patterns informing the checkpointing + resume design shared with mission() and AgentGraph. temporal.io

LLM-pipeline composition

  • Khattab, O., Singhvi, A., Maheshwari, P., Zhang, Z., Santhanam, K., Vardhamanan, S., Haq, S., Sharma, A., Joshi, T., Moazam, H., Miller, H., Zaharia, M., & Potts, C. (2023). DSPy: Compiling declarative language model calls into self-improving pipelines. arXiv preprint. — The "compile-then-run" approach to LLM pipelines that informed the CompiledExecutionGraph IR design. arXiv:2310.03714

Implementation references

  • packages/agentos/src/orchestration/builders/WorkflowBuilder.ts: workflow() factory + chain builder
  • packages/agentos/src/orchestration/compiler/CompiledExecutionGraph.ts: shared IR