Agent Module

The core agent runtime with full-spectrum streaming.

BondAgent

Bases: Generic[T, DepsT]

Generic agent runtime wrapping PydanticAI with full-spectrum streaming.

A BondAgent provides:

  • High-fidelity streaming with callbacks for every lifecycle event
  • Block start/end notifications for UI rendering
  • Real-time streaming of text, thinking, and tool arguments
  • Tool execution and result callbacks
  • Message history management
  • Dynamic instruction override
  • Toolset composition
  • Retry handling

Example
agent = BondAgent(
    name="assistant",
    instructions="You are helpful.",
    model="anthropic:claude-sonnet-4-20250514",
    toolsets=[memory_toolset],
    deps=QdrantMemoryStore(),
)

handlers = StreamHandlers(
    on_text_delta=lambda t: print(t, end=""),
    on_tool_execute=lambda call_id, name, args: print(f"[Running {name}]"),
)

response = await agent.ask("Remember my preference", handlers=handlers)

Methods:

ask(prompt, *, handlers=None, dynamic_instructions=None) async

Send prompt and get response with high-fidelity streaming.

Parameters:

  • prompt (str) –

    The user's message/question.

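  • handlers (StreamHandlers | None, default: None) –

    Optional callbacks for streaming events. When omitted, the call takes the non-streaming path and no callbacks fire.

  • dynamic_instructions (str | None, default: None) –

    Override system prompt for this call only. The override applies only when the value is non-empty and differs from the agent's configured instructions.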

Returns:

  • T

    The agent's response of type T.

Source code in src/bond/agent.py
async def ask(
    self,
    prompt: str,
    *,
    handlers: StreamHandlers | None = None,
    dynamic_instructions: str | None = None,
) -> T:
    """Send prompt and get response with high-fidelity streaming.

    Args:
        prompt: The user's message/question.
        handlers: Optional callbacks for streaming events.
        dynamic_instructions: Override system prompt for this call only.

    Returns:
        The agent's response of type T.
    """
    if self._agent is None:
        raise RuntimeError("Agent not initialized")

    active_agent = self._agent
    if dynamic_instructions and dynamic_instructions != self.instructions:
        dynamic_kwargs: dict[str, Any] = {
            "model": self.model,
            "system_prompt": dynamic_instructions,
            "tools": self._tools,
            "output_type": self.output_type,
            "retries": self.max_retries,
        }
        if self.deps is not None:
            dynamic_kwargs["deps_type"] = type(self.deps)
        active_agent = Agent(**dynamic_kwargs)

    if handlers:
        # Build run_stream kwargs - only include deps if provided
        stream_kwargs: dict[str, Any] = {"message_history": self._history}
        if self.deps is not None:
            stream_kwargs["deps"] = self.deps

        async with active_agent.run_stream(prompt, **stream_kwargs) as result:
            # PydanticAI runs tools internally before streaming the final response.
            # We need to emit tool events from the message history first.

            from pydantic_ai.messages import (
                ModelRequest as MsgModelRequest,
                ModelResponse as MsgModelResponse,
                ToolCallPart,
                ToolReturnPart,
            )

            block_index = 0
            tool_id_to_block: dict[str, int] = {}

            # Wait for streaming to be ready, then check for tool calls in history
            # The new_messages() contains messages from THIS run
            # We need to consume at least one chunk to populate messages
            first_chunk = None
            async for chunk in result.stream_text():
                first_chunk = chunk
                break

            # Now check new_messages for tool calls that happened
            for msg in result.new_messages():
                if isinstance(msg, MsgModelResponse):
                    for part in msg.parts:
                        if isinstance(part, ToolCallPart):
                            # Emit tool call block
                            if handlers.on_block_start:
                                handlers.on_block_start("tool-call", block_index)
                            tool_id_to_block[part.tool_call_id] = block_index

                            # Emit tool name/args
                            if handlers.on_tool_call_delta:
                                handlers.on_tool_call_delta(part.tool_name, "")
                                if isinstance(part.args, str):
                                    args_str = part.args
                                else:
                                    args_str = json.dumps(part.args)
                                handlers.on_tool_call_delta("", args_str)

                            # Emit execute event
                            if handlers.on_tool_execute:
                                if isinstance(part.args, str):
                                    args_dict = json.loads(part.args)
                                else:
                                    args_dict = dict(part.args) if part.args else {}
                                handlers.on_tool_execute(
                                    part.tool_call_id, part.tool_name, args_dict
                                )

                            block_index += 1

                elif isinstance(msg, MsgModelRequest):
                    for req_part in msg.parts:
                        if isinstance(req_part, ToolReturnPart):
                            # Emit tool result
                            if handlers.on_tool_result:
                                if isinstance(req_part.content, str):
                                    result_str = req_part.content
                                else:
                                    result_str = str(req_part.content)
                                handlers.on_tool_result(
                                    req_part.tool_call_id,
                                    req_part.tool_name,
                                    result_str,
                                )

                            # Close the tool block
                            tool_block = tool_id_to_block.get(req_part.tool_call_id)
                            if tool_block is not None and handlers.on_block_end:
                                handlers.on_block_end("tool-call", tool_block)

            # Now stream the text response
            text_block = block_index
            text_started = False

            if first_chunk:
                if handlers.on_block_start:
                    handlers.on_block_start("text", text_block)
                text_started = True
                if handlers.on_text_delta:
                    handlers.on_text_delta(first_chunk)

            async for chunk in result.stream_text():
                if not text_started:
                    if handlers.on_block_start:
                        handlers.on_block_start("text", text_block)
                    text_started = True
                if handlers.on_text_delta:
                    handlers.on_text_delta(chunk)

            if text_started and handlers.on_block_end:
                handlers.on_block_end("text", text_block)

            # Stream finished
            self._history = list(result.all_messages())

            # Get output - use get_output() which is the awaitable method
            output: T = await result.get_output()

            if handlers.on_complete:
                handlers.on_complete(output)

            return output

    # Non-streaming fallback - build kwargs similarly
    run_kwargs: dict[str, Any] = {"message_history": self._history}
    if self.deps is not None:
        run_kwargs["deps"] = self.deps

    run_result = await active_agent.run(prompt, **run_kwargs)
    self._history = list(run_result.all_messages())
    result_output: T = run_result.output
    return result_output
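
The tool-call branch in the listing above converts `part.args` twice: once to a JSON string for `on_tool_call_delta` and once to a dict for `on_tool_execute`. The conversion can be sketched as a standalone helper. `normalize_tool_args` is a hypothetical name used here for illustration and is not part of bond or PydanticAI. Unlike the listing, this sketch treats an empty string or `None` as "no arguments" instead of passing it to `json.loads` or `json.dumps`.

```python
from __future__ import annotations

import json
from typing import Any


def normalize_tool_args(args: str | dict[str, Any] | None) -> tuple[str, dict[str, Any]]:
    """Return (json_string, dict) for tool arguments given as str, dict, or None.

    Hypothetical helper for illustration; not part of bond or PydanticAI.
    """
    if isinstance(args, str):
        # String arguments are assumed to hold a JSON object, or to be empty.
        args_dict = json.loads(args) if args.strip() else {}
        return args, args_dict
    # Dict (or None) arguments: copy defensively, then serialize.
    args_dict = dict(args) if args else {}
    return json.dumps(args_dict), args_dict
```

Both handler calls could then share one result, for example `args_str, args_dict = normalize_tool_args(part.args)`.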

get_message_history()

Get current conversation history.

Source code in src/bond/agent.py
def get_message_history(self) -> list[ModelMessage]:
    """Get current conversation history."""
    return list(self._history)

set_message_history(history)

Replace conversation history.

Source code in src/bond/agent.py
def set_message_history(self, history: list[ModelMessage]) -> None:
    """Replace conversation history."""
    self._history = list(history)

clear_history()

Clear conversation history.

Source code in src/bond/agent.py
def clear_history(self) -> None:
    """Clear conversation history."""
    self._history = []

clone_with_history(history)

Create new agent instance with given history (for branching).

Parameters:

  • history (list[ModelMessage]) –

    The message history to use for the clone.

Returns:

  • BondAgent[T, DepsT]

    A new BondAgent with the same configuration but different history.

Source code in src/bond/agent.py
def clone_with_history(self, history: list[ModelMessage]) -> "BondAgent[T, DepsT]":
    """Create new agent instance with given history (for branching).

    Args:
        history: The message history to use for the clone.

    Returns:
        A new BondAgent with the same configuration but different history.
    """
    clone: BondAgent[T, DepsT] = BondAgent(
        name=self.name,
        instructions=self.instructions,
        model=self.model,
        toolsets=list(self.toolsets),
        deps=self.deps,
        output_type=self.output_type,
        max_retries=self.max_retries,
    )
    clone.set_message_history(history)
    return clone
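
The history methods above copy the list on the way in and on the way out, which is what makes branching safe. The following is a minimal sketch of that behavior using a stand-in class: `HistoryHolder` is hypothetical, it stores plain strings instead of `ModelMessage` objects, and it omits everything about BondAgent except the history handling.

```python
from __future__ import annotations


class HistoryHolder:
    """Stand-in for the history-related parts of BondAgent (hypothetical)."""

    def __init__(self) -> None:
        self._history: list[str] = []

    def get_message_history(self) -> list[str]:
        # Return a copy so callers cannot mutate internal state.
        return list(self._history)

    def set_message_history(self, history: list[str]) -> None:
        # Store a copy so later edits to the caller's list do not leak in.
        self._history = list(history)

    def clone_with_history(self, history: list[str]) -> "HistoryHolder":
        clone = HistoryHolder()
        clone.set_message_history(history)
        return clone


main = HistoryHolder()
main.set_message_history(["user: hi", "assistant: hello"])

# Branch from a snapshot of the main conversation.
snapshot = main.get_message_history()
branch = main.clone_with_history(snapshot)

# Diverge: each holder records a different follow-up.
main.set_message_history(main.get_message_history() + ["user: plan A"])
branch.set_message_history(branch.get_message_history() + ["user: plan B"])

# Mutating the snapshot afterwards affects neither holder.
snapshot.append("stray edit")
```

Note that in the listing the clone receives the same `deps` object as the original, so only the history is independent between branches; stateful dependencies are shared.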

StreamHandlers

Callbacks mapping to every stage of the LLM lifecycle.

These callbacks give a UI enough information to reconstruct the agent's output block by block: text, thinking, and tool calls with their results.

Lifecycle Events

  • on_block_start: A new block (Text, Thinking, or Tool Call) has started.
  • on_block_end: A block has finished generating.
  • on_complete: The entire response is finished.

Content Events (Typing Effect)

  • on_text_delta: Incremental text content.
  • on_thinking_delta: Incremental thinking/reasoning content.
  • on_tool_call_delta: Incremental tool name and arguments as they form.

Execution Events

  • on_tool_execute: Tool call is fully formed and NOW executing.
  • on_tool_result: Tool has finished and returned data.

Example
handlers = StreamHandlers(
    on_block_start=lambda kind, idx: print(f"[Start {kind} #{idx}]"),
    on_text_delta=lambda txt: print(txt, end=""),
    on_tool_execute=lambda call_id, name, args: print(f"[Running {name}...]"),
    on_tool_result=lambda call_id, name, res: print(f"[Result: {res}]"),
    on_complete=lambda data: print(f"[Done: {data}]"),
)
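
To illustrate how a UI might rebuild blocks from these callbacks, here is a minimal sketch of a recorder whose methods use the callback signatures shown on this page. `BlockRecorder` and its fields are hypothetical and not part of bond. The events at the bottom are replayed by hand, in the order `ask()` emits them for one tool call followed by a text response, so the sketch runs without a model.

```python
from __future__ import annotations

from typing import Any


class BlockRecorder:
    """Collects streaming events into per-block records (hypothetical helper)."""

    def __init__(self) -> None:
        self.blocks: dict[int, dict[str, Any]] = {}
        self.call_to_block: dict[str, int] = {}
        self.current: int | None = None
        self.final: Any = None

    def on_block_start(self, kind: str, index: int) -> None:
        self.blocks[index] = {
            "kind": kind, "tool_name": "", "content": "", "result": None, "closed": False,
        }
        self.current = index

    def on_text_delta(self, text: str) -> None:
        self.blocks[self.current]["content"] += text

    def on_tool_call_delta(self, name: str, args: str) -> None:
        # The tool name and its arguments arrive in separate calls.
        self.blocks[self.current]["tool_name"] += name
        self.blocks[self.current]["content"] += args

    def on_tool_execute(self, call_id: str, name: str, args: dict[str, Any]) -> None:
        # Remember which block this call belongs to; results may arrive later.
        self.call_to_block[call_id] = self.current

    def on_tool_result(self, call_id: str, name: str, result: str) -> None:
        self.blocks[self.call_to_block[call_id]]["result"] = result

    def on_block_end(self, kind: str, index: int) -> None:
        self.blocks[index]["closed"] = True

    def on_complete(self, data: Any) -> None:
        self.final = data


# Replay one tool call followed by a text response.
rec = BlockRecorder()
rec.on_block_start("tool-call", 0)
rec.on_tool_call_delta("search", "")
rec.on_tool_call_delta("", '{"q": "tea"}')
rec.on_tool_execute("call-1", "search", {"q": "tea"})
rec.on_tool_result("call-1", "search", "3 hits")
rec.on_block_end("tool-call", 0)
rec.on_block_start("text", 1)
rec.on_text_delta("Found ")
rec.on_text_delta("3 results.")
rec.on_block_end("text", 1)
rec.on_complete("Found 3 results.")
```

With a real agent, the bound methods would presumably be passed as the corresponding keyword arguments, for example `StreamHandlers(on_block_start=rec.on_block_start, on_text_delta=rec.on_text_delta)`.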