Architecture

Bond's architecture is designed around one principle: the agent's reasoning should be as observable as its output.

Overview

sequenceDiagram
    participant User
    participant BondAgent
    participant LLM
    participant Tools
    participant StreamHandlers

    User->>BondAgent: ask(prompt, handlers)
    BondAgent->>LLM: run_stream(prompt)

    loop Streaming Response
        LLM-->>BondAgent: PartStartEvent
        BondAgent-->>StreamHandlers: on_block_start(kind, idx)

        LLM-->>BondAgent: TextPartDelta
        BondAgent-->>StreamHandlers: on_text_delta(text)

        LLM-->>BondAgent: ThinkingPartDelta
        BondAgent-->>StreamHandlers: on_thinking_delta(text)

        LLM-->>BondAgent: ToolCallPartDelta
        BondAgent-->>StreamHandlers: on_tool_call_delta(name, args)

        LLM-->>BondAgent: FunctionToolCallEvent
        BondAgent-->>StreamHandlers: on_tool_execute(id, name, args)
        BondAgent->>Tools: execute(args)
        Tools-->>BondAgent: result
        BondAgent-->>StreamHandlers: on_tool_result(id, name, result)

        LLM-->>BondAgent: PartEndEvent
        BondAgent-->>StreamHandlers: on_block_end(kind, idx)
    end

    BondAgent-->>StreamHandlers: on_complete(data)
    BondAgent-->>User: result

StreamHandlers: The Forensic Core

StreamHandlers is a dataclass with one optional callback for each stage of the LLM lifecycle. This is what makes Bond "forensic": nothing is hidden.

@dataclass
class StreamHandlers:
    # Lifecycle: Block open/close
    on_block_start: Callable[[str, int], None] | None = None
    on_block_end: Callable[[str, int], None] | None = None

    # Content: Incremental deltas
    on_text_delta: Callable[[str], None] | None = None
    on_thinking_delta: Callable[[str], None] | None = None
    on_tool_call_delta: Callable[[str, str], None] | None = None

    # Execution: Tool running/results
    on_tool_execute: Callable[[str, str, dict[str, Any]], None] | None = None
    on_tool_result: Callable[[str, str, str], None] | None = None

    # Lifecycle: Response complete
    on_complete: Callable[[Any], None] | None = None

Callback Categories

Lifecycle Events

| Callback | When | Arguments | Purpose |
| --- | --- | --- | --- |
| on_block_start | New content block begins | (kind: str, index: int) | UI can render block container |
| on_block_end | Content block finishes | (kind: str, index: int) | UI can close block container |
| on_complete | Entire response done | (data: Any) | Final cleanup, logging |

Content Events (Typing Effect)

| Callback | When | Arguments | Purpose |
| --- | --- | --- | --- |
| on_text_delta | Text token arrives | (text: str) | Show response as it forms |
| on_thinking_delta | Reasoning token arrives | (text: str) | Show chain-of-thought |
| on_tool_call_delta | Tool args forming | (name: str, args: str) | Show tool selection |

Execution Events

| Callback | When | Arguments | Purpose |
| --- | --- | --- | --- |
| on_tool_execute | Tool call starts | (id: str, name: str, args: dict) | Show "working" state |
| on_tool_result | Tool returns | (id: str, name: str, result: str) | Show tool output |

Event Flow

flowchart LR
    Start([Prompt]) --> B1[block_start]
    B1 --> T1[text_delta x N]
    T1 --> B1E[block_end]
    B1E --> B2[block_start]
    B2 --> TH[thinking_delta x N]
    TH --> B2E[block_end]
    B2E --> B3[block_start]
    B3 --> TC[tool_call_delta x N]
    TC --> B3E[block_end]
    B3E --> EX[tool_execute]
    EX --> RS[tool_result]
    RS --> Done([complete])
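
The flow above implies that every delta arrives between a block_start and its matching block_end. A UI can rely on that to group deltas into blocks. The following is a minimal sketch under that assumption; Block and BlockAssembler are hypothetical names, and the kind strings "text" and "thinking" are guesses, since the values Bond actually passes are not listed here.

```python
from dataclasses import dataclass, field


@dataclass
class Block:
    kind: str
    index: int
    parts: list[str] = field(default_factory=list)
    closed: bool = False

    @property
    def content(self) -> str:
        return "".join(self.parts)


class BlockAssembler:
    """Groups deltas under the block opened by the latest on_block_start."""

    def __init__(self) -> None:
        self.blocks: list[Block] = []
        self._open: Block | None = None

    def on_block_start(self, kind: str, index: int) -> None:
        self._open = Block(kind, index)
        self.blocks.append(self._open)

    def on_delta(self, text: str) -> None:
        # Usable for both on_text_delta and on_thinking_delta.
        if self._open is None:
            raise RuntimeError("delta received outside a block")
        self._open.parts.append(text)

    def on_block_end(self, kind: str, index: int) -> None:
        if self._open is None or self._open.index != index:
            raise RuntimeError(f"unexpected end for block {index}")
        self._open.closed = True
        self._open = None


# Simulated event sequence: one text block, then one thinking block.
assembler = BlockAssembler()
assembler.on_block_start("text", 0)
assembler.on_delta("Hel")
assembler.on_delta("lo")
assembler.on_block_end("text", 0)
assembler.on_block_start("thinking", 1)
assembler.on_delta("Let me think...")
assembler.on_block_end("thinking", 1)
```

The assembler's methods have the same signatures as the corresponding StreamHandlers fields, so they could be passed directly as on_block_start, on_text_delta, on_thinking_delta, and on_block_end.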

BondAgent

BondAgent is a generic class that wraps PydanticAI's Agent with full-spectrum streaming.

@dataclass
class BondAgent(Generic[T, DepsT]):
    name: str
    instructions: str
    model: str | Model
    toolsets: Sequence[Sequence[Tool[DepsT]]] = field(default_factory=list)
    deps: DepsT | None = None
    output_type: type[T] | Any = str
    max_retries: int = 3

Constructor Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | Agent identifier for logging |
| instructions | str | System prompt defining behavior |
| model | str \| Model | Model identifier (e.g., "anthropic:claude-sonnet-4-20250514") |
| toolsets | Sequence[Sequence[Tool]] | Groups of tools to provide |
| deps | DepsT \| None | Dependency injection for tools |
| output_type | type[T] | Expected output type (default: str) |
| max_retries | int | Retry count on failures (default: 3) |

Run Methods

# Async with streaming handlers
result = await agent.ask(prompt, handlers=handlers)

# Async without handlers
result = await agent.ask(prompt)

# Sync convenience method
result = agent.run_sync(prompt)

Dynamic Instructions

Override the system prompt for a single call:

# Uses base instructions
result = await agent.ask("Query")

# Uses custom instructions for this call only
result = await agent.ask(
    "Query",
    dynamic_instructions="You are a DBA. Focus on performance."
)

The agent's conversation history is preserved; only the system prompt changes for that call.

History Management

# Get current message history
history = agent.get_message_history()

# Set message history (e.g., from storage)
agent.set_message_history(history)

# Clear history for fresh conversation
agent.clear_history()

# Clone agent with shared history
specialist = agent.clone_with_history()

Handler Factories

Bond includes pre-built handler factories for common use cases.

create_websocket_handlers

For WebSocket connections (e.g., FastAPI WebSocket, Socket.io):

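from fastapi import WebSocket  # assuming FastAPI; other frameworks differ
from bond import create_websocket_handlers

# `agent` is a BondAgent created elsewhere.
async def websocket_handler(ws: WebSocket):
    await ws.accept()  # FastAPI requires accepting before sending
    handlers = create_websocket_handlers(ws.send_json)
    await agent.ask("Query", handlers=handlers)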

Sends JSON messages:

{"t": "text", "c": "Hello"}
{"t": "thinking", "c": "Let me think..."}
{"t": "tool_exec", "id": "abc", "name": "search", "args": {"q": "test"}}
{"t": "tool_result", "id": "abc", "name": "search", "result": "..."}
{"t": "complete", "data": "Done"}
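
On the receiving side, a client can fold these messages back into a single view of the response. The sketch below assumes only the five message shapes shown above; fold_messages is a hypothetical helper, not part of Bond, and any other message type is ignored because none is documented here.

```python
import json
from typing import Any


def fold_messages(lines: list[str]) -> dict[str, Any]:
    """Reduce a sequence of JSON messages into a summary of the response."""
    state: dict[str, Any] = {"text": "", "thinking": "", "tools": {}, "data": None}
    for line in lines:
        msg = json.loads(line)
        kind = msg["t"]
        if kind in ("text", "thinking"):
            # Deltas are concatenated in arrival order.
            state[kind] += msg["c"]
        elif kind == "tool_exec":
            state["tools"][msg["id"]] = {
                "name": msg["name"],
                "args": msg["args"],
                "result": None,
            }
        elif kind == "tool_result":
            # Tolerate a result whose tool_exec message was missed.
            entry = state["tools"].setdefault(
                msg["id"], {"name": msg["name"], "args": None, "result": None}
            )
            entry["result"] = msg["result"]
        elif kind == "complete":
            state["data"] = msg["data"]
    return state


messages = [
    '{"t": "thinking", "c": "Let me think..."}',
    '{"t": "tool_exec", "id": "abc", "name": "search", "args": {"q": "test"}}',
    '{"t": "tool_result", "id": "abc", "name": "search", "result": "3 hits"}',
    '{"t": "text", "c": "Hel"}',
    '{"t": "text", "c": "lo"}',
    '{"t": "complete", "data": "Done"}',
]
summary = fold_messages(messages)
```

Keying tool entries by id lets the client pair each tool_result with its tool_exec even when several tools run in one response.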

create_sse_handlers

For Server-Sent Events:

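import json

from aiohttp import web
from bond import create_sse_handlers

# Sketch assuming aiohttp; `agent` is a BondAgent created elsewhere.
async def sse_handler(request: web.Request) -> web.StreamResponse:
    response = web.StreamResponse(headers={"Content-Type": "text/event-stream"})
    await response.prepare(request)
    async def send_sse(event: str, data: dict):
        frame = f"event: {event}\ndata: {json.dumps(data)}\n\n"
        await response.write(frame.encode("utf-8"))  # aiohttp writes bytes

    handlers = create_sse_handlers(send_sse)
    await agent.ask("Query", handlers=handlers)
    return response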
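
The string that send_sse writes follows the Server-Sent Events wire format: an event line, a data line, and a blank line that ends the frame. A small standalone formatter makes that framing easier to test. format_sse is a hypothetical helper, and the event name "text" in the usage line is an assumption, since the names create_sse_handlers emits are not listed here.

```python
import json
from typing import Any


def format_sse(event: str, data: dict[str, Any]) -> str:
    """Serialize one event in the text/event-stream wire format."""
    if "\n" in event or "\r" in event:
        raise ValueError("event name must not contain line breaks")
    # json.dumps escapes newlines inside strings, so the payload always
    # fits on a single data line.
    payload = json.dumps(data)
    return f"event: {event}\ndata: {payload}\n\n"


frame = format_sse("text", {"c": "Hi\nthere"})
```

Encoding the payload as JSON sidesteps the SSE rule that a raw newline inside data would need its own `data:` line.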

create_print_handlers

For CLI apps and debugging:

from bond import create_print_handlers

handlers = create_print_handlers(
    show_thinking=True,   # Print reasoning
    show_tool_args=True,  # Print tool arguments
)

When to Use Each

| Handler | Use Case |
| --- | --- |
| create_websocket_handlers | Real-time web apps, chat UIs |
| create_sse_handlers | One-way streaming to browsers |
| create_print_handlers | CLI tools, debugging, testing |

Trace Persistence

Bond's trace system captures all StreamHandlers events during execution for later analysis and replay. This enables forensic debugging, auditing, and side-by-side run comparison.

Overview

flowchart LR
    subgraph Capture
        A[BondAgent.ask] --> B[StreamHandlers]
        B --> C[TraceEvent]
        C --> D[TraceStorage]
    end

    subgraph Replay
        D --> E[TraceReplayer]
        E --> F[step/seek/iterate]
    end

TraceEvent

Every callback is captured as a TraceEvent:

@dataclass(frozen=True)
class TraceEvent:
    trace_id: str          # Unique trace identifier
    sequence: int          # 0-indexed event order
    timestamp: float       # Monotonic clock (perf_counter)
    wall_time: datetime    # Wall clock time (UTC)
    event_type: str        # One of 8 event types
    payload: dict[str, Any]  # Event-specific data

Event types map directly to StreamHandlers callbacks:

| Event Type | Payload Keys | Description |
| --- | --- | --- |
| block_start | kind, index | New block started |
| block_end | kind, index | Block finished |
| text_delta | text | Incremental text |
| thinking_delta | text | Reasoning content |
| tool_call_delta | name, args | Tool call forming |
| tool_execute | id, name, args | Tool executing |
| tool_result | id, name, result | Tool returned |
| complete | data | Response finished |
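
To make the shape concrete, the sketch below builds one TraceEvent and round-trips it through a single JSON line. The dataclass is a stand-in copy of the definition above so the example runs without Bond. The encoding (ISO 8601 for wall_time, one JSON object per line) is an assumption for illustration, not necessarily the format Bond writes to disk.

```python
import json
import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class TraceEvent:  # stand-in copy of the documented dataclass
    trace_id: str
    sequence: int
    timestamp: float
    wall_time: datetime
    event_type: str
    payload: dict[str, Any]


def to_json_line(event: TraceEvent) -> str:
    """Encode an event as one line of JSON (assumed encoding)."""
    record = asdict(event)
    record["wall_time"] = event.wall_time.isoformat()  # datetime is not JSON-native
    return json.dumps(record)


def from_json_line(line: str) -> TraceEvent:
    """Decode a line produced by to_json_line."""
    record = json.loads(line)
    record["wall_time"] = datetime.fromisoformat(record["wall_time"])
    return TraceEvent(**record)


event = TraceEvent(
    trace_id="trace-1",
    sequence=0,
    timestamp=time.perf_counter(),          # monotonic, for ordering and durations
    wall_time=datetime.now(timezone.utc),   # wall clock, for display
    event_type="text_delta",
    payload={"text": "Hello"},
)
restored = from_json_line(to_json_line(event))
```

Keeping both clocks is useful: timestamp differences give reliable durations within a run, while wall_time places the run on a calendar.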

Storage Protocol

Any storage backend must implement TraceStorageProtocol:

@runtime_checkable
class TraceStorageProtocol(Protocol):
    async def save_event(self, event: TraceEvent) -> None: ...
    async def finalize_trace(self, trace_id: str, status: str = "complete") -> None: ...
    def load_trace(self, trace_id: str) -> AsyncIterator[TraceEvent]: ...
    async def list_traces(self, limit: int = 100) -> list[TraceMeta]: ...
    async def get_trace_meta(self, trace_id: str) -> TraceMeta | None: ...
    async def delete_trace(self, trace_id: str) -> None: ...

Bond includes JSONFileTraceStore for file-based storage:

from bond import JSONFileTraceStore

store = JSONFileTraceStore(".bond/traces")
# Creates: .bond/traces/{trace_id}.json (events, newline-delimited)
# Creates: .bond/traces/{trace_id}.meta.json (metadata)

Capture Workflow

Use create_capture_handlers to record executions:

from bond import (
    BondAgent,
    JSONFileTraceStore,
    create_capture_handlers,
    finalize_capture,
)

# Setup
store = JSONFileTraceStore(".bond/traces")
handlers, trace_id = create_capture_handlers(store)

# Execute with capture
result = await agent.ask("What is the weather?", handlers=handlers)

# Finalize (marks trace complete)
await finalize_capture(store, trace_id)

The handlers record all 8 callback types as TraceEvent objects. Events are saved asynchronously to avoid blocking the main execution flow.
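
One way to keep synchronous callbacks from waiting on storage is to enqueue each record and let a background task persist it. The sketch below shows that general pattern only; it is not a description of how create_capture_handlers is implemented, and QueuedRecorder, Record, and the save callable are all hypothetical names.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable

# (sequence, monotonic timestamp, event_type, payload)
Record = tuple[int, float, str, dict[str, Any]]


class QueuedRecorder:
    """Sync callbacks enqueue records; a background task persists them."""

    def __init__(self, save: Callable[[Record], Awaitable[None]]) -> None:
        self._save = save
        self._queue: asyncio.Queue[Record | None] = asyncio.Queue()
        self._sequence = 0

    def record(self, event_type: str, payload: dict[str, Any]) -> None:
        # put_nowait never blocks on an unbounded queue, so callbacks stay cheap.
        self._queue.put_nowait((self._sequence, time.perf_counter(), event_type, payload))
        self._sequence += 1

    async def run(self) -> None:
        # Drain until the None sentinel arrives, saving records in order.
        while (item := await self._queue.get()) is not None:
            await self._save(item)

    def close(self) -> None:
        self._queue.put_nowait(None)


async def demo() -> list[Record]:
    saved: list[Record] = []

    async def save(record: Record) -> None:
        await asyncio.sleep(0)  # stands in for disk or network I/O
        saved.append(record)

    recorder = QueuedRecorder(save)
    writer = asyncio.create_task(recorder.run())
    recorder.record("text_delta", {"text": "Hel"})
    recorder.record("text_delta", {"text": "lo"})
    recorder.close()
    await writer  # wait for the queue to drain before reading results
    return saved


saved_records = asyncio.run(demo())
```

Waiting for the writer task before finalizing plays the same role as the finalize step above: it ensures no queued event is lost when the run ends.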

Replay

TraceReplayer provides debugger-like navigation:

from bond import TraceReplayer

replayer = TraceReplayer(store, trace_id)

# Iterate through all events
async for event in replayer:
    print(f"{event.event_type}: {event.payload}")

# Or step through manually
await replayer.reset()
event = await replayer.step()      # Move forward
event = await replayer.step_back() # Move backward
event = await replayer.seek(5)     # Jump to position 5
event = await replayer.current()   # Get current without moving

# Check position
print(f"Position: {replayer.position}/{replayer.total_events}")

Use Cases

| Use Case | How |
| --- | --- |
| Audit agent behavior | Replay production traces to see exactly what happened |
| Debug failures | Step through failed runs to find the problematic event |
| Compare runs | Replay two traces side-by-side to find differences |
| Testing | Capture expected traces and compare against future runs |

Tool Architecture

Bond uses a protocol-based pattern for tool backends.

Protocol Pattern

from typing import Protocol, runtime_checkable
from uuid import UUID

@runtime_checkable
class MemoryBackend(Protocol):
    """Protocol for memory storage backends."""

    async def store(self, tenant_id: UUID, memory: Memory) -> Memory:
        """Store a memory."""
        ...

    async def search(self, tenant_id: UUID, query: str, limit: int = 10) -> list[Memory]:
        """Search memories by semantic similarity."""
        ...

Backend Implementations

# Qdrant backend
class QdrantMemoryStore:
    def __init__(self, collection: str):
        self.collection = collection

    async def store(self, tenant_id: UUID, memory: Memory) -> Memory:
        # Qdrant-specific implementation
        ...

# PostgreSQL + pgvector backend
class PgVectorMemoryStore:
    def __init__(self, connection_string: str):
        self.conn = connection_string

    async def store(self, tenant_id: UUID, memory: Memory) -> Memory:
        # pgvector-specific implementation
        ...
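
Because the protocol is structural, a backend needs no base class; it only has to provide the same methods. The sketch below shows an in-memory backend suitable for tests. It is illustrative only: the Memory dataclass is a hypothetical stand-in for Bond's type, and search uses case-insensitive substring matching rather than semantic similarity.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol, runtime_checkable
from uuid import UUID, uuid4


@dataclass
class Memory:  # hypothetical shape; Bond's Memory type may differ
    content: str


@runtime_checkable
class MemoryBackend(Protocol):
    async def store(self, tenant_id: UUID, memory: Memory) -> Memory: ...
    async def search(self, tenant_id: UUID, query: str, limit: int = 10) -> list[Memory]: ...


class InMemoryMemoryStore:
    """Keeps memories in a dict keyed by tenant, so tenants stay isolated."""

    def __init__(self) -> None:
        self._by_tenant: dict[UUID, list[Memory]] = {}

    async def store(self, tenant_id: UUID, memory: Memory) -> Memory:
        self._by_tenant.setdefault(tenant_id, []).append(memory)
        return memory

    async def search(self, tenant_id: UUID, query: str, limit: int = 10) -> list[Memory]:
        # Substring match, not semantic similarity: good enough for tests.
        needle = query.lower()
        matches = [m for m in self._by_tenant.get(tenant_id, []) if needle in m.content.lower()]
        return matches[:limit]


async def demo() -> list[str]:
    backend = InMemoryMemoryStore()
    tenant = uuid4()
    await backend.store(tenant, Memory("User prefers dark mode"))
    await backend.store(tenant, Memory("User lives in Oslo"))
    return [m.content for m in await backend.search(tenant, "oslo")]


# runtime_checkable only verifies that the methods exist, not their signatures.
is_backend = isinstance(InMemoryMemoryStore(), MemoryBackend)
found = asyncio.run(demo())
```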

Toolset Pattern

Tools are grouped into toolsets (lists of Tool objects):

from pydantic_ai import RunContext, Tool

def create_memory_toolset(backend: MemoryBackend) -> list[Tool[MemoryBackend]]:
    """Create memory tools bound to a backend."""

    # Tools reach the backend through ctx.deps at call time. They are
    # async so their bodies can await the backend's async methods.
    async def store_memory(ctx: RunContext[MemoryBackend], content: str) -> str:
        """Store a memory for later recall."""
        ...

    async def search_memories(ctx: RunContext[MemoryBackend], query: str) -> str:
        """Search stored memories."""
        ...

    return [Tool(store_memory), Tool(search_memories)]

Using Toolsets

from bond.tools.memory import create_memory_toolset, QdrantMemoryStore

store = QdrantMemoryStore(collection="agent-memory")
memory_tools = create_memory_toolset(store)

agent = BondAgent(
    name="memory-agent",
    instructions="You remember things.",
    model="anthropic:claude-sonnet-4-20250514",
    toolsets=[memory_tools],
    deps=store,
)

PydanticAI Integration

Bond wraps PydanticAI to provide type-safe agent interactions.

Why PydanticAI?

  • Schema Validation: Tool inputs/outputs validated against Pydantic models
  • Type Safety: Full IDE support with generics
  • Structured Output: Enforce response schemas
  • Retry Logic: Built-in retry with exponential backoff

How Bond Extends It

| PydanticAI | Bond Addition |
| --- | --- |
| Agent.run_stream() | Full event routing to StreamHandlers |
| Single system prompt | dynamic_instructions per-call |
| Manual history | get/set/clear/clone_with_history |
| Individual tools | Toolset composition pattern |

RunContext in Tools

Tools receive a RunContext with dependency access:

from pydantic_ai.tools import RunContext

def my_tool(ctx: RunContext[MyDeps], arg: str) -> str:
    # Access injected dependencies
    result = ctx.deps.do_something(arg)
    return result

File Reference

| Component | Location |
| --- | --- |
| StreamHandlers | src/bond/agent.py:28-73 |
| BondAgent | src/bond/agent.py:75-299 |
| Handler Factories | src/bond/utils.py:20-199 |
| Memory Toolset | src/bond/tools/memory/ |
| Schema Toolset | src/bond/tools/schema/ |