Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ Add `--save-screenshots` to capture frames every 10 turns into `frames/`.

## How It Works

**Game loop.** Each turn the agent ticks PyBoy forward, reads memory, decides, and acts. Turns are cheap. The agent runs hundreds of thousands of them to progress through the game.
**Game loop.** Each turn the agent ticks PyBoy forward, reads memory, decides, and acts. Turns are cheap — headless mode removes the 60fps cap and all rendering, so the emulator runs ~100x faster than real-time. The agent runs hundreds of thousands of them to progress through the game.

**Memory reading.** `MemoryReader` pulls structured data from fixed addresses in Pokemon Red's RAM: battle type, HP, moves, PP, map ID, coordinates, badges, party state. These addresses are specific to the US release.

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pythonpath = ["scripts", "tests"]

[tool.coverage.run]
source = ["scripts"]
omit = ["scripts/diagnose.py", "scripts/install.sh"]
omit = ["scripts/diagnose.py", "scripts/install.sh", "scripts/observe_cli.py"]

[tool.coverage.report]
show_missing = true
Expand Down
11 changes: 9 additions & 2 deletions references/routes.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,22 @@
"0": {
"name": "Pallet Town",
"waypoints": [
{"x": 5, "y": 5, "note": "Start position"},
{"x": 5, "y": 1, "note": "Exit north to Route 1"}
{"x": 8, "y": 10, "note": "Center path between houses"},
{"x": 10, "y": 4, "note": "North through gap — x=10 is the walkable corridor"},
{"x": 10, "y": 2, "note": "Approach north tree line"},
{"x": 10, "y": 1, "note": "Oak trigger tile — only x=10 is open at y=1"}
]
},

"12": {
"name": "Route 1",
"waypoints": [
{"x": 5, "y": 33, "note": "Enter from Pallet Town"},
{"x": 5, "y": 29, "note": "Walk north toward first bend"},
{"x": 7, "y": 27, "note": "Dodge right around ledge/obstacle at y=28"},
{"x": 7, "y": 21, "note": "Continue north on right side"},
{"x": 5, "y": 15, "note": "Shift back to center"},
{"x": 5, "y": 9, "note": "Approach Viridian entrance"},
{"x": 5, "y": 1, "note": "Exit north to Viridian City"}
]
},
Expand Down
187 changes: 173 additions & 14 deletions scripts/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@
# Coords are taken from pret/pokered map object data.
EARLY_GAME_TARGETS = {
38: {"name": "Red's bedroom", "target": (7, 1), "axis": "x"},
37: {"name": "Red's house 1F", "target": (2, 7), "axis": "y"},
0: {"name": "Pallet Town", "target": (5, 1), "axis": "x"},
37: {"name": "Red's house 1F", "target": (3, 9), "axis": "y"},
# Map 0 (Pallet Town) uses waypoints from routes.json instead of a single target.
# The waypoint path (8,10)→(8,4)→(8,1)→(8,0) follows the center corridor to Route 1.
}

# Move ID → (name, type, power, accuracy)
Expand Down Expand Up @@ -235,18 +236,27 @@ def _direction_toward_target(
vertical = "up"

ordered: list[str] = []

primary = [horizontal, vertical] if axis_preference == "x" else [vertical, horizontal]
secondary = [vertical, horizontal] if axis_preference == "x" else [horizontal, vertical]

for direction in primary:
self._add_direction(ordered, direction)
for direction in secondary:
self._add_direction(ordered, direction)
for direction in ("up", "right", "down", "left"):
self._add_direction(ordered, direction)

# Only add backward directions after being stuck a while
if stuck_turns >= 8:
for direction in ("up", "right", "down", "left"):
self._add_direction(ordered, direction)

if not ordered:
return None

# At low stuck counts, only cycle through forward directions
forward_count = min(2, len(ordered))
if stuck_turns < 8:
return ordered[stuck_turns % forward_count]
return ordered[stuck_turns % len(ordered)]

def _try_astar(self, state: OverworldState, target_x: int, target_y: int, collision_grid: list) -> str | None:
Expand All @@ -269,8 +279,14 @@ def next_direction(self, state: OverworldState, turn: int = 0, stuck_turns: int
self.current_waypoint = 0

special_target = EARLY_GAME_TARGETS.get(state.map_id)
# Map 0 early-game target only applies before getting a Pokemon
if state.map_id == 0 and state.party_count > 0:
special_target = None
if special_target:
target_x, target_y = special_target["target"]
# At target: use at_target hint to walk through doors/grass
if state.x == target_x and state.y == target_y:
return special_target.get("at_target", "down")
if collision_grid is not None:
astar_dir = self._try_astar(state, target_x, target_y, collision_grid)
if astar_dir is not None:
Expand Down Expand Up @@ -300,6 +316,12 @@ def next_direction(self, state: OverworldState, turn: int = 0, stuck_turns: int
self.current_waypoint += 1
return self.next_direction(state, turn=turn, stuck_turns=stuck_turns, collision_grid=collision_grid)

# Skip waypoint if close enough but stuck too long
dist = abs(state.x - tx) + abs(state.y - ty)
if stuck_turns >= 8 and dist <= 3 and self.current_waypoint < len(waypoints) - 1:
self.current_waypoint += 1
return self.next_direction(state, turn=turn, stuck_turns=0, collision_grid=collision_grid)

if collision_grid is not None:
astar_dir = self._try_astar(state, tx, ty, collision_grid)
if astar_dir is not None:
Expand Down Expand Up @@ -362,6 +384,7 @@ def __init__(self, rom_path: str, strategy: str = "low", screenshots: bool = Fal
self.maps_visited: set[int] = set()
self.events: list[str] = []
self.collision_map = CollisionMap()
self.door_cooldown: int = 0 # Steps to walk away from door after exiting a building

# Screenshot output directory
self.frames_dir = SCRIPT_DIR.parent / "frames"
Expand All @@ -387,6 +410,10 @@ def update_overworld_progress(self, state: OverworldState):
"""Track whether the last overworld action moved the player."""
pos = (state.map_id, state.x, state.y)

# Milestone detection (before adding to maps_visited)
if state.map_id == 1 and state.map_id not in self.maps_visited:
self.log("MILESTONE | Reached Viridian City!")

self.maps_visited.add(state.map_id)

if self.last_overworld_state is None:
Expand All @@ -397,8 +424,12 @@ def update_overworld_progress(self, state: OverworldState):
self.stuck_turns = 0
self.recent_positions.clear()
self.recent_positions.append(pos)
# Set door cooldown when exiting interior maps to avoid re-entry
prev = self.last_overworld_state.map_id
if prev in (37, 38, 40) and state.map_id == 0:
self.door_cooldown = 8
self.log(
f"MAP CHANGE | {self.last_overworld_state.map_id} -> {state.map_id} | "
f"MAP CHANGE | {prev} -> {state.map_id} | "
f"Pos: ({state.x}, {state.y})"
)
return
Expand All @@ -414,7 +445,7 @@ def update_overworld_progress(self, state: OverworldState):
if len(self.recent_positions) > 8:
self.recent_positions.pop(0)

if self.stuck_turns in {2, 5, 10}:
if self.stuck_turns in {2, 5, 10, 20}:
self.log(
f"STUCK | Map: {state.map_id} | Pos: ({state.x}, {state.y}) | "
f"Last move: {self.last_overworld_action} | Streak: {self.stuck_turns}"
Expand All @@ -425,9 +456,68 @@ def choose_overworld_action(self, state: OverworldState) -> str:
if state.text_box_active:
return "a"

# After Oak escorts the player into the lab, stay in interaction mode
# until the scripted intro there finishes.
# After exiting a building, walk away from the door to avoid re-entry
if self.door_cooldown > 0:
self.door_cooldown -= 1
if self.door_cooldown >= 6:
self.controller.wait(60) # let game scripts complete
return "a" # dismiss any dialogue
if self.door_cooldown >= 3:
return "down" # walk south away from door
return "left" # sidestep to avoid door on return north

# In Oak's lab with no Pokemon: walk to Pokeball table and pick one.
# Oak stands near (5,2) blocking north. Pressing A near him loops
# his dialogue. Going too far south triggers "Don't go away!"
# Strategy: A to dismiss text, down 1 to dodge Oak, right, up to table.
if state.map_id == 40 and state.party_count == 0:
lab_script = self.memory._read(0xD5F1)
if self.turn_count % 50 == 0:
self.log(f"LAB | script={lab_script} pos=({state.x},{state.y}) "
f"turn={self.turn_count}")
if self.turn_count % 200 == 0:
self.take_screenshot(f"lab_t{self.turn_count}", force=True)

if not hasattr(self, '_lab_turns'):
self._lab_turns = 0
self._lab_turns += 1

# Pokeball sprites are at (6,3), (7,3), (8,3) ON the table.
# Interact from y=4 facing UP, or y=2 facing DOWN.
# Simplest path: B(clear) → down to y=4 → right to x=6 → up+A
if not hasattr(self, '_lab_phase'):
self._lab_phase = 0

if self._lab_phase == 0:
# Dismiss Oak's text with B, then move south
if state.y >= 4:
self._lab_phase = 1
self.log(f"LAB | phase 0→1 south at ({state.x},{state.y})")
return "right"
if self._lab_turns % 2 == 1:
return "b"
return "down"

elif self._lab_phase == 1:
# Go east to Pokeball column (x=6 = Charmander)
if state.x >= 6:
self._lab_phase = 2
self.log(f"LAB | phase 1→2 at pokeball column ({state.x},{state.y})")
return "up" # face the table
return "right"

else:
# Phase 2: face up toward Pokeball at (6,3) and press A
# Alternate up (to face table) and A (to interact)
if self._lab_turns % 2 == 0:
return "up"
return "a"

# In Oak's Lab with a Pokemon: alternate A/down to advance scripted
# sequence while moving south (away from bookshelf/table).
if state.map_id == 40 and state.party_count > 0:
if self.turn_count % 3 == 0:
return "down"
return "a"

direction = self.navigator.next_direction(
Expand Down Expand Up @@ -490,11 +580,14 @@ def write_pokedex_entry(self):
path.write_text("\n".join(lines))
self.log(f"POKEDEX | Wrote {path}")

def take_screenshot(self):
def take_screenshot(self, label: str = "", force: bool = False):
"""Save current frame as turn{N}.png."""
if not self.screenshots or Image is None:
if not force and not self.screenshots:
return
path = self.frames_dir / f"turn{self.turn_count}.png"
if Image is None:
return
suffix = f"_{label}" if label else ""
path = self.frames_dir / f"turn{self.turn_count}{suffix}.png"
img = Image.fromarray(self.pyboy.screen.ndarray)
img.save(path)
self.log(f"SCREENSHOT | {path}")
Expand Down Expand Up @@ -552,23 +645,81 @@ def run_overworld(self):
self.collision_map.update(self.pyboy)
except Exception:
pass # game_wrapper may not be available in all contexts

# Diagnostic: capture screen and collision data at key positions
if state.map_id == 37 and not hasattr(self, '_house_diag_done'):
self._house_diag_done = True
self.take_screenshot("house_1f", force=True)
self.log(f"DIAG | House 1F at ({state.x},{state.y}) collision map:")
self.log(self.collision_map.to_ascii())

if state.map_id == 0 and state.y <= 3 and state.party_count == 0:
# Log game state near the Oak trigger zone
wd730 = self.memory._read(0xD730)
wd74b = self.memory._read(0xD74B)
cur_script = self.memory._read(0xD625)
if self.turn_count % 5 == 0:
self.log(
f"DIAG | Pallet y={state.y} x={state.x} "
f"wd730=0x{wd730:02X} wd74b=0x{wd74b:02X} script={cur_script}"
)
if not hasattr(self, '_pallet_diag_done'):
self._pallet_diag_done = True
self.take_screenshot("pallet_north", force=True)

# At y<=1, Oak's PalletTownScript0 triggers. Stop movement and
# wait for Oak to walk to the player, then mash A through dialogue.
if state.y <= 1:
if not hasattr(self, '_oak_wait_done'):
self._oak_wait_done = True
self.log(f"OAK TRIGGER | At y={state.y} x={state.x}. Waiting for Oak script...")
self.take_screenshot("oak_trigger", force=True)
# Wait for Oak to walk from Route 1 to the player (~600 frames)
self.controller.wait(600)
# Oak's lab intro has multiple scripted walking + dialogue phases:
# 1. Oak escorts player to lab (walk script ~300 frames)
# 2. Oak talks about research (several text boxes)
# 3. Oak walks to Pokeball table (walk script ~200 frames)
# 4. Oak says "choose a Pokemon" (text boxes)
# Alternate mashing A and waiting for walk scripts.
for _ in range(4):
self.controller.mash_a(30, delay=30)
self.controller.wait(300)
s = self.memory.read_overworld_state()
wd730 = self.memory._read(0xD730)
self.log(f"OAK TRIGGER | After wait: map={s.map_id} ({s.x},{s.y}) "
f"party={s.party_count} wd730=0x{wd730:02X}")
self.take_screenshot("oak_after_wait", force=True)

action = self.choose_overworld_action(state)

if action in {"up", "down", "left", "right"}:
self.controller.move(action)
elif action == "b":
self.controller.press("b", hold_frames=20, release_frames=12)
self.controller.wait(24)
else:
self.controller.press("a", hold_frames=20, release_frames=12)
self.controller.wait(24)

# Log position every 100 steps
if self.turn_count % 100 == 0:
# Log position every 50 steps (or every 10 on map 0 for debugging)
log_interval = 10 if state.map_id == 0 else 50
if self.turn_count % log_interval == 0:
wp_info = ""
map_key = str(state.map_id)
if map_key in self.navigator.routes:
route = self.navigator.routes[map_key]
waypoints = route["waypoints"] if isinstance(route, dict) and "waypoints" in route else route
if self.navigator.current_waypoint < len(waypoints):
wp = waypoints[self.navigator.current_waypoint]
wp_info = f" | WP: {self.navigator.current_waypoint}→({wp['x']},{wp['y']})"
self.log(
f"OVERWORLD | Map: {state.map_id} | "
f"Pos: ({state.x}, {state.y}) | "
f"Badges: {state.badges} | "
f"Party: {state.party_count} | "
f"Action: {action} | "
f"Stuck: {self.stuck_turns}"
f"Stuck: {self.stuck_turns}{wp_info}"
)

self.last_overworld_state = state
Expand All @@ -592,6 +743,14 @@ def run(self, max_turns: int = 100_000):

self.log("Intro complete. Entering game loop.")

# Diagnostic: capture game state right after intro
intro_state = self.memory.read_overworld_state()
self.take_screenshot("post_intro", force=True)
wd730 = self.memory._read(0xD730)
wd74b = self.memory._read(0xD74B)
self.log(f"DIAG | Post-intro: map={intro_state.map_id} pos=({intro_state.x},{intro_state.y}) "
f"party={intro_state.party_count} wd730=0x{wd730:02X} wd74b=0x{wd74b:02X}")

for _ in range(max_turns):
battle = self.memory.read_battle_state()

Expand Down
Loading