Creating Custom Actions
Actions are the building blocks of agent behavior in ARLA. They define what agents can do, from basic movement to complex social interactions. This guide walks through creating robust, extensible actions.
Action Design Philosophy
Actions should define what is possible, not how it's implemented. The actual state changes happen in Systems via the event bus, keeping actions lightweight and testable.
Action Architecture
The ActionInterface Contract
Every action must implement the ActionInterface, ensuring consistent integration with the simulation engine:
class ActionInterface:
@property
def action_id(self) -> str: ... # Unique identifier
@property
def name(self) -> str: ... # Human-readable name
def get_base_cost(self, simulation_state) -> float: ...
def generate_possible_params(self, entity_id, simulation_state, current_tick) -> List[Dict]: ...
def execute(self, entity_id, simulation_state, params, current_tick) -> ActionOutcome: ...
def get_feature_vector(self, entity_id, simulation_state, params) -> List[float]: ...
Action Lifecycle
graph LR
A[Registration] --> B[Parameter Generation]
B --> C[Action Selection]
C --> D[Execution]
D --> E[System Processing]
E --> F[Outcome Calculation]
- Registration:
@action_registry.registermakes action discoverable - Parameter Generation: Creates valid action variants for current world state
- Selection: AI/heuristic systems choose from available actions
- Execution: Returns ActionOutcome, publishes event for Systems
- System Processing: World-specific logic updates simulation state
- Outcome Calculation: Final rewards and consequences computed
Example: Communication Action
Let's build a comprehensive communication system that demonstrates best practices:
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
from agent_core.agents.actions.action_interface import ActionInterface
from agent_core.agents.actions.action_registry import action_registry
from agent_core.agents.actions.action_outcome import ActionOutcome
from agent_core.core.ecs.abstractions import SimulationState
@dataclass
class CommunicationParams:
"""Structured parameters for communication actions."""
target_agent_id: str
message_type: str # "greeting", "trade_offer", "warning", etc.
message_content: str
urgency: float = 0.5 # 0.0 = casual, 1.0 = urgent
requires_response: bool = False
@action_registry.register
class CommunicateAction(ActionInterface):
"""Enables agents to send messages to nearby agents."""
# Action costs by message type
MESSAGE_COSTS = {
"greeting": 0.5,
"trade_offer": 1.0,
"warning": 0.3,
"insult": 0.2,
"compliment": 0.4
}
@property
def action_id(self) -> str:
return "communicate"
@property
def name(self) -> str:
return "Communicate"
def generate_possible_params(
self,
entity_id: str,
simulation_state: SimulationState,
current_tick: int
) -> List[Dict[str, Any]]:
"""Generate communication possibilities based on context."""
# Get agent's position and social memory
pos_comp = simulation_state.get_component(entity_id, "PositionComponent")
social_comp = simulation_state.get_component(entity_id, "SocialMemoryComponent")
if not pos_comp:
return []
possible_actions = []
# Find nearby agents
nearby_agents = self._find_nearby_agents(
simulation_state,
pos_comp,
max_distance=5
)
for target_id, distance in nearby_agents:
if target_id == entity_id:
continue
# Generate context-appropriate messages
relationship = self._get_relationship_context(
social_comp, target_id
)
message_options = self._generate_message_options(
relationship, distance, current_tick
)
for msg_type, content in message_options:
params = CommunicationParams(
target_agent_id=target_id,
message_type=msg_type,
message_content=content,
urgency=self._calculate_urgency(relationship, msg_type),
requires_response=msg_type in ["trade_offer", "question"]
)
possible_actions.append(params.__dict__)
return possible_actions
def _find_nearby_agents(
self,
simulation_state: SimulationState,
pos_comp,
max_distance: int
) -> List[tuple[str, int]]:
"""Find agents within communication range."""
nearby = []
my_pos = (pos_comp.x, pos_comp.y)
# Query all entities with position components
for entity_id, components in simulation_state.get_entities_with_components([
"PositionComponent"
]).items():
other_pos_comp = components["PositionComponent"]
other_pos = (other_pos_comp.x, other_pos_comp.y)
# Calculate Manhattan distance
distance = abs(my_pos[0] - other_pos[0]) + abs(my_pos[1] - other_pos[1])
if distance <= max_distance:
nearby.append((entity_id, distance))
return nearby
def _generate_message_options(
self,
relationship: Dict[str, Any],
distance: int,
current_tick: int
) -> List[tuple[str, str]]:
"""Generate appropriate messages based on context."""
options = []
# Always available: basic greeting
options.append(("greeting", "Hello there!"))
# Relationship-based messages
if relationship.get("trust", 0.5) > 0.7:
options.append(("compliment", "You're doing great work!"))
options.append(("trade_offer", "Want to trade resources?"))
if relationship.get("threat_level", 0.0) > 0.5:
options.append(("warning", "Stay back!"))
# Distance-based messages
if distance == 1: # Adjacent
options.append(("whisper", "Psst, over here..."))
return options
def execute(
self,
entity_id: str,
simulation_state: SimulationState,
params: Dict[str, Any],
current_tick: int,
) -> ActionOutcome:
"""Execute communication action."""
target_id = params.get("target_agent_id")
message_type = params.get("message_type", "greeting")
content = params.get("message_content", "")
# Validate target exists and is in range
if not self._validate_target(simulation_state, entity_id, target_id):
return ActionOutcome(
success=False,
message=f"Cannot communicate with {target_id}: out of range or invalid",
base_reward=-0.1
)
# Calculate success probability based on relationship
success_prob = self._calculate_success_probability(
simulation_state, entity_id, target_id, message_type
)
import random
success = random.random() < success_prob
outcome_message = (
f"Agent {entity_id} {message_type}s to {target_id}: '{content}'"
if success else
f"Agent {entity_id}'s {message_type} to {target_id} was ignored"
)
return ActionOutcome(
success=success,
message=outcome_message,
base_reward=0.2 if success else -0.05
)
def get_base_cost(self, simulation_state: SimulationState) -> float:
"""Dynamic cost based on recent communication frequency."""
# Could implement fatigue system here
return 1.0
def get_feature_vector(
self,
entity_id: str,
simulation_state: SimulationState,
params: Dict[str, Any],
) -> List[float]:
"""Encode action for machine learning."""
# One-hot encode message types
message_types = ["greeting", "trade_offer", "warning", "insult", "compliment"]
message_type = params.get("message_type", "greeting")
type_encoding = [1.0 if msg_type == message_type else 0.0
for msg_type in message_types]
# Additional features
features = type_encoding + [
float(params.get("urgency", 0.5)), # Urgency level
1.0 if params.get("requires_response") else 0.0, # Expects response
self._get_relationship_strength(simulation_state, entity_id,
params.get("target_agent_id", ""))
]
return features
def _get_relationship_strength(
self,
simulation_state: SimulationState,
entity_id: str,
target_id: str
) -> float:
"""Get normalized relationship strength."""
social_comp = simulation_state.get_component(entity_id, "SocialMemoryComponent")
if not social_comp or target_id not in social_comp.relationships:
return 0.5 # Neutral
relationship = social_comp.relationships[target_id]
return relationship.get("trust", 0.5)
Advanced Action Patterns
Multi-Step Actions
For actions that span multiple ticks:
@action_registry.register
class BuildAction(ActionInterface):
"""Action that takes multiple ticks to complete."""
def generate_possible_params(self, entity_id, simulation_state, current_tick):
# Check if agent is already building
build_comp = simulation_state.get_component(entity_id, "BuildingComponent")
if build_comp and build_comp.is_building:
return [{"action": "continue_building"}]
# Generate new building options
return [
{"action": "start_building", "structure_type": "shelter"},
{"action": "start_building", "structure_type": "workshop"}
]
Conditional Actions
Actions available only under specific conditions:
@action_registry.register
class TradeAction(ActionInterface):
"""Trading requires specific inventory items."""
def generate_possible_params(self, entity_id, simulation_state, current_tick):
inventory = simulation_state.get_component(entity_id, "InventoryComponent")
if not inventory or len(inventory.items) == 0:
return [] # No items to trade
# Find potential trading partners
nearby_traders = self._find_traders_with_desired_items(
simulation_state, entity_id, inventory.wishlist
)
if not nearby_traders:
return []
# Generate trade offers
return self._generate_trade_offers(inventory, nearby_traders)
Composite Actions
Actions that trigger multiple sub-actions:
@action_registry.register
class AttackAction(ActionInterface):
"""Combat action with movement and damage components."""
def execute(self, entity_id, simulation_state, params, current_tick):
# This will trigger multiple systems:
# 1. MovementSystem (if need to close distance)
# 2. CombatSystem (for damage calculation)
# 3. SocialSystem (for reputation effects)
return ActionOutcome(
success=True,
message=f"Agent {entity_id} attacks {params['target']}",
base_reward=0.0, # Determined by combat outcome
metadata={
"triggers_movement": params.get("requires_movement", False),
"combat_type": params.get("weapon_type", "melee")
}
)
Testing Actions
Unit Testing Pattern
import pytest
from unittest.mock import Mock, MagicMock
class TestCommunicateAction:
def setup_method(self):
self.action = CommunicateAction()
self.mock_simulation_state = Mock()
def test_generate_params_no_nearby_agents(self):
"""Test parameter generation with no nearby agents."""
# Mock empty position component
self.mock_simulation_state.get_component.return_value = None
params = self.action.generate_possible_params(
"agent_001", self.mock_simulation_state, 100
)
assert params == []
def test_execute_valid_communication(self):
"""Test successful communication execution."""
params = {
"target_agent_id": "agent_002",
"message_type": "greeting",
"message_content": "Hello!"
}
outcome = self.action.execute(
"agent_001", self.mock_simulation_state, params, 100
)
assert outcome.success
assert "greeting" in outcome.message
assert outcome.base_reward > 0
Integration Testing
async def test_communication_system_integration():
"""Test full communication flow with actual systems."""
# Set up simulation with communication system
manager = SimulationManager(test_config)
manager.register_system(CommunicationSystem)
# Create test agents
agent1 = manager.create_agent("test_agent_1")
agent2 = manager.create_agent("test_agent_2")
# Place agents near each other
pos1 = PositionComponent(x=5, y=5)
pos2 = PositionComponent(x=6, y=5)
manager.simulation_state.add_component(agent1, pos1)
manager.simulation_state.add_component(agent2, pos2)
# Force communication action
action = CommunicateAction()
params = action.generate_possible_params(agent1, manager.simulation_state, 1)
assert len(params) > 0
# Execute and verify system processes it
outcome = action.execute(agent1, manager.simulation_state, params[0], 1)
assert outcome.success
Performance Optimization
Efficient Parameter Generation
def generate_possible_params(self, entity_id, simulation_state, current_tick):
# Cache expensive calculations
if not hasattr(self, '_nearby_cache') or current_tick % 10 == 0:
self._nearby_cache = self._build_proximity_cache(simulation_state)
# Use cached data for parameter generation
nearby_agents = self._nearby_cache.get(entity_id, [])
return self._generate_params_from_cache(entity_id, nearby_agents)
Memory-Efficient Features
def get_feature_vector(self, entity_id, simulation_state, params):
# Use numpy for efficient numeric operations
import numpy as np
features = np.zeros(self.FEATURE_VECTOR_SIZE, dtype=np.float32)
# Sparse encoding for categorical features
message_type_idx = self.MESSAGE_TYPE_TO_INDEX.get(
params.get("message_type"), 0
)
features[message_type_idx] = 1.0
return features.tolist()
Common Pitfalls
Avoid Heavy Logic in Actions
Wrong:
def execute(self, entity_id, simulation_state, params, current_tick):
# Don't do complex state modifications here!
target_health = simulation_state.get_component(params["target"], "HealthComponent")
target_health.current_health -= params["damage"] # BAD!
return ActionOutcome(success=True, message="Attack executed")
Right:
def execute(self, entity_id, simulation_state, params, current_tick):
# Just return the outcome - let CombatSystem handle the logic
return ActionOutcome(
success=True,
message=f"Agent {entity_id} attacks {params['target']}",
base_reward=0.1
)
Parameter Validation
def execute(self, entity_id, simulation_state, params, current_tick):
# Always validate parameters
required_params = ["target_agent_id", "message_type"]
for param in required_params:
if param not in params:
return ActionOutcome(
success=False,
message=f"Missing required parameter: {param}",
base_reward=-0.1
)
# Rest of execution logic...
Feature Vector Consistency
class CommunicateAction(ActionInterface):
# Define feature vector size as class constant
FEATURE_VECTOR_SIZE = 10
def get_feature_vector(self, entity_id, simulation_state, params):
features = [0.0] * self.FEATURE_VECTOR_SIZE
# Always return exactly FEATURE_VECTOR_SIZE elements
return features
Actions form the interface between agent decision-making and world simulation. By following these patterns, you'll create robust, testable actions that integrate seamlessly with ARLA's cognitive architecture.