Creating Custom Guardrails
This guide walks you through everything you need to create, package, test, and deploy a custom guardrail for AgentOS. By the end you will understand the full lifecycle -- from implementing the IGuardrailService interface to publishing a self-contained extension pack.
Overview
Guardrails intercept content at two points in the AgentOS pipeline:
- Input -- before user messages enter the orchestration pipeline.
- Output -- before agent responses are streamed to the client.
User Input --> [Input Guardrails] --> Orchestration --> [Output Guardrails] --> Client
When to create a custom guardrail
AgentOS ships with five first-class guardrail packs (PII Redaction, ML Classifiers, Topicality, Code Safety, Grounding Guard). Create a custom guardrail when:
- You need domain-specific policy enforcement (e.g., financial compliance, medical disclaimers).
- You want to enforce custom word lists, regex patterns, or proprietary classifiers.
- You need to integrate with external moderation APIs (perspective API, custom ML endpoints).
- You want cost or length ceilings tailored to your product.
If an existing pack already covers your need, prefer configuring it over writing a new one.
The contract at a glance
Every guardrail implements the IGuardrailService interface. Both methods are optional -- implement only what you need.
interface IGuardrailService {
// Optional configuration controlling streaming behavior and execution phase.
config?: GuardrailConfig;
// Evaluate user input BEFORE it enters the orchestration pipeline.
// Return null to allow, or a GuardrailEvaluationResult to act on it.
evaluateInput?(payload: GuardrailInputPayload): Promise<GuardrailEvaluationResult | null>;
// Evaluate agent output BEFORE it is streamed to the client.
// Return null to allow, or a GuardrailEvaluationResult to act on it.
evaluateOutput?(payload: GuardrailOutputPayload): Promise<GuardrailEvaluationResult | null>;
}
The IGuardrailService Interface
GuardrailConfig
The optional config object controls when and how your guardrail is called.
interface GuardrailConfig {
// When true, evaluateOutput is called for every TEXT_DELTA chunk during
// streaming. When false (default), it is only called for FINAL_RESPONSE
// chunks. Enable this for real-time PII redaction or immediate blocking.
evaluateStreamingChunks?: boolean; // default: false
// Rate-limits how many streaming evaluations happen per request.
// After this limit, remaining chunks pass through unevaluated.
// Only applies when evaluateStreamingChunks is true.
maxStreamingEvaluations?: number; // default: undefined (no limit)
// When true, this guardrail runs in Phase 1 (sequential).
// It sees and can modify text produced by prior sanitizers.
// When false, it runs in Phase 2 (parallel).
// Phase 2 guardrails that return SANITIZE are downgraded to FLAG.
canSanitize?: boolean; // default: false
// Maximum time (ms) to wait for this guardrail's evaluation.
// On timeout the evaluation is abandoned (fail-open) and a warning is logged.
//
// SAFETY WARNING: Do NOT set timeoutMs on safety-critical guardrails
// (e.g., CSAM detection, compliance-mandatory filters) because fail-open
// on timeout means content passes unchecked.
timeoutMs?: number; // default: undefined (wait indefinitely)
}
evaluateInput(payload)
Called once per request before orchestration begins.
interface GuardrailInputPayload {
// Conversational context for policy decisions.
context: GuardrailContext;
// The user's input. The most commonly inspected field is input.textInput.
input: AgentOSInput;
}
interface GuardrailContext {
userId: string; // Unique user identifier
sessionId: string; // Current session
personaId?: string; // Active persona/agent identity
conversationId?: string; // Conversation thread
mode?: string; // Operating mode (e.g., 'debug', 'production')
metadata?: Record<string, unknown>; // Arbitrary context for your policy logic
}
evaluateOutput(payload)
Called for response chunks. The timing depends on config.evaluateStreamingChunks:
false(default) -- called only for FINAL_RESPONSE chunks.true-- called for every TEXT_DELTA chunk during streaming.
interface GuardrailOutputPayload {
// Same conversational context as evaluateInput.
context: GuardrailContext;
// The response chunk to evaluate. This is a union type --
// see "Understanding the Chunk Lifecycle" below.
chunk: AgentOSResponse;
// RAG source chunks retrieved for this request.
// Available for grounding verification (e.g., hallucination detection).
// Undefined when no RAG retrieval was performed.
ragSources?: RagRetrievedChunk[];
}
Return values
Return null to allow content through without action. Otherwise, return a GuardrailEvaluationResult:
interface GuardrailEvaluationResult {
// The action AgentOS should take.
action: GuardrailAction;
// Human-readable reason (may be shown to users or logged).
reason?: string;
// Machine-readable code for analytics and automated handling.
reasonCode?: string;
// Arbitrary metadata persisted in response chunk metadata.
metadata?: Record<string, unknown>;
// Detailed debugging info (not shown to users).
details?: unknown;
// Replacement text when action is SANITIZE.
// For input: replaces textInput before orchestration.
// For output: replaces textDelta (streaming) or finalResponseText (final).
modifiedText?: string | null;
}
GuardrailAction Deep Dive
| Action | Enum Value | Effect |
|---|---|---|
| ALLOW | 'allow' | Pass content unchanged. Use when all checks pass. |
| FLAG | 'flag' | Pass content through but record metadata for audit/analytics. Content reaches the user; the evaluation is logged for review. |
| SANITIZE | 'sanitize' | Replace content with the value in modifiedText. Use for PII redaction, profanity masking, or content rewriting. Requires canSanitize: true in config to work in Phase 1. |
| BLOCK | 'block' | Reject/terminate the interaction. For input evaluation, the request is never processed. For output evaluation during streaming, the stream is terminated immediately with an error chunk. |
SANITIZE rules
- If
canSanitize: trueis set in your config, your guardrail runs in Phase 1 (sequential) and yourmodifiedTextis applied before downstream guardrails see the content. - If your guardrail is in Phase 2 (no
canSanitizeorcanSanitize: false) and it returns SANITIZE, the action is automatically downgraded to FLAG with a warning logged. This prevents non-deterministic concurrent modifications.
Understanding the Chunk Lifecycle
When evaluateOutput is called, the chunk field is a union of all AgentOS response types. Your guardrail will typically care about a subset of these.
TEXT_DELTA
Streaming text chunks emitted as the LLM generates tokens.
// TEXT_DELTA chunks are only sent to your guardrail if
// config.evaluateStreamingChunks is true.
{
type: 'text_delta', // AgentOSResponseChunkType.TEXT_DELTA
streamId: 'stream-abc', // Unique stream identifier
textDelta: 'Hello, ', // The incremental text content
isFinal: false, // Always false for mid-stream deltas
// ... other base fields (gmiInstanceId, personaId, timestamp, metadata)
}
FINAL_RESPONSE
The complete, assembled response emitted at the end of a turn.
// FINAL_RESPONSE is always sent to your guardrail (regardless of
// evaluateStreamingChunks setting).
{
type: 'final_response', // AgentOSResponseChunkType.FINAL_RESPONSE
streamId: 'stream-abc',
finalResponseText: 'Hello, world! How can I help you today?',
isFinal: true, // Always true for final responses
ragSources: [...], // RAG chunks (if retrieval was performed)
usage: { totalTokens: 42 }, // Cost aggregator
// ... other fields (audioOutput, imageOutput, reasoningTrace, etc.)
}
TOOL_CALL_REQUEST
Emitted when the LLM requests a tool call. Useful for guardrails that want to approve or deny tool invocations.
{
type: 'tool_call_request', // AgentOSResponseChunkType.TOOL_CALL_REQUEST
streamId: 'stream-abc',
toolCalls: [
{
id: 'call_001',
name: 'web_search',
arguments: '{"query": "latest news"}'
}
],
rationale: 'User asked about current events.',
isFinal: false,
}
TOOL_RESULT_EMISSION
The result of a tool execution, emitted after the tool runs.
{
type: 'tool_result_emission', // AgentOSResponseChunkType.TOOL_RESULT_EMISSION
streamId: 'stream-abc',
toolCallId: 'call_001',
toolName: 'web_search',
toolResult: { ... },
isSuccess: true,
isFinal: false,
}
Other chunk types
| Type | Typical guardrail action |
|---|---|
SYSTEM_PROGRESS | Usually ignored. Informational progress updates. |
UI_COMMAND | Rarely intercepted. Contains frontend rendering instructions. |
ERROR | Usually ignored. The stream is already in an error state. |
METADATA_UPDATE | Usually ignored. Session metadata changes. |
WORKFLOW_UPDATE | Usually ignored. Multi-step workflow progress. |
AGENCY_UPDATE | Usually ignored. Multi-agent coordination state. |
PROVENANCE_EVENT | Usually ignored. Signed ledger entries. |
The isFinal flag
Every chunk has an isFinal: boolean field.
isFinal: false-- more chunks will follow in this stream.isFinal: true-- this is the last chunk. ForFINAL_RESPONSEit is always true. ForTEXT_DELTAthe last streaming delta before the final response also hasisFinal: true.
If your guardrail buffers streaming text, use isFinal: true as the signal to flush your buffer and perform a final evaluation.
Two-Phase Parallel Execution
When multiple guardrails are registered, the ParallelGuardrailDispatcher runs them in two phases:
Registered Guardrails
|
+-----------+-----------+
| |
canSanitize: true canSanitize: false/omitted
| |
Phase 1 Phase 2
(Sequential) (Parallel)
| |
Each sees prior's Promise.allSettled
modified text on Phase 1 output
| |
BLOCK short-circuits SANITIZE --> FLAG
the entire pipeline (downgraded)
| |
+-----------+-----------+
|
Worst-Wins Aggregation
BLOCK (3) > FLAG (2) > ALLOW (0)
Phase 1: Sequential sanitizers
Guardrails with canSanitize: true run one at a time in registration order. Each sanitizer receives the cumulative output of all preceding sanitizers:
- PII Redaction sanitizer receives the original text.
- Profanity filter sanitizer receives PII-redacted text.
- Custom rewriter sanitizer receives PII-redacted + profanity-filtered text.
A BLOCK in Phase 1 short-circuits immediately -- Phase 2 never runs.
Phase 2: Parallel classifiers
All remaining guardrails (those without canSanitize or with canSanitize: false) run concurrently via Promise.allSettled on the fully-sanitized text from Phase 1.
If a Phase 2 guardrail returns SANITIZE, the action is downgraded to FLAG with a warning logged, because concurrent sanitization would produce non-deterministic results.
Worst-wins aggregation
After both phases complete, the dispatcher picks the single worst action:
- BLOCK (severity 3) takes priority over everything.
- FLAG (severity 2) takes priority over ALLOW.
- ALLOW (severity 0) is the default when no guardrail triggers.
Priority and stacking
The priority field on an ExtensionDescriptor controls stacking (same id descriptors), not execution order. A higher-priority descriptor with the same id supersedes a lower-priority one. Registration order determines execution order within Phase 1 and Phase 2.
Step-by-Step: Creating a Simple Guardrail
Let's create a profanity filter that blocks prohibited words in both input and output.
import type {
IGuardrailService,
GuardrailInputPayload,
GuardrailOutputPayload,
GuardrailEvaluationResult,
} from '@framers/agentos';
import { GuardrailAction, AgentOSResponseChunkType } from '@framers/agentos';
/**
* A simple profanity filter that blocks messages containing prohibited words.
*
* This guardrail operates in final-only mode (no streaming evaluation) and
* does not modify content -- it either allows or blocks. It runs in Phase 2
* (parallel) because canSanitize is not set.
*/
class ProfanityFilterGuardrail implements IGuardrailService {
// No special config needed -- defaults to final-only, Phase 2 (parallel).
// Omitting config entirely is equivalent to:
// config = { evaluateStreamingChunks: false, canSanitize: false };
/**
* The list of prohibited words. In production you would load this from
* a database, config file, or external API.
*/
private readonly blocklist: string[] = [
'badword1',
'badword2',
'offensive_term',
];
/**
* Check if text contains any prohibited words.
* Uses case-insensitive word-boundary matching to avoid false positives
* on substrings (e.g., "classic" should not match "ass").
*/
private containsProfanity(text: string): string | null {
// Normalize to lowercase for case-insensitive matching.
const lower = text.toLowerCase();
for (const word of this.blocklist) {
// Build a regex with word boundaries so "classic" doesn't match "ass".
const regex = new RegExp(`\\b${word}\\b`, 'i');
if (regex.test(lower)) {
return word; // Return the first matched word for the reason message.
}
}
return null; // No profanity found.
}
/**
* Evaluate user input before orchestration.
* Blocks the request entirely if profanity is detected.
*/
async evaluateInput(
{ input }: GuardrailInputPayload,
): Promise<GuardrailEvaluationResult | null> {
// Guard: skip if there is no text to evaluate.
if (!input.textInput) {
return null;
}
const matched = this.containsProfanity(input.textInput);
if (matched) {
return {
action: GuardrailAction.BLOCK,
reason: 'Your message contains language that violates our usage policy.',
reasonCode: 'PROFANITY_INPUT_BLOCKED',
metadata: {
// Include the matched word for audit logs (not shown to user).
matchedTerm: matched,
},
};
}
// No issues found -- allow the input through.
return null;
}
/**
* Evaluate agent output before it reaches the client.
* Blocks the response if profanity is detected in the final text.
*
* Because evaluateStreamingChunks is false (default), this method is
* only called for FINAL_RESPONSE chunks.
*/
async evaluateOutput(
{ chunk }: GuardrailOutputPayload,
): Promise<GuardrailEvaluationResult | null> {
// Only evaluate the final assembled response.
if (chunk.type !== AgentOSResponseChunkType.FINAL_RESPONSE) {
return null;
}
const text = chunk.finalResponseText;
// Guard: skip if the final response is empty.
if (!text) {
return null;
}
const matched = this.containsProfanity(text);
if (matched) {
return {
action: GuardrailAction.BLOCK,
reason: 'The response was blocked due to a content policy violation.',
reasonCode: 'PROFANITY_OUTPUT_BLOCKED',
metadata: { matchedTerm: matched },
};
}
return null;
}
}
Step-by-Step: Creating a Streaming Guardrail
Streaming guardrails evaluate TEXT_DELTA chunks in real time. The main challenge is that a single text delta may contain only a fragment of a pattern you want to detect. You need to buffer the streaming text and evaluate at sentence boundaries or on flush.
import type {
IGuardrailService,
GuardrailConfig,
GuardrailOutputPayload,
GuardrailEvaluationResult,
} from '@framers/agentos';
import { GuardrailAction, AgentOSResponseChunkType } from '@framers/agentos';
/**
* Per-stream buffer state for accumulating text deltas.
* Each concurrent stream gets its own buffer so guardrail state
* does not leak between parallel conversations.
*/
interface StreamBuffer {
/** Accumulated text that has not yet been evaluated. */
text: string;
/** Number of evaluations performed for this stream (for rate limiting). */
evaluations: number;
/** Epoch ms of last chunk received (for stale cleanup). */
lastSeenAt: number;
}
/**
* Streaming content policy guardrail that accumulates text and
* evaluates at sentence boundaries.
*
* When a prohibited pattern spans multiple TEXT_DELTA chunks, buffering
* ensures we still detect it. On isFinal we flush the remaining buffer.
*
* This guardrail does NOT set canSanitize, so it runs in Phase 2 (parallel).
* If you need to modify text mid-stream, set canSanitize: true.
*/
class StreamingContentPolicyGuardrail implements IGuardrailService {
/**
* Enable streaming evaluation so we see every TEXT_DELTA chunk.
* Rate-limit to 100 evaluations per stream to control cost.
*/
config: GuardrailConfig = {
evaluateStreamingChunks: true,
maxStreamingEvaluations: 100,
};
/**
* Per-stream buffer map. Keyed by streamId so concurrent streams
* each get isolated state.
*/
private buffers = new Map<string, StreamBuffer>();
/**
* Patterns we want to block. These may span multiple chunks,
* which is why we buffer.
*/
private readonly prohibitedPatterns = [
/instructions\s+for\s+making\s+a\s+bomb/i,
/how\s+to\s+hack\s+into/i,
/bypass\s+security\s+system/i,
];
/** Sentence-boundary regex for flushing the buffer. */
private readonly sentenceBoundary = /[.!?]\s|\n/;
/** Maximum age (ms) before a stale stream buffer is cleaned up. */
private readonly staleThresholdMs = 60_000; // 1 minute
/**
* Evaluate output chunks in real time.
*
* For TEXT_DELTA chunks: accumulate into a per-stream buffer and
* check for prohibited patterns at sentence boundaries.
*
* For isFinal chunks: flush the remaining buffer and do a final check.
*/
async evaluateOutput(
{ chunk }: GuardrailOutputPayload,
): Promise<GuardrailEvaluationResult | null> {
// We only care about TEXT_DELTA and FINAL_RESPONSE chunks.
if (
chunk.type !== AgentOSResponseChunkType.TEXT_DELTA &&
chunk.type !== AgentOSResponseChunkType.FINAL_RESPONSE
) {
return null;
}
const streamId = chunk.streamId;
// ---------------------------------------------------------------
// isFinal: flush the buffer and clean up the stream state.
// ---------------------------------------------------------------
if (chunk.isFinal) {
const buffer = this.buffers.get(streamId);
// Determine the text to evaluate on the final flush.
const finalText = chunk.type === AgentOSResponseChunkType.FINAL_RESPONSE
? chunk.finalResponseText ?? ''
: (buffer?.text ?? '') + ((chunk as any).textDelta ?? '');
// Clean up the stream buffer -- we are done with this stream.
this.buffers.delete(streamId);
// Periodically clean up stale buffers from abandoned streams.
this.cleanupStaleBuffers();
return this.checkPatterns(finalText);
}
// ---------------------------------------------------------------
// TEXT_DELTA (non-final): accumulate into the stream buffer.
// ---------------------------------------------------------------
if (chunk.type === AgentOSResponseChunkType.TEXT_DELTA && chunk.textDelta) {
// Get or create the buffer for this stream.
let buffer = this.buffers.get(streamId);
if (!buffer) {
buffer = { text: '', evaluations: 0, lastSeenAt: Date.now() };
this.buffers.set(streamId, buffer);
}
// Append the new delta to the buffer.
buffer.text += chunk.textDelta;
buffer.lastSeenAt = Date.now();
// Only evaluate at sentence boundaries to reduce overhead.
// If no sentence boundary is found yet, wait for more text.
if (this.sentenceBoundary.test(buffer.text)) {
buffer.evaluations++;
// Evaluate the entire accumulated buffer.
const result = this.checkPatterns(buffer.text);
if (result) {
// Prohibited content found -- clean up and block.
this.buffers.delete(streamId);
return result;
}
// Trim already-evaluated text up to the last sentence boundary.
// Keep the trailing fragment for the next evaluation.
const lastBoundary = Math.max(
buffer.text.lastIndexOf('. '),
buffer.text.lastIndexOf('? '),
buffer.text.lastIndexOf('! '),
buffer.text.lastIndexOf('\n'),
);
if (lastBoundary > 0) {
buffer.text = buffer.text.slice(lastBoundary + 1);
}
}
}
// No issues detected in this chunk.
return null;
}
/**
* Check accumulated text against all prohibited patterns.
* Returns a BLOCK result if a match is found, or null otherwise.
*/
private checkPatterns(text: string): GuardrailEvaluationResult | null {
for (const pattern of this.prohibitedPatterns) {
if (pattern.test(text)) {
return {
action: GuardrailAction.BLOCK,
reason: 'Response contains content that violates our usage policy.',
reasonCode: 'STREAMING_CONTENT_POLICY_VIOLATION',
metadata: {
pattern: pattern.source,
detectedAt: new Date().toISOString(),
},
};
}
}
return null;
}
/**
* Remove buffers for streams that have not received a chunk in over
* staleThresholdMs. This prevents memory leaks from abandoned streams
* (e.g., network disconnects or client-side cancellations).
*/
private cleanupStaleBuffers(): void {
const now = Date.now();
for (const [id, buf] of this.buffers) {
if (now - buf.lastSeenAt > this.staleThresholdMs) {
this.buffers.delete(id);
}
}
}
}
Key patterns in streaming guardrails
- Buffer per stream -- use
Map<streamId, StreamBuffer>so concurrent streams do not interfere with each other. - Evaluate at sentence boundaries -- reduces evaluation overhead without missing multi-chunk patterns.
- Flush on
isFinal-- always evaluate the remaining buffer when the stream ends. - Stale cleanup -- delete buffers for streams that stopped sending chunks to avoid memory leaks.
- Rate limiting -- set
maxStreamingEvaluationsto cap how many evaluations a single stream triggers.
Packaging as an Extension Pack
To distribute your guardrail as a reusable extension, package it as an ExtensionPack under the curated registry.
1. Directory structure
Create a new directory under the extensions registry:
packages/agentos-extensions/registry/curated/safety/my-guardrail/
manifest.json
package.json
tsconfig.json
vitest.config.ts
src/
index.ts # createExtensionPack() factory
MyGuardrail.ts # IGuardrailService implementation
types.ts # Pack-specific option types
test/
MyGuardrail.spec.ts # Unit tests
2. package.json
Declare @framers/agentos as a peer dependency so your pack works with any compatible AgentOS version.
{
"name": "@framers/agentos-ext-my-guardrail",
"version": "0.1.0",
"description": "My custom guardrail for AgentOS",
"type": "module",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
"exports": {
".": {
"import": "./dist/index.js",
"types": "./dist/index.d.ts"
}
},
"files": ["dist", "src", "manifest.json"],
"scripts": {
"build": "tsc -p tsconfig.json",
"test": "vitest run"
},
"peerDependencies": {
"@framers/agentos": "^0.1.0"
},
"devDependencies": {
"@framers/agentos": "workspace:*",
"typescript": "^5.5.0",
"vitest": "^1.6.0"
},
"license": "MIT"
}
3. tsconfig.json
Use paths to map @framers/agentos to the monorepo source during development.
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "bundler",
"declaration": true,
"declarationMap": true,
"sourceMap": true,
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"isolatedModules": true,
"paths": {
"@framers/agentos": ["../../../../../agentos/dist/index.d.ts"],
"@framers/agentos/*": ["../../../../../agentos/dist/*"]
}
},
"include": ["src/**/*.ts"],
"exclude": ["node_modules", "dist", "test"]
}
4. vitest.config.ts
Resolve the @framers/agentos alias for both CI and monorepo layouts.
import { defineConfig } from 'vitest/config';
import path from 'path';
import fs from 'fs';
// CI layout: agentos cloned into packages/agentos/ inside this repo.
const ciPath = path.resolve(__dirname, '../../../../packages/agentos/src');
// Monorepo layout: agentos is a sibling at packages/agentos/.
const monoPath = path.resolve(__dirname, '../../../../../agentos/src');
// Use whichever path actually exists on disk.
const agentosPath = fs.existsSync(ciPath)
? ciPath
: fs.existsSync(monoPath) ? monoPath : null;
export default defineConfig({
test: {
globals: true,
environment: 'node',
include: ['test/**/*.spec.ts'],
testTimeout: 10000,
},
resolve: agentosPath ? {
alias: {
'@framers/agentos': agentosPath,
},
} : {},
});
5. manifest.json
Declare the pack metadata for the extension registry.
{
"name": "@framers/agentos-ext-my-guardrail",
"version": "0.1.0",
"description": "My custom guardrail for AgentOS",
"author": "Your Name",
"license": "MIT",
"category": "safety",
"tags": ["guardrail", "content-filter", "safety"],
"entryPoint": "./dist/index.js"
}
6. src/index.ts -- the createExtensionPack() factory
This is the main entry point. It follows the same pattern as the built-in PII Redaction pack.
import type { ISharedServiceRegistry } from '@framers/agentos';
import { SharedServiceRegistry } from '@framers/agentos';
import type {
ExtensionPack,
ExtensionPackContext,
ExtensionDescriptor,
ExtensionLifecycleContext,
} from '@framers/agentos';
// EXTENSION_KIND_GUARDRAIL is the well-known kind string for guardrail descriptors.
import { EXTENSION_KIND_GUARDRAIL } from '@framers/agentos';
import type { MyGuardrailOptions } from './types';
import { MyGuardrail } from './MyGuardrail';
// Re-export types for consumers.
export * from './types';
/**
* Create an ExtensionPack that bundles the custom guardrail.
*
* @param options - Optional pack configuration. All fields have defaults.
* @returns A fully-configured ExtensionPack ready for registration.
*/
export function createMyGuardrailPack(
options?: MyGuardrailOptions,
): ExtensionPack {
const opts: MyGuardrailOptions = options ?? {};
// Mutable state that onActivate can upgrade with the extension manager's
// shared registry and secret resolver.
const state = {
// Start with a standalone registry; onActivate replaces it with the
// agent-wide shared registry so heavyweight resources are shared.
services: new SharedServiceRegistry() as ISharedServiceRegistry,
getSecret: undefined as ((id: string) => string | undefined) | undefined,
};
// Build the guardrail instance. This function is called once at creation
// time and again during onActivate to upgrade to shared resources.
let guardrail: MyGuardrail;
function buildComponents(): void {
guardrail = new MyGuardrail(state.services, opts, state.getSecret);
}
// Initial build for direct programmatic use (before activation).
buildComponents();
return {
// Canonical pack name used in manifests and logs.
name: 'my-guardrail',
version: '1.0.0',
// Descriptors getter returns the current guardrail instance wrapped
// in the ExtensionDescriptor shape. Uses a getter so descriptors
// always reflect the latest (potentially rebuilt) component.
get descriptors(): ExtensionDescriptor[] {
return [
{
id: 'my-guardrail',
kind: EXTENSION_KIND_GUARDRAIL, // 'guardrail'
priority: 0,
payload: guardrail,
},
];
},
/**
* Lifecycle hook called when the extension manager activates this pack.
* Upgrades to the agent-wide shared service registry so heavyweight
* resources (ML models, NLP libraries) are shared across extensions.
*/
onActivate: (context: ExtensionLifecycleContext): void => {
if (context.services) {
state.services = context.services;
}
if (context.getSecret) {
state.getSecret = context.getSecret;
}
// Rebuild with the upgraded shared registry.
buildComponents();
},
/**
* Lifecycle hook called when the pack is deactivated.
* Release any resources allocated by the guardrail.
*/
onDeactivate: async (): Promise<void> => {
// Release services registered by this pack. If other packs share
// the same service IDs, they will be unaffected (reference-counted).
await state.services.releaseAll();
},
};
}
/**
* Manifest factory bridge. Conforms to the convention expected by the
* extension loader when resolving packs from manifests.
*/
export function createExtensionPack(context: ExtensionPackContext): ExtensionPack {
return createMyGuardrailPack(context.options as MyGuardrailOptions);
}
7. Register in pnpm-workspace.yaml and registry.json
Add your new package to pnpm-workspace.yaml:
packages:
- 'packages/*'
- 'packages/agentos-extensions/registry/curated/safety/my-guardrail'
And add an entry to packages/agentos-extensions/registry.json:
{
"my-guardrail": {
"path": "registry/curated/safety/my-guardrail",
"category": "safety",
"description": "My custom guardrail"
}
}
Using ISharedServiceRegistry
The ISharedServiceRegistry is a singleton registry for lazy-loading heavyweight resources like ML models, NLP libraries, or large data files. It ensures that if multiple extensions need the same resource, only one instance is created.
import type { ISharedServiceRegistry } from '@framers/agentos';
class MyGuardrail implements IGuardrailService {
constructor(
private readonly services: ISharedServiceRegistry,
private readonly options: MyGuardrailOptions,
) {}
async evaluateInput({ input }: GuardrailInputPayload) {
// getOrCreate returns the cached instance if it already exists,
// or calls the factory function once to create it.
// The service ID is a unique string -- use the same ID across
// extensions to share the same instance.
const classifier = await this.services.getOrCreate(
'toxicity-classifier-v2', // Service ID (shared across extensions)
async () => {
// This factory runs ONCE, the first time any extension requests
// this service ID. Subsequent calls return the cached instance.
const { ToxicityClassifier } = await import('my-ml-library');
const model = new ToxicityClassifier();
await model.loadWeights(); // ~98MB one-time load
return model;
},
{
// dispose is called when services.release('toxicity-classifier-v2')
// or services.releaseAll() is invoked. Clean up GPU memory, file
// handles, etc.
dispose: async (instance) => {
await (instance as any).unload();
},
// Tags are for diagnostics/tooling -- optional but recommended.
tags: ['ml', 'toxicity', 'gpu'],
},
);
// Use the shared classifier instance.
const score = await classifier.classify(input.textInput ?? '');
if (score > 0.9) {
return {
action: GuardrailAction.BLOCK,
reason: 'Input classified as toxic.',
reasonCode: 'TOXICITY_BLOCKED',
metadata: { score },
};
}
return null;
}
}
Key points
getOrCreate(serviceId, factory, options)-- the factory only runs once perserviceId. All callers receive the same instance.- Cross-extension sharing -- if the PII redaction pack and your custom guardrail both call
getOrCreate('ner-model', ...)with the sameserviceId, they share one model instance. dispose-- called whenrelease(serviceId)orreleaseAll()is invoked. Use it to free GPU memory, close connections, etc.- Use for anything >1MB -- if your resource is large (ML weights, dictionaries), always go through the registry instead of holding it in a class field.
Accessing RAG Sources
When AgentOS performs RAG (Retrieval-Augmented Generation), the retrieved source chunks are passed to output guardrails via payload.ragSources.
async evaluateOutput(
{ chunk, ragSources }: GuardrailOutputPayload,
): Promise<GuardrailEvaluationResult | null> {
// ragSources is only available when RAG retrieval was performed.
// It is undefined for non-RAG conversations.
if (!ragSources || ragSources.length === 0) {
return null;
}
// Only evaluate the final assembled response for grounding.
if (chunk.type !== AgentOSResponseChunkType.FINAL_RESPONSE) {
return null;
}
const responseText = chunk.finalResponseText ?? '';
// Example: check that the response references at least one source.
// A real implementation would use NLI cross-encoders or LLM-as-judge
// (see the built-in Grounding Guard extension for a production example).
const sourceTexts = ragSources.map((s) => s.text);
const mentionsSource = sourceTexts.some((src) =>
responseText.toLowerCase().includes(src.toLowerCase().slice(0, 50))
);
if (!mentionsSource) {
return {
action: GuardrailAction.FLAG,
reason: 'Response may not be grounded in retrieved sources.',
reasonCode: 'GROUNDING_WARNING',
metadata: {
sourceCount: ragSources.length,
responseLength: responseText.length,
},
};
}
return null;
}
When ragSources is available
ragSourcesis populated on every output chunk in a RAG-enabled conversation (not just the final chunk).- It is
undefinedwhen no RAG retrieval was performed for the current request. - Each
RagRetrievedChunkcontainstext,score,metadata, and source identifiers.
Testing Your Guardrail
Constructing mock payloads
import { describe, it, expect } from 'vitest';
import { GuardrailAction, AgentOSResponseChunkType } from '@framers/agentos';
import type {
GuardrailInputPayload,
GuardrailOutputPayload,
GuardrailContext,
} from '@framers/agentos';
import type { AgentOSInput } from '@framers/agentos';
import { MyGuardrail } from '../src/MyGuardrail';
// Reusable mock context for all tests.
const mockContext: GuardrailContext = {
userId: 'test-user',
sessionId: 'test-session',
personaId: 'test-persona',
conversationId: 'test-conversation',
mode: 'test',
};
// Helper: create a mock AgentOSInput with the given text.
function mockInput(text: string): AgentOSInput {
return {
textInput: text,
sessionId: 'test-session',
userId: 'test-user',
} as AgentOSInput;
}
// Helper: create a mock GuardrailInputPayload.
function mockInputPayload(text: string): GuardrailInputPayload {
return {
context: mockContext,
input: mockInput(text),
};
}
// Helper: create a mock TEXT_DELTA chunk for streaming evaluation.
function mockTextDelta(text: string, streamId = 'stream-1'): GuardrailOutputPayload {
return {
context: mockContext,
chunk: {
type: AgentOSResponseChunkType.TEXT_DELTA,
streamId,
gmiInstanceId: 'gmi-1',
personaId: 'test-persona',
isFinal: false,
timestamp: new Date().toISOString(),
textDelta: text,
},
};
}
// Helper: create a mock FINAL_RESPONSE chunk.
function mockFinalResponse(text: string, streamId = 'stream-1'): GuardrailOutputPayload {
return {
context: mockContext,
chunk: {
type: AgentOSResponseChunkType.FINAL_RESPONSE,
streamId,
gmiInstanceId: 'gmi-1',
personaId: 'test-persona',
isFinal: true,
timestamp: new Date().toISOString(),
finalResponseText: text,
},
};
}
Unit testing evaluateInput
describe('MyGuardrail.evaluateInput', () => {
it('should allow clean input', async () => {
// Arrange: create the guardrail with a mock shared service registry.
const guardrail = new MyGuardrail(mockServiceRegistry, {});
// Act: evaluate clean text.
const result = await guardrail.evaluateInput!(
mockInputPayload('Hello, how are you?'),
);
// Assert: null means "allow".
expect(result).toBeNull();
});
it('should block prohibited content', async () => {
const guardrail = new MyGuardrail(mockServiceRegistry, {});
const result = await guardrail.evaluateInput!(
mockInputPayload('This contains badword1 in it'),
);
// Assert: BLOCK action with a reason code.
expect(result).not.toBeNull();
expect(result!.action).toBe(GuardrailAction.BLOCK);
expect(result!.reasonCode).toBe('PROFANITY_INPUT_BLOCKED');
});
});
Unit testing evaluateOutput with TEXT_DELTA and FINAL_RESPONSE
describe('MyGuardrail.evaluateOutput', () => {
it('should allow clean final response', async () => {
const guardrail = new MyGuardrail(mockServiceRegistry, {});
const result = await guardrail.evaluateOutput!(
mockFinalResponse('Here is a helpful answer.'),
);
expect(result).toBeNull();
});
it('should block prohibited content in final response', async () => {
const guardrail = new MyGuardrail(mockServiceRegistry, {});
const result = await guardrail.evaluateOutput!(
mockFinalResponse('The answer is badword1.'),
);
expect(result).not.toBeNull();
expect(result!.action).toBe(GuardrailAction.BLOCK);
});
});
Mocking ISharedServiceRegistry
import type { ISharedServiceRegistry } from '@framers/agentos';
/**
* A minimal mock of ISharedServiceRegistry for testing.
* Stores services in a simple Map and supports getOrCreate, has,
* release, and releaseAll.
*/
const mockServiceRegistry: ISharedServiceRegistry = {
// In-memory store for test services.
_store: new Map<string, unknown>(),
async getOrCreate<T>(
serviceId: string,
factory: () => Promise<T> | T,
): Promise<T> {
if (this._store.has(serviceId)) {
return this._store.get(serviceId) as T;
}
const instance = await factory();
this._store.set(serviceId, instance);
return instance;
},
has(serviceId: string): boolean {
return this._store.has(serviceId);
},
async release(serviceId: string): Promise<void> {
this._store.delete(serviceId);
},
async releaseAll(): Promise<void> {
this._store.clear();
},
} as any;
Integration testing via ParallelGuardrailDispatcher
To test how your guardrail behaves within the full two-phase pipeline:
import { ParallelGuardrailDispatcher } from '@framers/agentos/core/guardrails';
describe('Integration: ParallelGuardrailDispatcher + MyGuardrail', () => {
it('should block input through the dispatcher', async () => {
// Register your guardrail alongside others.
const services = [
new MyGuardrail(mockServiceRegistry, {}),
];
// Run through the full two-phase dispatch.
const outcome = await ParallelGuardrailDispatcher.evaluateInput(
services,
mockInput('This contains badword1'),
mockContext,
);
// The dispatcher's worst-wins aggregation should pick up the BLOCK.
expect(outcome.evaluation?.action).toBe(GuardrailAction.BLOCK);
});
});
Reference test files
packages/agentos/tests/core/guardrails/ParallelGuardrailDispatcher.spec.ts-- comprehensive dispatcher tests.packages/agentos/tests/core/guardrails.integration.spec.ts-- integration tests for the full guardrail pipeline.
Best Practices
-
Start with final-only evaluation. Enable
evaluateStreamingChunksonly when you need real-time filtering (PII redaction, cost ceilings). Final-only is simpler, cheaper, and has lower latency impact. -
Use rate limiting for streaming. Set
maxStreamingEvaluationsto control how many TEXT_DELTA evaluations occur per stream. This is especially important for guardrails that call external APIs or ML models. -
Use specific reasonCodes for analytics. Codes like
'PII_SSN_REDACTED','TOXICITY_HIGH', or'COST_CEILING_EXCEEDED'are far more useful than generic codes. These are machine-readable and can drive dashboards. -
Fail-open on errors. Always wrap your evaluation logic in try/catch and return
nullon unexpected errors. The dispatcher does this at the top level too, but defense-in-depth is important:async evaluateInput(payload) {
try {
// ... your logic
} catch (error) {
console.warn('[MyGuardrail] Unexpected error, failing open:', error);
return null; // Allow content through rather than crashing the pipeline.
}
} -
Do NOT set timeoutMs on safety-critical guardrails. Fail-open on timeout means content passes unchecked. Only use timeoutMs on guardrails where a missed evaluation is acceptable (e.g., quality flags, analytics).
-
Keep guardrails focused. One concern per guardrail. A PII guardrail should not also check for toxicity. This makes them independently testable, configurable, and composable.
-
Set canSanitize only when you actually modify text. If your guardrail only blocks or flags, leave
canSanitizeasfalse(the default) so it runs in the faster Phase 2 parallel path. -
Use ISharedServiceRegistry for anything >1MB. ML models, NLP libraries, large dictionaries -- always go through the shared registry so multiple extensions can share the same instance.
-
Clean up stream buffers. If you maintain per-stream state for streaming guardrails, always delete the buffer on
isFinaland implement a stale-cleanup mechanism for abandoned streams. -
Test with partial patterns. When testing streaming guardrails, send patterns split across multiple TEXT_DELTA chunks to verify your buffer logic detects them correctly.
Reference
API documentation
- IGuardrailService -- the full guardrail contract.
- GuardrailConfig -- streaming, sanitization, and timeout configuration.
- GuardrailEvaluationResult -- the result shape returned by evaluations.
- GuardrailAction -- the four action types (ALLOW, FLAG, SANITIZE, BLOCK).
- AgentOSResponseChunkType -- all chunk types your guardrail may encounter.
- ISharedServiceRegistry -- lazy-loading shared service registry.
- ExtensionPack / ExtensionDescriptor -- extension packaging types.
Built-in guardrail packs (reference implementations)
- PII Redaction -- four-tier PII detection with sentence-boundary buffering.
- ML Content Classifiers -- ONNX-based toxicity, prompt injection, and jailbreak detection.
- Topicality -- embedding-based topic enforcement with drift tracking.
- Code Safety -- OWASP Top 10 code vulnerability scanning.
- Grounding Guard -- NLI cross-encoder + LLM-as-judge hallucination detection.
Related documentation
- Guardrails Overview -- conceptual overview and configuration reference.
- Extension Architecture -- how extensions are loaded, activated, and composed.
- How Extensions Work -- the extension runtime lifecycle.
- Safety Primitives -- broader safety architecture.