Skip to content

Trace Module

The trace module provides forensic capture and replay capabilities for Bond agent executions. Record all StreamHandlers events during runs and replay them later for debugging, auditing, and analysis.

Quick Start

from bond import (
    BondAgent,
    JSONFileTraceStore,
    create_capture_handlers,
    finalize_capture,
    TraceReplayer,
)

# Capture during execution
store = JSONFileTraceStore(".bond/traces")
handlers, trace_id = create_capture_handlers(store)
result = await agent.ask("What is the weather?", handlers=handlers)
await finalize_capture(store, trace_id)

# Replay later
replayer = TraceReplayer(store, trace_id)
async for event in replayer:
    print(f"{event.event_type}: {event.payload}")

Event Types

All 8 StreamHandlers callbacks are captured:

Event Type Payload Keys Description
block_start kind, index New block started
block_end kind, index Block finished
text_delta text Incremental text
thinking_delta text Reasoning content
tool_call_delta name, args Tool call forming
tool_execute id, name, args Tool executing
tool_result id, name, result Tool returned
complete data Response finished

TraceEvent

Bases: BaseModel

A single event in an execution trace.

Captures one callback from StreamHandlers with full context for later replay or analysis.

Attributes:

  • trace_id (str) –

    UUID identifying this trace session.

  • sequence (int) –

    Zero-indexed order within the trace.

  • timestamp (float) –

    Monotonic clock value for relative ordering.

  • wall_time (datetime) –

    Human-readable UTC timestamp.

  • event_type (str) –

    One of the 8 callback types.

  • payload (dict[str, Any]) –

    Event-specific data (varies by event_type).


TraceMeta

Bases: BaseModel

Metadata about a stored trace.

Provides summary information without loading all events.

Attributes:

  • trace_id (str) –

    UUID identifying this trace.

  • created_at (datetime) –

    When the trace was started.

  • event_count (int) –

    Number of events in the trace.

  • status (str) –

    One of "in_progress", "complete", "failed".


TraceStorageProtocol

Bases: Protocol

Protocol for trace storage backends.

Provides async methods for saving, loading, and managing execution traces. All operations are async to support various backend implementations (file, database, remote storage).

Implementations
  • JSONFileTraceStore: Local JSON file storage (default)

Methods:

delete_trace(trace_id) async

Delete a trace and all its events.

Parameters:

  • trace_id (str) –

    The trace to delete.

Raises:

  • KeyError

    If trace_id doesn't exist.

  • IOError

    If deletion fails.

Source code in src/bond/trace/_protocols.py
async def delete_trace(self, trace_id: str) -> None:
    """Delete a trace and all its events.

    Args:
        trace_id: The trace to delete.

    Raises:
        KeyError: If trace_id doesn't exist.
        IOError: If deletion fails.
    """
    ...

finalize_trace(trace_id, status='complete') async

Mark a trace as complete or failed.

Should be called when the agent run finishes. This updates the trace metadata and may trigger cleanup or indexing.

Parameters:

  • trace_id (str) –

    The trace to finalize.

  • status (str, default: 'complete' ) –

    Final status ("complete" or "failed").

Raises:

  • KeyError

    If trace_id doesn't exist.

  • IOError

    If storage fails.

Source code in src/bond/trace/_protocols.py
async def finalize_trace(
    self,
    trace_id: str,
    status: str = "complete",
) -> None:
    """Mark a trace as complete or failed.

    Should be called when the agent run finishes. This updates
    the trace metadata and may trigger cleanup or indexing.

    Args:
        trace_id: The trace to finalize.
        status: Final status ("complete" or "failed").

    Raises:
        KeyError: If trace_id doesn't exist.
        IOError: If storage fails.
    """
    ...

list_traces(limit=100) async

List available traces with metadata.

Returns traces ordered by creation time (newest first).

Parameters:

  • limit (int, default: 100 ) –

    Maximum number of traces to return.

Returns:

  • list[TraceMeta]

    List of TraceMeta for available traces.

Raises:

  • IOError

    If listing fails.

Source code in src/bond/trace/_protocols.py
async def list_traces(self, limit: int = 100) -> list[TraceMeta]:
    """List available traces with metadata.

    Returns traces ordered by creation time (newest first).

    Args:
        limit: Maximum number of traces to return.

    Returns:
        List of TraceMeta for available traces.

    Raises:
        IOError: If listing fails.
    """
    ...

load_trace(trace_id)

Load all events from a trace for replay.

Yields events in sequence order. Memory-efficient for large traces. This is an async generator method.

Parameters:

  • trace_id (str) –

    The trace to load.

Yields:

  • AsyncIterator[TraceEvent]

    TraceEvent objects in sequence order.

Raises:

  • KeyError

    If trace_id doesn't exist.

  • IOError

    If loading fails.

Source code in src/bond/trace/_protocols.py
def load_trace(self, trace_id: str) -> AsyncIterator[TraceEvent]:
    """Load all events from a trace for replay.

    Yields events in sequence order. Memory-efficient for large traces.
    This is an async generator method.

    Args:
        trace_id: The trace to load.

    Yields:
        TraceEvent objects in sequence order.

    Raises:
        KeyError: If trace_id doesn't exist.
        IOError: If loading fails.
    """
    ...

save_event(event) async

Append an event to a trace.

The trace is created if it doesn't exist. Events should be saved in order (by sequence number).

Parameters:

Raises:

  • IOError

    If storage fails.

Source code in src/bond/trace/_protocols.py
async def save_event(self, event: TraceEvent) -> None:
    """Append an event to a trace.

    The trace is created if it doesn't exist. Events should be
    saved in order (by sequence number).

    Args:
        event: The trace event to save.

    Raises:
        IOError: If storage fails.
    """
    ...

JSONFileTraceStore

Store traces as JSON files in a directory.

Each trace consists of two files
  • {trace_id}.json: Newline-delimited JSON events
  • {trace_id}.meta.json: TraceMeta as JSON
File Structure

{base_path}/ ├── {trace_id}.json # Events (one JSON object per line) └── {trace_id}.meta.json # TraceMeta

Example
store = JSONFileTraceStore(".bond/traces")
await store.save_event(event)
await store.finalize_trace(trace_id)

async for event in store.load_trace(trace_id):
    print(event)

Initialize JSON file store.

Parameters:

  • base_path (Path | str, default: '.bond/traces' ) –

    Directory for trace files. Created if doesn't exist.

Methods:

delete_trace(trace_id) async

Delete a trace and all its files.

Removes both the events file and metadata file.

Parameters:

  • trace_id (str) –

    The trace to delete.

Raises:

  • KeyError

    If trace_id doesn't exist.

  • IOError

    If deletion fails.

finalize_trace(trace_id, status=STATUS_COMPLETE) async

Mark a trace as complete or failed.

Updates the metadata file with the final status.

Parameters:

  • trace_id (str) –

    The trace to finalize.

  • status (str, default: STATUS_COMPLETE ) –

    Final status ("complete" or "failed").

Raises:

  • KeyError

    If trace_id doesn't exist.

  • IOError

    If writing fails.

get_trace_meta(trace_id) async

Get metadata for a specific trace.

Parameters:

  • trace_id (str) –

    The trace to get metadata for.

Returns:

  • TraceMeta | None

    TraceMeta if found, None otherwise.

list_traces(limit=100) async

List available traces with metadata.

Returns traces ordered by creation time (newest first).

Parameters:

  • limit (int, default: 100 ) –

    Maximum number of traces to return.

Returns:

  • list[TraceMeta]

    List of TraceMeta for available traces.

Raises:

  • IOError

    If listing fails.

load_trace(trace_id) async

Load all events from a trace for replay.

Yields events in sequence order. Memory-efficient for large traces since it streams line by line.

Parameters:

  • trace_id (str) –

    The trace to load.

Yields:

  • AsyncIterator[TraceEvent]

    TraceEvent objects in sequence order.

Raises:

  • KeyError

    If trace_id doesn't exist.

  • IOError

    If reading fails.

save_event(event) async

Append event to trace file.

Creates or updates the metadata file to track event count. Uses newline-delimited JSON for efficient streaming reads.

Parameters:

Raises:

  • IOError

    If writing fails.


create_capture_handlers

Create handlers that capture all events to storage.

Returns handlers that can be passed to agent.ask() along with the trace ID for later replay. All 8 StreamHandlers callbacks are wired to record events with sequence numbers for ordering.

Parameters:

  • storage (TraceStorageProtocol) –

    Backend to store events (e.g., JSONFileTraceStore).

  • trace_id (str | None, default: None ) –

    Optional trace ID (auto-generated UUID if None).

Returns:

  • StreamHandlers

    Tuple of (handlers, trace_id) - use handlers with agent.ask(),

  • str

    keep trace_id for later replay.

Example
store = JSONFileTraceStore()
handlers, trace_id = create_capture_handlers(store)
result = await agent.ask("query", handlers=handlers)
await finalize_capture(store, trace_id)

# Later: replay with trace_id
async for event in store.load_trace(trace_id):
    print(event)

finalize_capture

Mark a trace as complete after agent.ask() returns.

Should be called after the agent run finishes to update the trace metadata with the final status.

Parameters:

  • storage (TraceStorageProtocol) –

    The storage backend used for capture.

  • trace_id (str) –

    The trace ID from create_capture_handlers().

  • status (str, default: STATUS_COMPLETE ) –

    Final status ("complete" or "failed").

Example
handlers, trace_id = create_capture_handlers(store)
try:
    result = await agent.ask("query", handlers=handlers)
    await finalize_capture(store, trace_id, "complete")
except Exception:
    await finalize_capture(store, trace_id, "failed")
    raise

TraceReplayer

Replay a stored trace event by event.

Supports both async iteration and manual stepping through events. Events are loaded on-demand and cached for stepping operations.

Example (async iteration): replayer = TraceReplayer(storage, trace_id) async for event in replayer: print(f"{event.event_type}: {event.payload}")

Example (manual stepping): replayer = TraceReplayer(storage, trace_id) while event := await replayer.step(): print(event) await asyncio.sleep(event.timestamp) # Replay at original timing

Example (seeking): replayer = TraceReplayer(storage, trace_id) await replayer.seek(10) # Jump to event 10 event = await replayer.step()

Initialize replayer for a trace.

Parameters:

  • storage (TraceStorageProtocol) –

    Backend containing the trace.

  • trace_id (str) –

    ID of the trace to replay.

Methods:

  • __aiter__

    Iterate through all events.

  • current

    Get the current event without advancing position.

  • reset

    Reset to the beginning of the trace.

  • seek

    Jump to a specific position in the trace.

  • step

    Get the next event in the trace.

  • step_back

    Go back one event.

Attributes:

  • position (int) –

    Current position in trace (0-indexed).

  • total_events (int | None) –

    Total number of events in trace.

Source code in src/bond/trace/replay.py
def __init__(self, storage: TraceStorageProtocol, trace_id: str) -> None:
    """Initialize replayer for a trace.

    Args:
        storage: Backend containing the trace.
        trace_id: ID of the trace to replay.
    """
    self.storage = storage
    self.trace_id = trace_id
    self._events: list[TraceEvent] | None = None
    self._position: int = 0

position property

Current position in trace (0-indexed).

Returns:

  • int

    The current event index.

total_events property

Total number of events in trace.

Returns:

  • int | None

    Event count if loaded, None if not yet loaded.

__aiter__() async

Iterate through all events.

Streams directly from storage without loading all events into memory first.

Yields:

  • AsyncIterator[TraceEvent]

    TraceEvent objects in sequence order.

Source code in src/bond/trace/replay.py
async def __aiter__(self) -> AsyncIterator[TraceEvent]:
    """Iterate through all events.

    Streams directly from storage without loading all events
    into memory first.

    Yields:
        TraceEvent objects in sequence order.
    """
    async for event in self.storage.load_trace(self.trace_id):
        yield event

current() async

Get the current event without advancing position.

Returns:

  • TraceEvent | None

    The current TraceEvent, or None if at end.

Source code in src/bond/trace/replay.py
async def current(self) -> TraceEvent | None:
    """Get the current event without advancing position.

    Returns:
        The current TraceEvent, or None if at end.
    """
    await self._load()
    assert self._events is not None
    if self._position < len(self._events):
        return self._events[self._position]
    return None

reset() async

Reset to the beginning of the trace.

Source code in src/bond/trace/replay.py
async def reset(self) -> None:
    """Reset to the beginning of the trace."""
    self._position = 0

seek(position) async

Jump to a specific position in the trace.

Parameters:

  • position (int) –

    The event index to seek to (0-indexed).

Returns:

  • TraceEvent | None

    The event at that position, or None if position is at/past end.

Source code in src/bond/trace/replay.py
async def seek(self, position: int) -> TraceEvent | None:
    """Jump to a specific position in the trace.

    Args:
        position: The event index to seek to (0-indexed).

    Returns:
        The event at that position, or None if position is at/past end.
    """
    await self._load()
    assert self._events is not None
    self._position = max(0, min(position, len(self._events)))
    if self._position < len(self._events):
        return self._events[self._position]
    return None

step() async

Get the next event in the trace.

Returns:

  • TraceEvent | None

    The next TraceEvent, or None if at end of trace.

Source code in src/bond/trace/replay.py
async def step(self) -> TraceEvent | None:
    """Get the next event in the trace.

    Returns:
        The next TraceEvent, or None if at end of trace.
    """
    await self._load()
    assert self._events is not None
    if self._position >= len(self._events):
        return None
    event = self._events[self._position]
    self._position += 1
    return event

step_back() async

Go back one event.

Returns:

  • TraceEvent | None

    The previous TraceEvent, or None if at start of trace.

Source code in src/bond/trace/replay.py
async def step_back(self) -> TraceEvent | None:
    """Go back one event.

    Returns:
        The previous TraceEvent, or None if at start of trace.
    """
    await self._load()
    assert self._events is not None
    if self._position <= 0:
        return None
    self._position -= 1
    return self._events[self._position]