Server Module
The Bond server module provides production-ready SSE and WebSocket streaming for any Bond agent.
Factory Function
create_bond_server creates a production-ready ASGI server for a BondAgent: a Starlette application with SSE and WebSocket endpoints for streaming agent responses to UIs and clients.
Parameters:
- agent (BondAgent[Any, Any]) – The BondAgent to serve.
- config (ServerConfig | None, default: None) – Optional server configuration. Defaults are used if not provided.
Returns:
- Starlette – ASGI application ready for uvicorn or other ASGI servers.
Example
from bond import BondAgent
from bond.server import create_bond_server, ServerConfig
agent = BondAgent(
    name="assistant",
    instructions="You are helpful.",
    model="openai:gpt-4o",
)
# Default configuration
app = create_bond_server(agent)
# Custom configuration
app = create_bond_server(
    agent,
    config=ServerConfig(
        port=3000,
        cors_origins=["http://localhost:5173"],
    ),
)
# Run with uvicorn
# uvicorn main:app --host 0.0.0.0 --port 8000
Endpoints
- POST /ask: Request: {"prompt": "...", "session_id": "..."} (session_id is optional). Response: {"session_id": "...", "stream_url": "/stream/..."}.
- GET /stream/{session_id}: SSE stream with events such as text, thinking, tool_exec, and tool_result.
- WS /ws: WebSocket endpoint. Send {"prompt": "..."} and receive streaming events.
- GET /health: Response: {"status": "healthy", "agent_name": "..."}.
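A client reading GET /stream/{session_id} has to split the SSE byte stream into events. The following is a minimal sketch of that parsing step, not Bond's own client code. It assumes each event's data field carries a JSON object, which this page does not confirm, and the sample payload fields (content, name) are purely illustrative.

```python
import json
from typing import Any, Iterable, Iterator, List, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, Any]]:
    """Yield (event_name, decoded_data) pairs from raw SSE lines."""
    event: str = "message"  # SSE default event name
    data: List[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line:
            # A blank line terminates the current event.
            if data:
                # Assumption: the data field holds JSON.
                yield event, json.loads("\n".join(data))
            event, data = "message", []
        elif line.startswith(":"):
            # Comment line, often used as a keep-alive.
            continue
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())


# Hypothetical sample stream; field names are illustrative only.
sample = [
    "event: text\n",
    'data: {"content": "Hello"}\n',
    "\n",
    ": keep-alive\n",
    "event: tool_exec\n",
    'data: {"name": "search"}\n',
    "\n",
]
events = list(parse_sse(sample))
```

In practice the lines would come from an HTTP client's streaming response body rather than a list.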
Configuration
ServerConfig holds the configuration for the Bond server.
Attributes:
- host (str) – Host to bind to (default: "0.0.0.0").
- port (int) – Port to bind to (default: 8000).
- cors_origins (list[str]) – Allowed CORS origins (default: ["*"]).
- session_timeout_seconds (int) – Session expiry time in seconds (default: 3600).
- max_concurrent_sessions (int) – Maximum concurrent sessions (default: 100).
Methods:
- get_stream_url – Generate the stream URL for a session.
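To make the defaults above concrete, here is a hedged sketch using a hypothetical stand-in dataclass, ServerConfigSketch, rather than the real ServerConfig. The get_stream_url body is an assumption based on the /stream/{session_id} route; the actual method may take different arguments or return an absolute URL.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class ServerConfigSketch:
    """Hypothetical stand-in mirroring the documented defaults."""

    host: str = "0.0.0.0"
    port: int = 8000
    cors_origins: List[str] = field(default_factory=lambda: ["*"])
    session_timeout_seconds: int = 3600
    max_concurrent_sessions: int = 100

    def get_stream_url(self, session_id: str) -> str:
        # Assumption: a relative path matching GET /stream/{session_id}.
        return f"/stream/{session_id}"


# Override only what differs from the defaults, e.g. for local development.
dev = ServerConfigSketch(port=3000, cors_origins=["http://localhost:5173"])
stream_url = dev.get_stream_url("abc123")
```

Restricting cors_origins to known front-end origins, as in the dev example, is usually preferable to the permissive ["*"] default outside local testing.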
Request/Response Types
AskRequest
Bases: BaseModel
Request to start a new agent conversation.
Sent to POST /ask endpoint to initiate a streaming session.
SessionResponse
Bases: BaseModel
Response from POST /ask with session information.
Contains the session_id needed to connect to the SSE stream.
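The two payloads can be illustrated with plain dictionaries. This sketch uses TypedDict stand-ins (AskPayload and SessionPayload are hypothetical names) instead of the real pydantic models, and takes the field names from the POST /ask description on this page; any fields beyond those are unknown here.

```python
import json
from typing import Optional, TypedDict


class AskPayload(TypedDict, total=False):
    """Stand-in for the AskRequest body; session_id may be omitted."""

    prompt: str
    session_id: str


class SessionPayload(TypedDict):
    """Stand-in for the SessionResponse body."""

    session_id: str
    stream_url: str


def build_ask_body(prompt: str, session_id: Optional[str] = None) -> str:
    """Serialize a POST /ask body, including session_id only when given."""
    body: AskPayload = {"prompt": prompt}
    if session_id is not None:
        body["session_id"] = session_id
    return json.dumps(body)


def parse_session_response(raw: str) -> SessionPayload:
    """Extract the two documented fields from a POST /ask response."""
    data = json.loads(raw)
    return {"session_id": data["session_id"], "stream_url": data["stream_url"]}


first_turn = build_ask_body("What is 2+2?")
reply = parse_session_response('{"session_id": "s1", "stream_url": "/stream/s1"}')
follow_up = build_ask_body("And 3+3?", session_id=reply["session_id"])
```

Passing the returned session_id back in the next request is how a client would continue the same conversation.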
HealthResponse
Session Management
Session
A streaming session for an agent conversation.
Attributes:
- session_id (str) – Unique session identifier.
- prompt (str) – The user's prompt for this session.
- status (SessionStatus) – Current session status.
- created_at (float) – Unix timestamp when the session was created.
- history (list[ModelMessage]) – Conversation history for multi-turn sessions.
- result_queue (Queue[dict[str, Any]]) – Queue for streaming results to the SSE connection.
- error (str | None) – Error message if status is ERROR.
Methods:
- is_expired – Check whether the session has expired based on its timeout.
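An expiry check of this kind usually compares the session's age against the timeout. The sketch below shows that comparison on a hypothetical SessionSketch class; the real is_expired signature is not shown on this page, so the timeout_seconds parameter is an assumption.

```python
import time
from dataclasses import dataclass, field


@dataclass
class SessionSketch:
    """Hypothetical stand-in with only the fields needed for expiry."""

    session_id: str
    prompt: str
    created_at: float = field(default_factory=time.time)

    def is_expired(self, timeout_seconds: int) -> bool:
        # Expired once the session's age exceeds the timeout.
        return time.time() - self.created_at > timeout_seconds


fresh = SessionSketch("s1", "What is 2+2?")
# Simulate a session created two hours ago.
stale = SessionSketch("s2", "What is 2+2?", created_at=time.time() - 7200)
```

With the default timeout of 3600 seconds, fresh is still live while stale would be eligible for cleanup.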
SessionStatus
SessionManager
Manages active streaming sessions.
Thread-safe session storage with automatic cleanup of expired sessions.
Example
manager = SessionManager(timeout_seconds=3600)
# SessionManager methods are coroutines, so call them from async code
# Create a session
session = await manager.create_session("What is 2+2?")
# Get the session later
session = await manager.get_session(session.session_id)
# Update status
await manager.update_status(session.session_id, SessionStatus.STREAMING)
# Cleanup expired sessions periodically
await manager.cleanup_expired()
Initialize the session manager.
Parameters:
- timeout_seconds (int, default: 3600) – Time in seconds before sessions expire.
- max_sessions (int, default: 100) – Maximum concurrent sessions allowed.
Methods:
- cleanup_expired – Remove all expired sessions.
- create_session – Create a new streaming session.
- get_session – Get a session by ID.
- remove_session – Remove a session.
- update_history – Update session conversation history.
- update_status – Update session status.
Attributes:
- active_count (int) – Number of active sessions.
active_count (property)
Number of active sessions.
cleanup_expired() (async)
Remove all expired sessions.
Returns:
- int – Number of sessions removed.
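Periodic cleanup is typically driven by a background task. The following sketch shows one way to do that with asyncio; StubManager is a hypothetical stand-in that exposes only cleanup_expired(), and the interval is arbitrary. Whether the Bond server already schedules such a task internally is not stated here.

```python
import asyncio


class StubManager:
    """Hypothetical stand-in exposing only cleanup_expired()."""

    def __init__(self) -> None:
        self.calls = 0

    async def cleanup_expired(self) -> int:
        self.calls += 1
        return 0  # pretend nothing had expired


async def cleanup_loop(manager, interval_seconds: float) -> None:
    """Sweep expired sessions forever, sleeping between sweeps."""
    while True:
        removed = await manager.cleanup_expired()
        if removed:
            print(f"removed {removed} expired sessions")
        await asyncio.sleep(interval_seconds)


async def demo() -> int:
    manager = StubManager()
    task = asyncio.create_task(cleanup_loop(manager, 0.01))
    await asyncio.sleep(0.05)  # let a few sweeps run
    task.cancel()  # stop the loop on shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass
    return manager.calls


calls = asyncio.run(demo())
```

Cancelling the task and awaiting it, as demo() does, lets the loop stop cleanly when the application shuts down.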
create_session(prompt, history=None, session_id=None) (async)
Create a new streaming session.
Parameters:
- prompt (str) – The user's prompt.
- history (list[ModelMessage] | None, default: None) – Optional conversation history.
- session_id (str | None, default: None) – Optional session ID to reuse (for continuing conversations).
Returns:
- Session – The created Session object.
Raises:
- ValueError – If the maximum number of sessions has been reached.
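Callers should be prepared for the ValueError raised at the session limit. The sketch below demonstrates the pattern against TinyManager, a hypothetical stand-in that enforces only the limit. Its rule that reusing an existing session_id does not count against the limit is an assumption, not documented behavior.

```python
import asyncio
import uuid
from typing import Dict, Optional, Tuple


class TinyManager:
    """Hypothetical stand-in: enforces only the max-session limit."""

    def __init__(self, max_sessions: int = 100) -> None:
        self._sessions: Dict[str, str] = {}
        self._lock = asyncio.Lock()
        self._max = max_sessions

    async def create_session(
        self, prompt: str, session_id: Optional[str] = None
    ) -> str:
        async with self._lock:
            sid = session_id or uuid.uuid4().hex
            # Assumption: only brand-new IDs count against the limit.
            if sid not in self._sessions and len(self._sessions) >= self._max:
                raise ValueError("max sessions reached")
            self._sessions[sid] = prompt
            return sid


async def demo() -> Tuple[bool, bool]:
    manager = TinyManager(max_sessions=1)
    first = await manager.create_session("What is 2+2?")
    try:
        await manager.create_session("Another question")
        rejected = False
    except ValueError:
        # A server would typically translate this into an error response.
        rejected = True
    again = await manager.create_session("And 3+3?", session_id=first)
    return rejected, again == first


result = asyncio.run(demo())
```

Catching the exception at the request boundary keeps an overloaded server responding with a clear error instead of an unhandled failure.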
get_session(session_id) (async)
Get a session by ID.
Parameters:
- session_id (str) – The session ID to look up.
Returns:
- Session | None – The Session if found and not expired, None otherwise.
remove_session(session_id) (async)
Remove a session.
Parameters:
- session_id (str) – The session to remove.
update_history(session_id, history) (async)
Update session conversation history.
Parameters:
- session_id (str) – The session to update.
- history (list[ModelMessage]) – New conversation history.
update_status(session_id, status, error=None) (async)
Update session status.
Parameters:
- session_id (str) – The session to update.
- status (SessionStatus) – New status.
- error (str | None, default: None) – Optional error message (for ERROR status).
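One plausible use of the error parameter is to record a failure when the agent run raises. The sketch below shows that flow with hypothetical stand-ins: StatusSketch contains only the two members named on this page (the real SessionStatus may define more, and its values are unknown), and RecordingManager simply records calls.

```python
import asyncio
from enum import Enum
from typing import Awaitable, Callable, List, Optional, Tuple


class StatusSketch(Enum):
    # Only the members named on this page; the values are made up.
    STREAMING = "streaming"
    ERROR = "error"


class RecordingManager:
    """Hypothetical stand-in that records update_status() calls."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, StatusSketch, Optional[str]]] = []

    async def update_status(
        self, session_id: str, status: StatusSketch, error: Optional[str] = None
    ) -> None:
        self.calls.append((session_id, status, error))


async def run_tracked(
    manager: RecordingManager,
    session_id: str,
    work: Callable[[], Awaitable[None]],
) -> None:
    """Mark the session as streaming, then record any failure."""
    await manager.update_status(session_id, StatusSketch.STREAMING)
    try:
        await work()
    except Exception as exc:
        await manager.update_status(session_id, StatusSketch.ERROR, error=str(exc))


async def failing_work() -> None:
    raise RuntimeError("model unavailable")


async def demo() -> List[Tuple[str, StatusSketch, Optional[str]]]:
    manager = RecordingManager()
    await run_tracked(manager, "s1", failing_work)
    return manager.calls


calls = asyncio.run(demo())
```

Storing the message alongside the ERROR status lets the SSE or WebSocket handler report the failure to the client.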