Class: StreamingManager
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:260
Manages real-time data streams for AgentOS, handling client subscriptions and chunk distribution.
Implements
Constructors
Constructor
new StreamingManager(): StreamingManager
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:266
Returns
StreamingManager
Properties
managerId
readonly managerId: string
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:264
Methods
closeStream()
closeStream(streamId, reason?): Promise<void>
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:461
Closes a specific stream. All subscribed clients will be notified and subsequently deregistered. No further data can be pushed to a closed stream.
Parameters
streamId
string
The ID of the stream to close.
reason?
string
An optional reason for closing the stream.
Returns
Promise<void>
A promise that resolves when the stream is closed and clients are notified.
Async
Throws
If the stream does not exist.
Implementation of
createStream()
createStream(requestedStreamId?): Promise<string>
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:304
Creates a new data stream and returns its unique ID.
Parameters
requestedStreamId?
string
Optional. If provided, attempts to use this ID. If not provided or if the ID already exists, a new unique ID will be generated.
Returns
Promise<string>
A promise resolving to the unique ID of the created stream.
Throws
If the maximum number of concurrent streams is reached, or if a requestedStreamId is provided but already in use and regeneration is not supported or fails.
Implementation of
IStreamingManager.createStream
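The ID-selection rules described for createStream() can be sketched in isolation. This is a minimal illustration and not the StreamingManager source: allocateStreamId, generateStreamId, the maxConcurrentStreams parameter, and the counter-based ID format are all hypothetical. The sketch follows the parameter description (fall back to a generated ID when the requested one is taken) rather than throwing on a collision.

```typescript
// Hypothetical helpers illustrating the documented createStream() ID rules.
// Not part of AgentOS; names and the ID format are assumptions.
let generatedIdCounter = 0;

function generateStreamId(activeIds: Set<string>): string {
  let id: string;
  do {
    generatedIdCounter += 1;
    id = `stream-${generatedIdCounter}`;
  } while (activeIds.has(id)); // keep trying until the ID is unused
  return id;
}

function allocateStreamId(
  activeIds: Set<string>,
  maxConcurrentStreams: number,
  requestedStreamId?: string,
): string {
  if (activeIds.size >= maxConcurrentStreams) {
    throw new Error("Maximum number of concurrent streams reached");
  }
  // Use the requested ID only when one was given and it is still free.
  const id =
    requestedStreamId !== undefined && !activeIds.has(requestedStreamId)
      ? requestedStreamId
      : generateStreamId(activeIds);
  activeIds.add(id);
  return id;
}
```

Because the returned ID may differ from the requested one, callers should always use the value the promise resolves to rather than the value they passed in.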
deregisterClient()
deregisterClient(streamId, clientId): Promise<void>
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:372
Deregisters a client from a specific stream. The client will no longer receive data chunks for this stream.
Parameters
streamId
string
The ID of the stream to unsubscribe from.
clientId
string
The ID of the client to deregister.
Returns
Promise<void>
A promise that resolves when the client is successfully deregistered.
Async
Throws
If the stream or client does not exist within that stream.
Implementation of
IStreamingManager.deregisterClient
getActiveStreamIds()
getActiveStreamIds(): Promise<string[]>
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:530
Retrieves a list of IDs for all currently active streams.
Returns
Promise<string[]>
A promise resolving to an array of active stream IDs.
Implementation of
IStreamingManager.getActiveStreamIds
getClientCountForStream()
getClientCountForStream(streamId): Promise<number>
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:536
Retrieves the number of clients currently subscribed to a specific stream.
Parameters
streamId
string
The ID of the stream.
Returns
Promise<number>
A promise resolving to the number of clients.
Async
Throws
If the stream does not exist.
Implementation of
IStreamingManager.getClientCountForStream
handleStreamError()
handleStreamError(streamId, error, terminateStream?): Promise<void>
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:491
Handles an error that occurred on a specific stream. This might involve notifying clients with an error chunk and/or closing the stream.
Parameters
streamId
string
The ID of the stream where the error occurred.
error
Error
The error object.
terminateStream?
boolean = true
If true, the stream will be closed after processing the error.
Returns
Promise<void>
A promise that resolves when the error has been handled.
Async
Throws
If the stream does not exist.
Implementation of
IStreamingManager.handleStreamError
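One way a producer might use handleStreamError() is to wrap its push loop and report upstream failures. The sketch below relies only on the two method signatures documented on this page; ChunkSink, pumpChunks, and the nextChunk pull function are hypothetical names, and the chunk type is left generic because it is not specified here.

```typescript
// Structural view of the two documented methods this sketch relies on.
// ChunkSink is a hypothetical name; the chunk type is left open.
interface ChunkSink<TChunk> {
  pushChunk(streamId: string, chunk: TChunk): Promise<void>;
  handleStreamError(
    streamId: string,
    error: Error,
    terminateStream?: boolean,
  ): Promise<void>;
}

// Pulls chunks until nextChunk() resolves to undefined, forwarding each one.
// Resolves to true on normal completion, false if an error was reported.
async function pumpChunks<TChunk>(
  manager: ChunkSink<TChunk>,
  streamId: string,
  nextChunk: () => Promise<TChunk | undefined>,
): Promise<boolean> {
  try {
    let chunk = await nextChunk();
    while (chunk !== undefined) {
      await manager.pushChunk(streamId, chunk);
      chunk = await nextChunk();
    }
    return true;
  } catch (err) {
    const error = err instanceof Error ? err : new Error(String(err));
    // terminateStream is omitted, so the documented default (true) applies.
    await manager.handleStreamError(streamId, error);
    return false;
  }
}
```

Note that handleStreamError() itself throws if the stream does not exist, so a caller of this sketch would still need to handle that rejection.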
initialize()
initialize(config): Promise<void>
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:272
Initializes the StreamingManager with its configuration. This method must complete successfully before any other operation is performed.
Parameters
config
The configuration for the manager.
Returns
Promise<void>
A promise that resolves upon successful initialization.
Async
Throws
If configuration is invalid or initialization fails.
Implementation of
pushChunk()
pushChunk(streamId, chunk): Promise<void>
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:401
Pushes a data chunk to all clients currently subscribed to the specified stream.
Parameters
streamId
string
The ID of the stream to push data to.
chunk
The data chunk to distribute.
Returns
Promise<void>
A promise that resolves when the chunk has been pushed to all active clients of the stream (or attempted, based on onClientSendErrorBehavior).
Async
Throws
If the stream does not exist, or if onClientSendErrorBehavior is 'throw' and a client send fails.
Implementation of
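The interaction between chunk distribution and onClientSendErrorBehavior can be illustrated with a small fan-out loop. This is a sketch under stated assumptions, not the actual implementation: ChunkReceiver and its sendChunk method are an assumed client shape, fanOut is a hypothetical helper, and 'continue' is a made-up alternative to the documented 'throw' value.

```typescript
// Assumed client shape; the real IStreamClient interface may differ.
interface ChunkReceiver<TChunk> {
  id: string;
  sendChunk(chunk: TChunk): Promise<void>;
}

// 'throw' is named in the documentation; 'continue' is a hypothetical value.
type SendErrorBehavior = "throw" | "continue";

// Sends the chunk to each client in turn and returns the IDs of clients
// whose send failed (only reachable when the behavior is 'continue').
async function fanOut<TChunk>(
  clients: ChunkReceiver<TChunk>[],
  chunk: TChunk,
  onClientSendErrorBehavior: SendErrorBehavior,
): Promise<string[]> {
  const failedClientIds: string[] = [];
  for (const client of clients) {
    try {
      await client.sendChunk(chunk);
    } catch (err) {
      if (onClientSendErrorBehavior === "throw") {
        throw err; // abort distribution and surface the failure to the caller
      }
      failedClientIds.push(client.id);
    }
  }
  return failedClientIds;
}
```

With 'throw', clients later in the list never receive the chunk once one send fails, which is why the return description above speaks of pushes being "attempted".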
registerClient()
registerClient(streamId, client): Promise<void>
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:343
Registers a client to a specific stream to receive data chunks.
Parameters
streamId
string
The ID of the stream to subscribe to.
client
IStreamClient
The client instance that implements IStreamClient.
Returns
Promise<void>
A promise that resolves when the client is successfully registered.
Async
Throws
If the stream does not exist, if the client is already registered, or if the maximum number of clients for the stream is reached.
Implementation of
IStreamingManager.registerClient
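The three failure conditions listed for registerClient() can be expressed as a sequence of guard checks. The following is a minimal sketch, assuming clients are tracked per stream in a map keyed by client ID; IdentifiedClient, registerInto, and maxClientsPerStream are hypothetical names and do not come from the AgentOS source.

```typescript
// Assumed minimal client shape: only an ID is needed for this sketch.
interface IdentifiedClient {
  id: string;
}

// Hypothetical registry layout: stream ID -> (client ID -> client).
type StreamRegistry = Map<string, Map<string, IdentifiedClient>>;

function registerInto(
  streams: StreamRegistry,
  streamId: string,
  client: IdentifiedClient,
  maxClientsPerStream: number,
): void {
  const clients = streams.get(streamId);
  if (clients === undefined) {
    throw new Error(`Stream '${streamId}' does not exist`);
  }
  if (clients.has(client.id)) {
    throw new Error(`Client '${client.id}' is already registered`);
  }
  if (clients.size >= maxClientsPerStream) {
    throw new Error(`Stream '${streamId}' has reached its client limit`);
  }
  clients.set(client.id, client);
}
```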
shutdown()
shutdown(isReinitializing?): Promise<void>
Defined in: packages/agentos/src/core/streaming/StreamingManager.ts:546
Gracefully shuts down the StreamingManager, closing all active streams and releasing any resources.
Parameters
isReinitializing?
boolean = false
Returns
Promise<void>
A promise that resolves when shutdown is complete.
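Taken together, the methods on this page suggest a typical call order: initialize, create a stream, register a client, push chunks, close the stream, shut down. The sketch below is written against a structural interface that mirrors the documented signatures, so it does not import the real class; LifecycleManager and streamOnce are hypothetical names, and the config, client, and chunk types are left generic because they are not specified here.

```typescript
// Mirrors the method signatures documented on this page.
// Type parameters stand in for types this page does not spell out.
interface LifecycleManager<TConfig, TClient, TChunk> {
  initialize(config: TConfig): Promise<void>;
  createStream(requestedStreamId?: string): Promise<string>;
  registerClient(streamId: string, client: TClient): Promise<void>;
  pushChunk(streamId: string, chunk: TChunk): Promise<void>;
  closeStream(streamId: string, reason?: string): Promise<void>;
  shutdown(isReinitializing?: boolean): Promise<void>;
}

// Runs one complete stream from start to finish and returns its ID.
async function streamOnce<TConfig, TClient, TChunk>(
  manager: LifecycleManager<TConfig, TClient, TChunk>,
  config: TConfig,
  client: TClient,
  chunks: TChunk[],
): Promise<string> {
  // initialize() must succeed before any other call, per the docs.
  await manager.initialize(config);
  try {
    const streamId = await manager.createStream();
    await manager.registerClient(streamId, client);
    for (const chunk of chunks) {
      await manager.pushChunk(streamId, chunk);
    }
    await manager.closeStream(streamId, "completed");
    return streamId;
  } finally {
    // Release resources even if a step above rejected.
    await manager.shutdown();
  }
}
```

Placing shutdown() in a finally block is a design choice of this sketch; a long-lived service would more likely initialize once at startup and shut down once at exit.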