Skip to main content

Class: StreamingManager

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:260

StreamingManager

Implements

Manages real-time data streams for AgentOS, handling client subscriptions and chunk distribution.

Implements

Constructors

Constructor

new StreamingManager(): StreamingManager

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:266

Returns

StreamingManager

Properties

managerId

readonly managerId: string

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:264

Methods

closeStream()

closeStream(streamId, reason?): Promise<void>

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:461

Closes a specific stream. All subscribed clients will be notified and subsequently deregistered. No further data can be pushed to a closed stream.

Parameters

streamId

string

The ID of the stream to close.

reason?

string

An optional reason for closing the stream.

Returns

Promise<void>

A promise that resolves when the stream is closed and clients are notified.

Async

Throws

If the stream does not exist.

Implementation of

IStreamingManager.closeStream


createStream()

createStream(requestedStreamId?): Promise<string>

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:304

Creates a new data stream and returns its unique ID.

Parameters

requestedStreamId?

string

Optional. If provided, attempts to use this ID. If not provided or if the ID already exists, a new unique ID will be generated.

Returns

Promise<string>

A promise resolving to the unique ID of the created stream.

Throws

If the maximum number of concurrent streams is reached, or if a requestedStreamId is provided but already in use (and regeneration is not supported/fails).

Implementation of

IStreamingManager.createStream


deregisterClient()

deregisterClient(streamId, clientId): Promise<void>

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:372

Deregisters a client from a specific stream. The client will no longer receive data chunks for this stream.

Parameters

streamId

string

The ID of the stream to unsubscribe from.

clientId

string

The ID of the client to deregister.

Returns

Promise<void>

A promise that resolves when the client is successfully deregistered.

Async

Throws

If the stream or client does not exist within that stream.

Implementation of

IStreamingManager.deregisterClient


getActiveStreamIds()

getActiveStreamIds(): Promise<string[]>

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:530

Retrieves a list of IDs for all currently active streams.

Returns

Promise<string[]>

A promise resolving to an array of active stream IDs.

Implementation of

IStreamingManager.getActiveStreamIds


getClientCountForStream()

getClientCountForStream(streamId): Promise<number>

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:536

Retrieves the number of clients currently subscribed to a specific stream.

Parameters

streamId

string

The ID of the stream.

Returns

Promise<number>

A promise resolving to the number of clients.

Async

Throws

If the stream does not exist.

Implementation of

IStreamingManager.getClientCountForStream


handleStreamError()

handleStreamError(streamId, error, terminateStream?): Promise<void>

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:491

Handles an error that occurred on a specific stream. This might involve notifying clients with an error chunk and/or closing the stream.

Parameters

streamId

string

The ID of the stream where the error occurred.

error

Error

The error object.

terminateStream?

boolean = true

If true, the stream will be closed after processing the error.

Returns

Promise<void>

A promise that resolves when the error has been handled.

Async

Throws

If the stream does not exist.

Implementation of

IStreamingManager.handleStreamError


initialize()

initialize(config): Promise<void>

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:272

Initializes the StreamingManager with its configuration. This method must be called successfully before any other operations.

Parameters

config

StreamingManagerConfig

The configuration for the manager.

Returns

Promise<void>

A promise that resolves upon successful initialization.

Async

Throws

If configuration is invalid or initialization fails.

Implementation of

IStreamingManager.initialize


pushChunk()

pushChunk(streamId, chunk): Promise<void>

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:401

Pushes a data chunk to all clients currently subscribed to the specified stream.

Parameters

streamId

string

The ID of the stream to push data to.

chunk

AgentOSResponse

The data chunk to distribute.

Returns

Promise<void>

A promise that resolves when the chunk has been pushed to all active clients of the stream (or attempted, based on onClientSendErrorBehavior).

Async

Throws

If the stream does not exist, or if onClientSendErrorBehavior is 'throw' and a client send fails.

Implementation of

IStreamingManager.pushChunk


registerClient()

registerClient(streamId, client): Promise<void>

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:343

Registers a client to a specific stream to receive data chunks.

Parameters

streamId

string

The ID of the stream to subscribe to.

client

IStreamClient

The client instance that implements IStreamClient.

Returns

Promise<void>

A promise that resolves when the client is successfully registered.

Async

Throws

If the stream does not exist, if the client is already registered, or if the maximum number of clients for the stream is reached.

Implementation of

IStreamingManager.registerClient


shutdown()

shutdown(isReinitializing?): Promise<void>

Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:546

Gracefully shuts down the StreamingManager, closing all active streams and releasing any resources.

Parameters

isReinitializing?

boolean = false

Returns

Promise<void>

A promise that resolves when shutdown is complete.

Async

Implementation of

IStreamingManager.shutdown