Gym ports (#298)

* gym registry: Uses instance_id parameter and direct indexing: tcp_ports[instance_id]
run_eval: Passes instance_id=run_idx to gym.make()
config: Added instance_id field to track which container to use

* Fix RCON client disconnection by eliminating duplicate gym.make() calls

- **Root Cause**: Two gym.make() calls were creating separate FactorioInstance objects
  trying to connect to the same container, causing RCON conflicts

- **Problem**:
  - Main process: gym.make() → creates FactorioInstance → connects to container
  - Subprocess: gym.make() → creates ANOTHER FactorioInstance → conflicts!

- **Solution**: Eliminate main process gym.make() by:
  - Getting task directly via TaskFactory.create_task()
  - Generating system prompts via SystemPromptGenerator
  - Only subprocess creates gym environment with correct instance_id

- **Changes**:
  - registry.py: Added instance_id parameter to make_factorio_env()
  - run_eval.py: Removed main process gym.make(), kept subprocess gym.make()
  - config.py: Added instance_id field to track container mapping

- **Result**: Each subprocess now connects to its own container without conflicts
  - run_idx=0 → container 0 (port 27000)
  - run_idx=1 → container 1 (port 27001)
  - run_idx=2 → container 2 (port 27002)

Fixes RCON disconnection errors in multi-container gym environments.

* Remove Path usage and eliminate redundant multiagent instructions

- **Path removal**: Replace Path() with os.path.join() in run_eval.py
  - File path now resolves to: /home/kian/factorio-learning-environment/fle
  - Eliminates Path dependency as requested

- **Redundancy fix**: Remove duplicate multiagent instructions from run_eval.py
  - run_eval.py was duplicating the same multiagent logic as instance.py
  - Now uses basic generator.generate('') in main process
  - Proper agent-specific system prompts handled by instance.get_system_prompt(agent_idx)
  - Eliminates code duplication between run_eval.py and instance.py

- **Result**: Cleaner code with single source of truth for multiagent instructions

* Fix outdated env/src paths in MCP protocol files

- Problem: MCP files were using non-existent path parent/env/src
  This resolved to fle/env/protocols/env/src which does not exist

- Root cause: Legacy path structure assumption
  Current: fle/env/ contains tools/, instance.py, etc.
  Old assumption: fle/env/src/ never existed

- Solution: Update all MCP files to use correct path parent.parent
  Before: Path(...).parent / env / src → fle/env/protocols/env/src 
  After: Path(...).parent.parent → fle/env 

- Files fixed:
  resources.py: 2 instances fixed
  tools.py: 2 instances fixed
  unix_tools.py: 2 instances fixed
  Removed obsolete env.src. string replacement

- Verification: All paths now correctly point to fle/env/ with tools/ and instance.py

* Add container mapping debug prints for 7-env test verification

- Container Discovery: Shows all discovered containers with IPs and ports
- Main Process: Logs each run_idx → instance_id assignment
- Subprocess: Verifies gym.make() uses correct instance_id
- Registry: Shows which container is selected for each instance_id
- Instance: Confirms actual RCON connection details

Debug output will show:
🐳 CONTAINER DISCOVERY: Found X containers
🔍 Container details: Container 0: ip:port
🚀 MAIN PROCESS: Starting run_idx=X with instance_id=X
🎯 SUBPROCESS X: Creating gym environment with instance_id=X
🏭 REGISTRY: Creating FactorioInstance for instance_id=X
📡 REGISTRY: Selecting container X: ip:port
🔌 INSTANCE: Successfully connected to ip at tcp/port
 SUBPROCESS X: Connected to ip:port

This will verify the fix for RCON conflicts across 7 parallel environments.

* Container selection fixes for multi-terminal runs

- Add --instance_offset CLI flag (or FLE_INSTANCE_OFFSET env) to shift instance_id per terminal
- Normalize instance_id modulo number of containers inside registry (supports any offset)
- Keep detailed debug prints for discovery, selection, and connection

This ensures parallel runs across terminals map to distinct containers.

* Make container selection explicit via instance_id offset

- Remove automatic modulo normalization in registry
- Require valid instance_id; raise if out of range
- Keep --instance_offset (and FLE_INSTANCE_OFFSET) to compose instance_id = run_idx + offset
- Debug prints reflect explicit selection

This matches previous trajectory runner behavior and avoids unintended cross-terminal overlap.

* Centralize CLI parsing in fle/run.py

- Refactor run_eval.main to accept params (config_path, offset) and remove argparse
- Extend fle/run.py to parse --instance_offset and pass to run_eval
- Keep defaults for direct invocation of run_eval

This consolidates argument parsing in a single entrypoint as requested.

* Fix CLI offset parsing: use --offset in fle/run.py to pass to run_eval

* remove fluff

* remove fluff

* remove fluff

* put back things removed by mistake

* mcp: use importlib.resources.files('fle')/env instead of __file__-based execution_path; aligns with pkg-aware path used in run_eval and CLI

* run_eval: include multi-agent instructions in SystemPromptGenerator input to match instance.get_system_prompt

* unify system prompt construction: add SystemPromptGenerator.generate_for_agent(agent_idx, num_agents); use in instance.get_system_prompt and run_eval

* paths: one-line importlib.resources.files('fle')/env; unix_tools: pkg-aware tools base; instance.get_system_prompt uses generate_for_agent

* patch

* num_agents
This commit is contained in:
kiankyars
2025-08-14 10:22:40 +03:00
committed by GitHub
parent 94e5fad5ff
commit 0d731be2b5
9 changed files with 57 additions and 85 deletions

View File

@@ -29,6 +29,7 @@ class GymEvalConfig:
task: Optional[TaskABC] = None
agent_cards: Optional[List[AgentCard]] = None
env_id: Optional[str] = None # Gym environment ID for registry-based creation
instance_id: Optional[int] = None # Which container to use for this evaluation
def __post_init__(self):
if self.task is None and hasattr(self.agents[0], "task"):

View File

@@ -60,7 +60,7 @@ class FactorioGymRegistry:
goal_description = task_config.get(
"goal_description", f"Task: {task_key}"
)
num_agents = task_config.get("num_agents", 1)
num_agents = task_config["num_agents"]
# Register the environment
self.register_environment(
env_id=task_key,
@@ -130,7 +130,7 @@ class FactorioGymRegistry:
_registry = FactorioGymRegistry()
def make_factorio_env(env_spec: GymEnvironmentSpec) -> FactorioGymEnv:
def make_factorio_env(env_spec: GymEnvironmentSpec, instance_id: int) -> FactorioGymEnv:
"""Factory function to create a Factorio gym environment"""
# Create task from the task definition
@@ -146,7 +146,8 @@ def make_factorio_env(env_spec: GymEnvironmentSpec) -> FactorioGymEnv:
ips, udp_ports, tcp_ports = get_local_container_ips()
if len(tcp_ports) == 0:
raise RuntimeError("No Factorio containers available")
address, tcp_port = ips[0], tcp_ports[0]
address = ips[instance_id]
tcp_port = tcp_ports[instance_id]
common_kwargs = {
"address": address,

View File

@@ -1,4 +1,3 @@
import argparse
import asyncio
import json
import multiprocessing
@@ -17,6 +16,10 @@ from fle.agents.gym_agent import GymAgent
from fle.commons.cluster_ips import get_local_container_ips
from fle.commons.db_client import create_db_client
from fle.eval.algorithms.independent import get_next_version
from fle.eval.tasks import TaskFactory
from fle.env.utils.controller_loader.system_prompt_generator import (
SystemPromptGenerator,
)
load_dotenv()
@@ -55,8 +58,7 @@ async def run_trajectory(run_idx: int, config: GymEvalConfig):
"""Run a single gym evaluation process"""
db_client = await create_db_client()
# Create gym environment using gym.make()
gym_env = gym.make(config.env_id)
gym_env = gym.make(config.env_id, instance_id=config.instance_id)
log_dir = os.path.join(".fle", "trajectory_logs", f"v{config.version}")
runner = GymTrajectoryRunner(
@@ -70,42 +72,31 @@ async def run_trajectory(run_idx: int, config: GymEvalConfig):
await db_client.cleanup()
async def main():
parser = argparse.ArgumentParser()
pkg = importlib.resources.files("fle")
default_config = pkg / "eval" / "algorithms" / "independent" / "gym_run_config.json"
parser.add_argument(
"--run_config",
type=str,
help="Path of the run config file",
default=str(default_config),
)
args = parser.parse_args()
async def main(run_config, offset):
# Read and validate run configurations
run_configs = get_validated_run_configs(args.run_config)
run_config = get_validated_run_configs(run_config)
pkg = importlib.resources.files("fle")
# Get starting version number for new runs
base_version = await get_next_version()
version_offset = 0
# Create and start processes
processes = []
for run_idx, run_config in enumerate(run_configs):
for run_idx, run_config in enumerate(run_config):
# Get environment info from registry
env_info = get_environment_info(run_config.env_id)
if env_info is None:
raise ValueError(f"Could not get environment info for {run_config.env_id}")
# Create gym environment to get task and instance
gym_env = gym.make(run_config.env_id)
task = gym_env.unwrapped.task
instance = gym_env.unwrapped.instance
task = TaskFactory.create_task(env_info["task_config_path"])
generator = SystemPromptGenerator(str(pkg / "env"))
# Create agents and their agent cards
agents = []
agent_cards = []
for agent_idx in range(instance.num_agents):
system_prompt = instance.get_system_prompt(agent_idx)
num_agents = env_info["num_agents"]
for agent_idx in range(num_agents):
system_prompt = generator.generate_for_agent(
agent_idx=agent_idx, num_agents=num_agents
)
agent = GymAgent(
model=run_config.model,
system_prompt=system_prompt,
@@ -132,13 +123,13 @@ async def main():
config = GymEvalConfig(
agents=agents,
version=version,
version_description=f"model:{run_config.model}\ntype:{task.task_key}\nnum_agents:{instance.num_agents}",
version_description=f"model:{run_config.model}\ntype:{task.task_key}\nnum_agents:{num_agents}",
exit_on_task_success=run_config.exit_on_task_success,
task=task,
agent_cards=agent_cards,
env_id=run_config.env_id,
instance_id=run_idx + offset,
)
# Ensure agent cards are properly set for a2a functionality
assert config.agent_cards is not None

15
fle/env/instance.py vendored
View File

@@ -271,18 +271,9 @@ class FactorioInstance:
"""
execution_path = Path(os.path.dirname(os.path.realpath(__file__)))
generator = SystemPromptGenerator(str(execution_path))
multiagent_str = ""
if self.num_agents > 1:
player_idx = agent_idx + 1
multiagent_str = (
f"## MULTIAGENT INSTRUCTIONS\n"
f"You are Agent {player_idx} out of {self.num_agents} agent(s) in the game. "
f"Follow your specific instructions given to you by the task."
f"Use the send_message() tool regularly to communicate with other agents about your current activities and any challenges you encounter. "
f"Start each program with a send_message() call to explain what you are doing. "
f"End each program with a send_message() call to confirm your actions. If your program errors out prior to send_message() being called, the message will not be sent. "
)
return generator.generate(multiagent_str)
return generator.generate_for_agent(
agent_idx=agent_idx, num_agents=self.num_agents
)
def connect_to_server(self, address, tcp_port):
try:

View File

@@ -1,7 +1,6 @@
import json
import os
from pathlib import Path
from typing import Dict
import importlib.resources
from fle.env.utils.controller_loader.system_prompt_generator import (
SystemPromptGenerator,
@@ -220,11 +219,7 @@ async def get_recipe(name: str) -> Dict:
async def get_all_api_docs(method: str):
"""Get all API docs"""
execution_path = (
Path(os.path.dirname(os.path.realpath(__file__))).parent
/ Path("env")
/ Path("src")
)
execution_path = importlib.resources.files("fle") / "env"
generator = SystemPromptGenerator(str(execution_path))
return generator.manual(method)
@@ -234,13 +229,9 @@ async def get_all_api_docs(method: str):
async def get_all_api_schema():
"""Get all API docs"""
execution_path = (
Path(os.path.dirname(os.path.realpath(__file__))).parent
/ Path("env")
/ Path("src")
)
execution_path = importlib.resources.files("fle") / "env"
generator = SystemPromptGenerator(str(execution_path))
schema = (
generator.schema() + "\n\n" + generator.types() + "\n\n" + generator.entities()
)
return schema.replace("env.src.", "")
return schema

View File

@@ -1,6 +1,5 @@
import os
from pathlib import Path
from typing import Dict
import importlib.resources
from mcp.server.fastmcp import Image
from fle.env.entities import Position
@@ -187,11 +186,7 @@ async def schema() -> str:
"""
Get the full API object model for writing code so that you can interact with Factorio.
"""
execution_path = (
Path(os.path.dirname(os.path.realpath(__file__))).parent
/ Path("env")
/ Path("src")
)
execution_path = importlib.resources.files("fle") / "env"
# Generate the documentation
generator = SystemPromptGenerator(str(execution_path))
return f"\n\n{generator.types()}\n\n{generator.entities()}"
@@ -206,11 +201,7 @@ async def manual(name: str) -> str:
name: Name of the method to get documentation for (must be a valid API method)
"""
# Get the list of available agent tools by checking directories in the agent directory
execution_path = (
Path(os.path.dirname(os.path.realpath(__file__))).parent
/ Path("env")
/ Path("src")
)
execution_path = importlib.resources.files("fle") / "env"
agent_tools_path = execution_path / "tools" / "agent"
# Verify the agent_tools_path exists

View File

@@ -3,9 +3,9 @@ Unix-like tools for code introspection in Factorio Learning Environment
"""
import glob
import os
import re
from pathlib import Path
import importlib.resources
from fle.env.utils.controller_loader.system_prompt_generator import (
SystemPromptGenerator,
@@ -15,12 +15,7 @@ from fle.env.protocols.mcp import mcp
def _get_tools_base_path() -> Path:
"""Get the base path to the tools directory"""
return (
Path(os.path.dirname(os.path.realpath(__file__))).parent
/ Path("env")
/ Path("src")
/ "tools"
)
return importlib.resources.files("fle") / "env" / "tools"
@mcp.tool()
@@ -505,11 +500,7 @@ async def man(command: str) -> str:
# If no dedicated doc file, try to generate from the implementation
try:
execution_path = (
Path(os.path.dirname(os.path.realpath(__file__))).parent
/ Path("env")
/ Path("src")
)
execution_path = importlib.resources.files("fle") / "env"
generator = SystemPromptGenerator(str(execution_path))
manual = generator.manual(command)
if manual:

View File

@@ -43,6 +43,20 @@ class SystemPromptGenerator:
f"Here is the manual for the tools available to you\n\n{manual_defs}"
)
def generate_for_agent(self, agent_idx: int = 0, num_agents: int = 1) -> str:
multiagent_str = ""
if num_agents > 1:
player_idx = agent_idx + 1
multiagent_str = (
f"## MULTIAGENT INSTRUCTIONS\n"
f"You are Agent {player_idx} out of {num_agents} agent(s) in the game. "
f"Follow your specific instructions given to you by the task."
f"Use the send_message() tool regularly to communicate with other agents about your current activities and any challenges you encounter. "
f"Start each program with a send_message() call to explain what you are doing. "
f"End each program with a send_message() call to confirm your actions. If your program errors out prior to send_message() being called, the message will not be sent. "
)
return self.generate(multiagent_str)
def manual(self, *args):
try:
return ManualGenerator.generate_manual(

View File

@@ -43,12 +43,8 @@ def fle_cluster(args):
def fle_eval(args):
try:
config_path = Path(args.config)
sys.argv = ["run_eval", "--run_config", str(config_path)]
except TypeError:
sys.argv = ["run_eval"]
try:
asyncio.run(run_eval())
config_path = str(Path(args.config))
asyncio.run(run_eval(config_path, args.offset))
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
@@ -87,6 +83,11 @@ Examples:
)
parser_eval = subparsers.add_parser("eval", help="Run experiment")
parser_eval.add_argument("--config", required=False, help="Path to run config JSON")
parser_eval.add_argument(
"--offset",
type=int,
help="Offset to add to instance_id selection",
)
args = parser.parse_args()
if args.command:
fle_init()