API Reference: agent-engine

The agent-engine library contains the core cognitive systems and simulation orchestration components that bring ARLA agents to life. These classes implement the agents' behavioral models and stay world-agnostic by reading world data only through provider interfaces.

Cognitive Architecture

The agent-engine implements a modular cognitive architecture inspired by theories from psychology and cognitive science, including memory consolidation, emotional appraisal, identity formation, and causal reasoning.

Core Simulation Engine

The simulation engine orchestrates the entire agent lifecycle, from initialization to shutdown, managing concurrent system execution and state persistence.

SimulationManager

The primary orchestrator that coordinates all simulation systems and manages the simulation lifecycle.

agent_engine.simulation.engine.SimulationManager

This manager is responsible for stepping through time, processing entity decisions, and updating all registered systems.

load_state(filepath)

Loads the entire simulation state from a checkpoint file.

register_system(system_class, **kwargs)

A convenience method to register a system with the SystemManager.

run(start_step=0, end_step=None) async

Executes the main ECS simulation loop asynchronously.

save_state(tick)

Saves the current simulation state to a file.

Key Responsibilities:

  • System Registration: Register and initialize all cognitive and world systems
  • Lifecycle Management: Handle simulation startup, execution, and shutdown
  • State Coordination: Manage state persistence and checkpoint creation
  • Error Recovery: Handle system failures gracefully with rollback capabilities
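
The checkpoint round trip behind save_state and load_state can be pictured with a minimal sketch. Everything here is an assumption made for illustration: the JSON format, the helper names `save_checkpoint` and `load_checkpoint`, and the state layout are hypothetical and do not describe how SimulationManager actually serializes state.

```python
import json
import os
import tempfile
from typing import Any, Dict


def save_checkpoint(state: Dict[str, Any], tick: int, directory: str) -> str:
    """Write the state to <directory>/checkpoint_<tick>.json and return the path."""
    path = os.path.join(directory, f"checkpoint_{tick}.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump({"tick": tick, "state": state}, fh)
    return path


def load_checkpoint(path: str) -> Dict[str, Any]:
    """Read back a checkpoint written by save_checkpoint."""
    with open(path, "r", encoding="utf-8") as fh:
        return json.load(fh)


def demo_round_trip() -> Dict[str, Any]:
    # Hypothetical state layout: one dict of component data per agent id.
    state = {"agent_1": {"health": 0.8, "goals": ["explore"]}}
    with tempfile.TemporaryDirectory() as tmp:
        path = save_checkpoint(state, tick=100, directory=tmp)
        return load_checkpoint(path)
```

Storing the tick next to the state lets a resumed run pick a matching start_step, which is one plausible reason run accepts that parameter.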

SystemManager

Manages the concurrent execution of multiple systems with dependency resolution and performance monitoring.

agent_engine.simulation.system.SystemManager

Manages the registration, initialization, and execution of all systems using a specified runner (e.g., AsyncSystemRunner).

get_system(system_type)

Retrieves a system instance of the given type. NOTE: This should be used sparingly, primarily for debugging or post-simulation analysis. Prefer using the EventBus for inter-system communication.

register_system(system_class, **kwargs)

Instantiates and registers a system, passing extra keyword arguments to its constructor. This is crucial for dependency injection.

update_all(current_tick) async

Executes the update method for all registered systems using the runner.

Execution Strategies:

  • Asynchronous: Concurrent system execution for maximum performance
  • Serial: Deterministic execution for debugging and reproducibility
  • Hybrid: Mixed execution with critical systems running synchronously
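
The dependency injection performed by register_system can be sketched with a toy registry. This is a simplified stand-in, not the library's SystemManager: `MiniSystemManager`, the stub `System` base class, and `NarrativeSystem` are hypothetical names, and the real System constructor takes more arguments than shown here.

```python
from typing import Any, Dict, Type


class System:
    """Stand-in base class; the real System base class has a richer constructor."""

    def __init__(self, **kwargs: Any) -> None:
        self.dependencies = kwargs


class MiniSystemManager:
    """Toy registry illustrating keyword-argument dependency injection."""

    def __init__(self) -> None:
        self._systems: Dict[Type[System], System] = {}

    def register_system(self, system_class: Type[System], **kwargs: Any) -> None:
        # Extra keyword arguments are forwarded to the system's constructor.
        self._systems[system_class] = system_class(**kwargs)

    def get_system(self, system_type: Type[System]) -> System:
        # Lookup by type; intended for debugging and post-run analysis.
        return self._systems[system_type]


class NarrativeSystem(System):
    """Hypothetical system that needs a world-specific provider."""

    def __init__(self, narrative_provider: Any) -> None:
        super().__init__(narrative_provider=narrative_provider)
        self.narrative_provider = narrative_provider


manager = MiniSystemManager()
manager.register_system(NarrativeSystem, narrative_provider="stub-provider")
```

Because the provider arrives as a constructor argument, the system never imports world-specific code, which is the decoupling the reference text describes.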

Cognitive Systems

These systems implement the core intelligence of ARLA agents, from basic decision-making to complex social reasoning.

ReflectionSystem

Implements metacognitive capabilities, enabling agents to analyze their experiences and form narrative memories.

agent_engine.systems.reflection_system.ReflectionSystem

Bases: System

Relies on an injected NarrativeContextProviderInterface for world-specific narrative generation, improving decoupling.

on_action_executed_for_chunking(event_data)

Buffers events for each agent to be chunked into episodes later.

on_reflection_requested(event_data)

Handles an explicit request for reflection from another system.

update(current_tick) async

Periodically triggers the reflection process for active agents.

update_for_entity(entity_id, current_tick, is_final_reflection) async

Allows forcing a reflection cycle for a specific entity.

Reflection Process:

graph TD
    A[Experience Buffer] --> B[Pattern Recognition]
    B --> C[LLM Analysis]
    C --> D[Memory Consolidation]
    D --> E[Insight Generation]
    E --> F[Goal Updates]

Core Functions:

  • Experience Analysis: Process recent actions and outcomes for patterns
  • Memory Consolidation: Convert experiences into lasting narrative memories
  • Insight Generation: Extract learnings and update agent beliefs
  • Goal Adaptation: Modify agent objectives based on reflection insights
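
The buffering step described for on_action_executed_for_chunking can be sketched as a per-agent event buffer that is later cut into episodes. This is an illustration under assumptions: the fixed episode size, the `entity_id` event key, and the `EpisodeBuffer` class are hypothetical, and the real system may chunk on different criteria.

```python
from collections import defaultdict
from typing import Any, Dict, List

Event = Dict[str, Any]


class EpisodeBuffer:
    """Collects events per agent and chunks them into fixed-size episodes."""

    def __init__(self, episode_size: int = 3) -> None:
        self.episode_size = episode_size
        self._pending: Dict[str, List[Event]] = defaultdict(list)

    def on_action_executed(self, event_data: Event) -> None:
        # Buffer the event under the acting agent's id (assumed key name).
        self._pending[event_data["entity_id"]].append(event_data)

    def drain_episodes(self, entity_id: str, flush: bool = False) -> List[List[Event]]:
        """Return complete episodes; with flush=True also return a partial tail."""
        events = self._pending[entity_id]
        episodes: List[List[Event]] = []
        while len(events) >= self.episode_size:
            episodes.append(events[: self.episode_size])
            del events[: self.episode_size]
        if flush and events:
            episodes.append(list(events))
            events.clear()
        return episodes
```

The flush flag plays the role that is_final_reflection appears to play in update_for_entity: at the end of a run, leftover events still form a last, shorter episode.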

QLearningSystem

Implements utility-based decision-making with neural network function approximation and experience replay.

agent_engine.systems.q_learning_system.QLearningSystem

Bases: System

Manages the Q-learning process for all agents, using causally adjusted rewards from the CausalGraphSystem.

on_action_executed(event_data)

Event handler that triggers the Q-learning update step.

update(current_tick) async

Caches the current state features for each active learning agent.

Learning Architecture:

  • State Encoding: Convert world state to feature vectors via providers
  • Action Valuation: Estimate expected utility for all available actions
  • Policy Updates: Update neural network weights based on prediction errors
  • Exploration Strategy: Balance exploitation with exploration using ε-greedy

Integration Features:

  • Causal Rewards: Incorporates causally-adjusted rewards from CausalGraphSystem
  • Emotional Modulation: Learning rates affected by emotional state
  • Value Alignment: Rewards shaped by agent's personal value system
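
The ε-greedy policy and the prediction-error update listed above can be sketched in a few lines. Note the substitution: this sketch uses a plain Q-table in place of the neural network approximator the system is described as using, and `TabularQ` with its parameters is a hypothetical name. The update rule itself is the standard Q-learning temporal-difference update.

```python
import random
from collections import defaultdict
from typing import Dict, Hashable, List, Optional


class TabularQ:
    """Tabular Q-learning with epsilon-greedy action selection."""

    def __init__(self, actions: List[str], alpha: float = 0.5, gamma: float = 0.9,
                 epsilon: float = 0.1, rng: Optional[random.Random] = None) -> None:
        self.actions = actions
        self.alpha, self.gamma, self.epsilon = alpha, gamma, epsilon
        self.rng = rng or random.Random()
        # Unseen states start with a value of 0.0 for every action.
        self.q: Dict[Hashable, Dict[str, float]] = defaultdict(
            lambda: {a: 0.0 for a in actions}
        )

    def select_action(self, state: Hashable) -> str:
        # Explore with probability epsilon, otherwise exploit the best estimate.
        if self.rng.random() < self.epsilon:
            return self.rng.choice(self.actions)
        values = self.q[state]
        return max(self.actions, key=lambda a: values[a])

    def update(self, state: Hashable, action: str, reward: float,
               next_state: Hashable) -> float:
        # TD error: r + gamma * max_a' Q(s', a') - Q(s, a)
        target = reward + self.gamma * max(self.q[next_state].values())
        td_error = target - self.q[state][action]
        self.q[state][action] += self.alpha * td_error
        return td_error
```

In this picture, a causally adjusted or value-shaped reward would simply replace the raw `reward` argument, and emotional modulation would scale `alpha`; both are readings of the integration features above rather than confirmed implementation details.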

IdentitySystem

Manages multi-dimensional agent identity across social, competence, moral, relational, and agency domains.

agent_engine.systems.identity_system.IdentitySystem

Bases: System

Updates an agent's multi-domain identity based on narrative reflections. This system is event-driven, world-agnostic, and implements the identity update model.

on_reflection_completed(event_data)

Event handler that orchestrates the full identity update cycle upon receiving a completed reflection. It receives a generic context object and passes it down the chain.

update(current_tick) async

This system is purely event-driven and has no per-tick logic.

Identity Domains:

  • Social Identity: Group membership, reputation, and social standing within the community.
  • Competence Identity: Skills, abilities, and perceived effectiveness in various tasks.
  • Moral Identity: Ethical principles, values, and moral standing.
  • Relational Identity: Relationships with other agents and social connections.
  • Agency Identity: Sense of autonomy, control, and self-efficacy.

Identity Dynamics:

  • Experience Integration: Update identity based on action outcomes
  • Social Feedback: Incorporate reactions from other agents
  • Consistency Maintenance: Resolve conflicts between identity domains
  • Adaptive Flexibility: Balance stability with growth and change
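
One simple way to picture the balance between stability and change is an exponential moving average per domain. This is only an illustrative sketch: the scalar scores, the `IdentityState` class, and the `learning_rate` parameter are assumptions, and the identity update model that IdentitySystem actually implements is not specified on this page.

```python
from dataclasses import dataclass, field
from typing import Dict

DOMAINS = ("social", "competence", "moral", "relational", "agency")


@dataclass
class IdentityState:
    """One scalar score per domain in [0, 1]; the real component may be richer."""

    scores: Dict[str, float] = field(
        default_factory=lambda: {d: 0.5 for d in DOMAINS}
    )

    def integrate(self, evidence: Dict[str, float], learning_rate: float = 0.2) -> None:
        """Move each mentioned domain a fraction of the way toward new evidence."""
        for domain, value in evidence.items():
            if domain not in self.scores:
                raise KeyError(f"unknown identity domain: {domain}")
            old = self.scores[domain]
            # A small learning rate favors stability, a large one favors change.
            self.scores[domain] = old + learning_rate * (value - old)
```

A single strong success moves the competence score only part of the way, so identity shifts gradually across repeated reflections instead of flipping after one event.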

GoalSystem

Implements dynamic goal generation, prioritization, and achievement tracking.

agent_engine.systems.goal_system.GoalSystem

Bases: System

Manages an agent's goal creation, selection, and refinement based on its experiences and reflections. This system is world-agnostic.

on_update_goals_event(event_data)

Triggered after a reflection, this orchestrates the goal update cycle.

update(current_tick) async

This system is purely event-driven and has no per-tick logic.

Goal Lifecycle:

graph LR
    A[Goal Generation] --> B[Prioritization]
    B --> C[Plan Formation]
    C --> D[Execution]
    D --> E[Progress Tracking]
    E --> F[Achievement/Adaptation]

Goal Types:

  • Survival Goals: Basic needs like health, energy, and safety
  • Social Goals: Relationship building, reputation, and cooperation
  • Achievement Goals: Skill development, resource accumulation, and mastery
  • Exploratory Goals: Curiosity-driven exploration and learning
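
The prioritization step of the goal lifecycle can be sketched as a scoring function over candidate goals. The `Goal` fields, the `TYPE_WEIGHTS` table, and the scoring formula are all hypothetical choices made for illustration; GoalSystem may rank goals quite differently.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass
class Goal:
    name: str
    goal_type: str          # "survival", "social", "achievement" or "exploratory"
    urgency: float          # 0..1, how pressing the goal is right now
    progress: float = 0.0   # 0..1, fraction already completed


# Hypothetical weights: survival needs outrank curiosity-driven exploration.
TYPE_WEIGHTS = {"survival": 2.0, "social": 1.0, "achievement": 1.0, "exploratory": 0.5}


def priority(goal: Goal) -> float:
    """Score a goal by type weight, urgency, and remaining work."""
    weight = TYPE_WEIGHTS.get(goal.goal_type, 1.0)
    return weight * goal.urgency * (1.0 - goal.progress)


def select_goal(goals: Iterable[Goal]) -> Optional[Goal]:
    """Return the highest-priority unfinished goal, or None if there is none."""
    active = [g for g in goals if g.progress < 1.0]
    return max(active, key=priority, default=None)
```

For example, a survival goal with urgency 0.6 scores 1.2 and wins over a social goal with urgency 0.9, which scores 0.9.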

Advanced Cognitive Features

Emotional Dynamics

The AffectSystem models emotion using appraisal theory, deriving an emotion label from how an event scores on a small set of appraisal dimensions:

# Example of emotional appraisal calculation
def calculate_emotion(self, event, agent_goals, agent_values):
    """
    Emotion emerges from the intersection of:
    - Goal relevance (does this matter to me?)
    - Outcome controllability (can I influence this?)
    - Value alignment (does this match my values?)
    - Social context (how do others see this?)
    """

    appraisal_scores = {
        'goal_relevance': self._assess_goal_relevance(event, agent_goals),
        'controllability': self._assess_controllability(event),
        'value_alignment': self._assess_value_alignment(event, agent_values),
        'social_approval': self._assess_social_context(event)
    }

    # Emotions emerge from specific appraisal patterns
    if appraisal_scores['goal_relevance'] > 0.7 and appraisal_scores['controllability'] < 0.3:
        return 'frustration'
    elif appraisal_scores['value_alignment'] > 0.8 and appraisal_scores['social_approval'] > 0.6:
        return 'pride'
    # ... additional emotion patterns

Causal Reasoning

The CausalGraphSystem builds formal causal models using the DoWhy library:

# Example of causal effect estimation
def estimate_action_effect(self, agent_id: str, action: str) -> float:
    """
    Estimate the causal effect of an action using do-calculus.

    This answers: "What would happen if the agent were forced
    to take this action, controlling for confounding factors?"
    """

    causal_model = self._get_agent_causal_model(agent_id)
    if not causal_model:
        return 0.0  # No causal model yet; callers fall back to observational data

    # Perform causal intervention
    causal_effect = causal_model.estimate_effect(
        treatment_value=action,
        outcome='reward',
        method='iv'  # Instrumental variables
    )

    return causal_effect.value
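
To make "controlling for confounding factors" concrete, the sketch below compares a naive difference in mean reward with a stratified (backdoor-adjusted) estimate. It is a hand-rolled illustration in plain Python, not DoWhy and not the CausalGraphSystem implementation; the `RECORDS` data and function names are invented for the example.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Tuple

# Each record: (confounder value, whether the action was taken, reward)
Record = Tuple[str, bool, float]

# Invented data: agents in the "rich" zone act more often AND earn more anyway.
RECORDS: List[Record] = [
    ("rich", True, 11.0), ("rich", True, 11.0), ("rich", True, 11.0), ("rich", False, 10.0),
    ("poor", True, 2.0), ("poor", False, 1.0), ("poor", False, 1.0), ("poor", False, 1.0),
]


def naive_effect(records: List[Record]) -> float:
    """Difference in mean reward, ignoring the confounder."""
    treated = [r for _, t, r in records if t]
    control = [r for _, t, r in records if not t]
    return mean(treated) - mean(control)


def adjusted_effect(records: List[Record]) -> float:
    """Backdoor adjustment: per-stratum effects weighted by stratum size."""
    strata: Dict[str, List[Record]] = defaultdict(list)
    for rec in records:
        strata[rec[0]].append(rec)
    total, counted = 0.0, 0
    for group in strata.values():
        treated = [r for _, t, r in group if t]
        control = [r for _, t, r in group if not t]
        if not treated or not control:
            continue  # no overlap in this stratum, so no comparison is possible
        total += (mean(treated) - mean(control)) * len(group)
        counted += len(group)
    if counted == 0:
        raise ValueError("no stratum contains both treated and control records")
    return total / counted
```

On this data the naive estimate is 5.5, while the adjusted estimate is 1.0, the effect observed within each zone. The gap is the bias a causally adjusted reward is meant to remove.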

Memory Architecture

Agents maintain multiple memory systems with different retention and access patterns:

from typing import Dict, List

class MemoryComponent(Component):
    """Multi-layered memory architecture."""

    def __init__(self):
        # Working memory for immediate context
        self.working_memory: List[Experience] = []

        # Episodic memory for specific experiences
        self.episodic_memory: List[Episode] = []

        # Semantic memory for general knowledge
        self.semantic_memory: Dict[str, ConceptNode] = {}

        # Emotional memories with stronger retention
        self.emotional_memories: List[EmotionalEpisode] = []

        # Social memories about other agents
        self.social_memory: Dict[str, AgentSchema] = {}

System Integration Patterns

Event-Driven Communication

Systems communicate through the event bus rather than calling each other directly, which keeps them loosely coupled:

class CognitivePipeline:
    """Example of how cognitive systems coordinate."""

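    def __init__(self, event_bus, affect_system, q_learning_system,
                 goal_system, social_system):
        self.event_bus = event_bus

        # Keep references to the systems whose handlers are subscribed below
        self.affect_system = affect_system
        self.q_learning_system = q_learning_system
        self.goal_system = goal_system
        self.social_system = social_system

        # Systems subscribe to relevant events
        # (affect and social handler names are illustrative)
        self.event_bus.subscribe("action_executed", self.affect_system.process_outcome)
        self.event_bus.subscribe("action_executed", self.q_learning_system.on_action_executed)
        self.event_bus.subscribe("reflection_completed", self.goal_system.on_update_goals_event)
        self.event_bus.subscribe("identity_changed", self.social_system.update_reputation)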

    async def process_agent_tick(self, agent_id: str, tick: int):
        """Coordinate cognitive processing for one agent."""

        # 1. Decision making
        self.event_bus.publish("decision_requested", {
            "agent_id": agent_id,
            "tick": tick
        })

        # 2. Action execution happens in world systems
        # 3. Outcome processing happens automatically via subscriptions
        # 4. Reflection triggered periodically

        if tick % 50 == 0:  # Reflect every 50 ticks
            self.event_bus.publish("reflection_triggered", {
                "agent_id": agent_id,
                "tick": tick
            })
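
The subscribe and publish calls used above can be sketched with a minimal synchronous bus. `MiniEventBus` is a hypothetical stand-in written for this example; the library's own event bus may dispatch asynchronously or carry extra features.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Handler = Callable[[Dict[str, Any]], None]


class MiniEventBus:
    """Toy publish/subscribe bus keyed by event name."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, event_name: str, handler: Handler) -> None:
        self._handlers[event_name].append(handler)

    def publish(self, event_name: str, event_data: Dict[str, Any]) -> None:
        # Iterate over a copy so handlers may subscribe during dispatch.
        for handler in list(self._handlers.get(event_name, [])):
            handler(event_data)


received: List[str] = []
bus = MiniEventBus()
bus.subscribe("action_executed", lambda data: received.append(f"affect:{data['agent_id']}"))
bus.subscribe("action_executed", lambda data: received.append(f"learning:{data['agent_id']}"))
bus.publish("action_executed", {"agent_id": "agent_1", "tick": 3})
bus.publish("unhandled_event", {})  # no subscribers, so nothing happens
```

The publisher never names its listeners, so adding a new consumer of action_executed requires no change to the system that emits it.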

Provider Integration

Cognitive systems access world data through provider interfaces:

from typing import Any, Dict

class ReflectionSystem(System):
    """Example of provider usage in cognitive systems."""

    def __init__(self, simulation_state, config, cognitive_scaffold):
        super().__init__(simulation_state, config, cognitive_scaffold)

        # Providers injected at runtime
        self.vitality_provider = None
        self.narrative_provider = None
        self.state_encoder = None

    def set_providers(self, providers: Dict[str, Any]):
        """Dependency injection of world-specific providers."""
        self.vitality_provider = providers.get('vitality')
        self.narrative_provider = providers.get('narrative')
        self.state_encoder = providers.get('state_encoder')

    async def generate_reflection(self, agent_id: str) -> str:
        """Generate reflection using injected providers."""

        # Get agent components
        components = self.simulation_state.get_entity_components(agent_id)

        # Use providers to extract world-specific context
        vitality = self.vitality_provider.get_normalized_vitality_metrics(
            agent_id, components, self.config
        )

        narrative_context = self.narrative_provider.get_narrative_context(
            agent_id, components, self.config
        )

        # Generate reflection prompt with LLM
        reflection = await self.cognitive_scaffold.generate_reflection(
            vitality_metrics=vitality,
            narrative_context=narrative_context,
            recent_experiences=self._get_recent_experiences(agent_id)
        )

        return reflection
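
The provider side of this contract can be sketched with a structural interface. The method name and argument order follow the call shown above, and the `health_norm` and `energy_norm` keys follow the test example on this page; the `VitalityProvider` protocol, the `GridWorldVitalityProvider` class, and the component and config layouts are hypothetical.

```python
from typing import Any, Dict, Protocol


class VitalityProvider(Protocol):
    """Structural interface a world must satisfy to expose vitality data."""

    def get_normalized_vitality_metrics(
        self, entity_id: str, components: Dict[str, Any], config: Dict[str, Any]
    ) -> Dict[str, float]: ...


class GridWorldVitalityProvider:
    """Hypothetical provider for a world that tracks health and energy."""

    def get_normalized_vitality_metrics(
        self, entity_id: str, components: Dict[str, Any], config: Dict[str, Any]
    ) -> Dict[str, float]:
        # Scale raw values into [0, 1] using world-specific maxima.
        return {
            "health_norm": components["health"] / config["max_health"],
            "energy_norm": components["energy"] / config["max_energy"],
        }


def describe_vitality(provider: VitalityProvider, entity_id: str,
                      components: Dict[str, Any], config: Dict[str, Any]) -> str:
    """World-agnostic consumer: it only sees normalized metrics."""
    metrics = provider.get_normalized_vitality_metrics(entity_id, components, config)
    return ", ".join(f"{name}={value:.2f}" for name, value in sorted(metrics.items()))
```

The consumer works with any object that has the right method, so a different world only needs to supply its own provider class.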

Performance and Scalability

Concurrent System Execution

The agent-engine supports multiple execution strategies optimized for different scenarios:

# High-performance async execution
runner = AsyncSystemRunner(systems, event_bus)
await runner.execute_tick(current_tick)

# Deterministic serial execution for debugging
runner = SerialSystemRunner(systems, event_bus)
await runner.execute_tick(current_tick)

# Hybrid execution with system priorities
runner = HybridSystemRunner(systems, event_bus, priorities={
    'ActionSystem': 'sync',      # Critical path
    'ReflectionSystem': 'async', # Can run concurrently
    'LoggingSystem': 'background' # Low priority
})
await runner.execute_tick(current_tick)
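
The difference between serial and concurrent execution can be demonstrated with plain asyncio. This sketch does not use the runner classes above: `SleepySystem`, `run_serial`, and `run_concurrent` are hypothetical helpers that only mimic the `update(current_tick)` coroutine shape.

```python
import asyncio
from typing import List


class SleepySystem:
    """Stand-in system whose update takes a configurable amount of time."""

    def __init__(self, name: str, delay: float, log: List[str]) -> None:
        self.name, self.delay, self.log = name, delay, log

    async def update(self, current_tick: int) -> None:
        await asyncio.sleep(self.delay)
        self.log.append(self.name)  # record completion order


async def run_serial(systems: List[SleepySystem], current_tick: int) -> None:
    # Each update finishes before the next begins: order is always the same.
    for system in systems:
        await system.update(current_tick)


async def run_concurrent(systems: List[SleepySystem], current_tick: int) -> None:
    # All updates are in flight together: completion order depends on timing.
    await asyncio.gather(*(s.update(current_tick) for s in systems))


def demo():
    serial_log: List[str] = []
    concurrent_log: List[str] = []
    asyncio.run(run_serial(
        [SleepySystem("slow", 0.05, serial_log), SleepySystem("fast", 0.0, serial_log)], 0))
    asyncio.run(run_concurrent(
        [SleepySystem("slow", 0.05, concurrent_log), SleepySystem("fast", 0.0, concurrent_log)], 0))
    return serial_log, concurrent_log
```

Serial execution completes systems in registration order, while concurrent execution lets the fast system finish first. That timing dependence is why a serial runner is the safer choice when reproducibility matters.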

Memory and State Management

  • Component Pooling: Reuse component instances to reduce allocation overhead
  • Lazy Loading: Load memories and complex state only when needed
  • State Compression: Compress historical data using efficient encoding
  • Garbage Collection: Automatically clean up old memories and unused state

LLM Integration Optimization

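import asyncio
import uuid

# Assumption: a mapping-style LRU cache such as cachetools.LRUCache
from cachetools import LRUCache

class CognitiveScaffold:
    """Optimized LLM integration with caching and batching."""

    def __init__(self):
        self.prompt_cache = LRUCache(maxsize=1000)
        self.batch_queue = []
        self.response_futures = {}

    async def generate_reflection(self, **context):
        """Generate reflection with caching and batching."""

        # Check cache first
        cache_key = self._hash_context(context)
        if cache_key in self.prompt_cache:
            return self.prompt_cache[cache_key]

        # Register the future before queueing, so that _process_batch()
        # can resolve it even when this request is the one that fills the batch
        request_id = uuid.uuid4()
        future = asyncio.get_running_loop().create_future()
        self.response_futures[request_id] = future

        # Add to batch queue
        self.batch_queue.append({
            'id': request_id,
            'type': 'reflection',
            'context': context
        })

        # Process the batch once it is full (a timeout-based flush is not shown)
        if len(self.batch_queue) >= 10:
            await self._process_batch()

        # Wait until _process_batch() resolves this request, then cache the result
        result = await future
        self.prompt_cache[cache_key] = result
        return result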

Testing and Debugging

Cognitive System Testing

import pytest
from unittest.mock import Mock, AsyncMock

class TestReflectionSystem:
    """Example test patterns for cognitive systems."""

    @pytest.fixture
    def reflection_system(self):
        mock_state = Mock()
        mock_config = Mock()
        mock_scaffold = AsyncMock()

        system = ReflectionSystem(mock_state, mock_config, mock_scaffold)

        # Inject mock providers
        system.set_providers({
            'vitality': Mock(),
            'narrative': Mock(),
            'state_encoder': Mock()
        })

        return system

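    @pytest.mark.asyncio  # requires the pytest-asyncio plugin
    async def test_reflection_generation(self, reflection_system):
        """Test reflection generation with mocked dependencies."""

        # Setup mock responses
        reflection_system.vitality_provider.get_normalized_vitality_metrics.return_value = {
            'health_norm': 0.8,
            'energy_norm': 0.6
        }

        reflection_system.narrative_provider.get_narrative_context.return_value = {
            'recent_events': ['Won a battle', 'Found treasure'],
            'social_context': 'Respected by peers'
        }

        # The mocked scaffold must return a string; an unconfigured AsyncMock
        # would return a MagicMock and fail the isinstance check below
        reflection_system.cognitive_scaffold.generate_reflection.return_value = (
            "I won a battle and earned my peers' respect."
        )

        # Test reflection generation
        reflection = await reflection_system.generate_reflection("test_agent")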

        assert reflection is not None
        assert isinstance(reflection, str)

System Integration Testing

@pytest.mark.asyncio  # requires the pytest-asyncio plugin
async def test_cognitive_pipeline_integration():
    """Test full cognitive pipeline with multiple systems."""

    # Setup simulation environment
    manager = SimulationManager(test_config)
    manager.register_system(ActionSystem)
    manager.register_system(ReflectionSystem)
    manager.register_system(GoalSystem)
    manager.register_system(QLearningSystem)

    # Create test agent with required components
    agent_id = "test_agent"
    manager.simulation_state.add_component(agent_id, MemoryComponent())
    manager.simulation_state.add_component(agent_id, IdentityComponent())
    manager.simulation_state.add_component(agent_id, GoalComponent())

    # Run simulation for several ticks via the documented async entry point
    await manager.run(start_step=0, end_step=10)

    # Verify cognitive state updates
    memory_comp = manager.simulation_state.get_component(agent_id, MemoryComponent)
    assert len(memory_comp.episodic_memory) > 0

    goal_comp = manager.simulation_state.get_component(agent_id, GoalComponent)
    assert len(goal_comp.active_goals) > 0

The agent-engine provides a sophisticated yet modular cognitive architecture that can be adapted for a wide range of simulation scenarios while maintaining high performance and extensibility.