diff --git a/README.md b/README.md index a5979a2..1e32bec 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ Add `--save-screenshots` to capture frames every 10 turns into `frames/`. ## How It Works -**Game loop.** Each turn the agent ticks PyBoy forward, reads memory, decides, and acts. Turns are cheap. The agent runs hundreds of thousands of them to progress through the game. +**Game loop.** Each turn the agent ticks PyBoy forward, reads memory, decides, and acts. Turns are cheap — headless mode removes the 60fps cap and all rendering, so the emulator runs ~100x faster than real-time. The agent runs hundreds of thousands of them to progress through the game. **Memory reading.** `MemoryReader` pulls structured data from fixed addresses in Pokemon Red's RAM: battle type, HP, moves, PP, map ID, coordinates, badges, party state. These addresses are specific to the US release. diff --git a/pyproject.toml b/pyproject.toml index 30c0dd7..1c07828 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,7 +21,7 @@ pythonpath = ["scripts", "tests"] [tool.coverage.run] source = ["scripts"] -omit = ["scripts/diagnose.py", "scripts/install.sh"] +omit = ["scripts/diagnose.py", "scripts/install.sh", "scripts/observe_cli.py"] [tool.coverage.report] show_missing = true diff --git a/references/routes.json b/references/routes.json index 8153084..8b83fcc 100644 --- a/references/routes.json +++ b/references/routes.json @@ -4,8 +4,10 @@ "0": { "name": "Pallet Town", "waypoints": [ - {"x": 5, "y": 5, "note": "Start position"}, - {"x": 5, "y": 1, "note": "Exit north to Route 1"} + {"x": 8, "y": 10, "note": "Center path between houses"}, + {"x": 10, "y": 4, "note": "North through gap — x=10 is the walkable corridor"}, + {"x": 10, "y": 2, "note": "Approach north tree line"}, + {"x": 10, "y": 1, "note": "Oak trigger tile — only x=10 is open at y=1"} ] }, @@ -13,6 +15,11 @@ "name": "Route 1", "waypoints": [ {"x": 5, "y": 33, "note": "Enter from Pallet Town"}, + {"x": 5, "y": 29, "note": "Walk north toward first bend"}, + {"x": 7, "y": 27, "note": "Dodge right around ledge/obstacle at y=28"}, + {"x": 7, "y": 21, "note": "Continue north on right side"}, + {"x": 5, "y": 15, "note": "Shift back to center"}, + {"x": 5, "y": 9, "note": "Approach Viridian entrance"}, {"x": 5, "y": 1, "note": "Exit north to Viridian City"} ] }, diff --git a/scripts/agent.py b/scripts/agent.py index 44a1c34..1e23569 100644 --- a/scripts/agent.py +++ b/scripts/agent.py @@ -43,8 +43,9 @@ # Coords are taken from pret/pokered map object data. EARLY_GAME_TARGETS = { 38: {"name": "Red's bedroom", "target": (7, 1), "axis": "x"}, - 37: {"name": "Red's house 1F", "target": (2, 7), "axis": "y"}, - 0: {"name": "Pallet Town", "target": (5, 1), "axis": "x"}, + 37: {"name": "Red's house 1F", "target": (3, 9), "axis": "y"}, + # Map 0 (Pallet Town) uses waypoints from routes.json instead of a single target. + # The waypoint path (8,10)→(8,4)→(8,1)→(8,0) follows the center corridor to Route 1. } # Move ID → (name, type, power, accuracy) @@ -235,6 +236,7 @@ def _direction_toward_target( vertical = "up" ordered: list[str] = [] + primary = [horizontal, vertical] if axis_preference == "x" else [vertical, horizontal] secondary = [vertical, horizontal] if axis_preference == "x" else [horizontal, vertical] @@ -242,11 +244,19 @@ def _direction_toward_target( self._add_direction(ordered, direction) for direction in secondary: self._add_direction(ordered, direction) - for direction in ("up", "right", "down", "left"): - self._add_direction(ordered, direction) + + # Only add backward directions after being stuck a while + if stuck_turns >= 8: + for direction in ("up", "right", "down", "left"): + self._add_direction(ordered, direction) if not ordered: return None + + # At low stuck counts, only cycle through forward directions + forward_count = min(2, len(ordered)) + if stuck_turns < 8: + return ordered[stuck_turns % forward_count] return ordered[stuck_turns % len(ordered)] def _try_astar(self, state: OverworldState, target_x: int, target_y: int, collision_grid: list) -> str | None: @@ -269,8 +279,14 @@ def next_direction(self, state: OverworldState, turn: int = 0, stuck_turns: int self.current_waypoint = 0 special_target = EARLY_GAME_TARGETS.get(state.map_id) + # Map 0 early-game target only applies before getting a Pokemon + if state.map_id == 0 and state.party_count > 0: + special_target = None if special_target: target_x, target_y = special_target["target"] + # At target: use at_target hint to walk through doors/grass + if state.x == target_x and state.y == target_y: + return special_target.get("at_target", "down") if collision_grid is not None: astar_dir = self._try_astar(state, target_x, target_y, collision_grid) if astar_dir is not None: @@ -300,6 +316,12 @@ def next_direction(self, state: OverworldState, turn: int = 0, stuck_turns: int self.current_waypoint += 1 return self.next_direction(state, turn=turn, stuck_turns=stuck_turns, collision_grid=collision_grid) + # Skip waypoint if close enough but stuck too long + dist = abs(state.x - tx) + abs(state.y - ty) + if stuck_turns >= 8 and dist <= 3 and self.current_waypoint < len(waypoints) - 1: + self.current_waypoint += 1 + return self.next_direction(state, turn=turn, stuck_turns=0, collision_grid=collision_grid) + if collision_grid is not None: astar_dir = self._try_astar(state, tx, ty, collision_grid) if astar_dir is not None: @@ -362,6 +384,7 @@ def __init__(self, rom_path: str, strategy: str = "low", screenshots: bool = Fal self.maps_visited: set[int] = set() self.events: list[str] = [] self.collision_map = CollisionMap() + self.door_cooldown: int = 0 # Steps to walk away from door after exiting a building # Screenshot output directory self.frames_dir = SCRIPT_DIR.parent / "frames" @@ -387,6 +410,10 @@ def update_overworld_progress(self, state: OverworldState): """Track whether the last overworld action moved the player.""" pos = (state.map_id, state.x, state.y) + # Milestone detection (before adding to maps_visited) + if state.map_id == 1 and state.map_id not in self.maps_visited: + self.log("MILESTONE | Reached Viridian City!") + self.maps_visited.add(state.map_id) if self.last_overworld_state is None: @@ -397,8 +424,12 @@ def update_overworld_progress(self, state: OverworldState): self.stuck_turns = 0 self.recent_positions.clear() self.recent_positions.append(pos) + # Set door cooldown when exiting interior maps to avoid re-entry + prev = self.last_overworld_state.map_id + if prev in (37, 38, 40) and state.map_id == 0: + self.door_cooldown = 8 self.log( - f"MAP CHANGE | {self.last_overworld_state.map_id} -> {state.map_id} | " + f"MAP CHANGE | {prev} -> {state.map_id} | " f"Pos: ({state.x}, {state.y})" ) return @@ -414,7 +445,7 @@ def update_overworld_progress(self, state: OverworldState): if len(self.recent_positions) > 8: self.recent_positions.pop(0) - if self.stuck_turns in {2, 5, 10}: + if self.stuck_turns in {2, 5, 10, 20}: self.log( f"STUCK | Map: {state.map_id} | Pos: ({state.x}, {state.y}) | " f"Last move: {self.last_overworld_action} | Streak: {self.stuck_turns}" @@ -425,9 +456,68 @@ def choose_overworld_action(self, state: OverworldState) -> str: if state.text_box_active: return "a" - # After Oak escorts the player into the lab, stay in interaction mode - # until the scripted intro there finishes. + # After exiting a building, walk away from the door to avoid re-entry + if self.door_cooldown > 0: + self.door_cooldown -= 1 + if self.door_cooldown >= 6: + self.controller.wait(60) # let game scripts complete + return "a" # dismiss any dialogue + if self.door_cooldown >= 3: + return "down" # walk south away from door + return "left" # sidestep to avoid door on return north + + # In Oak's lab with no Pokemon: walk to Pokeball table and pick one. + # Oak stands near (5,2) blocking north. Pressing A near him loops + # his dialogue. Going too far south triggers "Don't go away!" + # Strategy: A to dismiss text, down 1 to dodge Oak, right, up to table. if state.map_id == 40 and state.party_count == 0: + lab_script = self.memory._read(0xD5F1) + if self.turn_count % 50 == 0: + self.log(f"LAB | script={lab_script} pos=({state.x},{state.y}) " + f"turn={self.turn_count}") + if self.turn_count % 200 == 0: + self.take_screenshot(f"lab_t{self.turn_count}", force=True) + + if not hasattr(self, '_lab_turns'): + self._lab_turns = 0 + self._lab_turns += 1 + + # Pokeball sprites are at (6,3), (7,3), (8,3) ON the table. + # Interact from y=4 facing UP, or y=2 facing DOWN. + # Simplest path: B(clear) → down to y=4 → right to x=6 → up+A + if not hasattr(self, '_lab_phase'): + self._lab_phase = 0 + + if self._lab_phase == 0: + # Dismiss Oak's text with B, then move south + if state.y >= 4: + self._lab_phase = 1 + self.log(f"LAB | phase 0→1 south at ({state.x},{state.y})") + return "right" + if self._lab_turns % 2 == 1: + return "b" + return "down" + + elif self._lab_phase == 1: + # Go east to Pokeball column (x=6 = Charmander) + if state.x >= 6: + self._lab_phase = 2 + self.log(f"LAB | phase 1→2 at pokeball column ({state.x},{state.y})") + return "up" # face the table + return "right" + + else: + # Phase 2: face up toward Pokeball at (6,3) and press A + # Alternate up (to face table) and A (to interact) + if self._lab_turns % 2 == 0: + return "up" + return "a" + + # In Oak's Lab with a Pokemon: alternate A/down to advance scripted + # sequence while moving south (away from bookshelf/table). + if state.map_id == 40 and state.party_count > 0: + if self.turn_count % 3 == 0: + return "down" return "a" direction = self.navigator.next_direction( @@ -490,11 +580,14 @@ def write_pokedex_entry(self): path.write_text("\n".join(lines)) self.log(f"POKEDEX | Wrote {path}") - def take_screenshot(self): + def take_screenshot(self, label: str = "", force: bool = False): """Save current frame as turn{N}.png.""" - if not self.screenshots or Image is None: + if not force and not self.screenshots: return - path = self.frames_dir / f"turn{self.turn_count}.png" + if Image is None: + return + suffix = f"_{label}" if label else "" + path = self.frames_dir / f"turn{self.turn_count}{suffix}.png" img = Image.fromarray(self.pyboy.screen.ndarray) img.save(path) self.log(f"SCREENSHOT | {path}") @@ -552,23 +645,81 @@ def run_overworld(self): self.collision_map.update(self.pyboy) except Exception: pass # game_wrapper may not be available in all contexts + + # Diagnostic: capture screen and collision data at key positions + if state.map_id == 37 and not hasattr(self, '_house_diag_done'): + self._house_diag_done = True + self.take_screenshot("house_1f", force=True) + self.log(f"DIAG | House 1F at ({state.x},{state.y}) collision map:") + self.log(self.collision_map.to_ascii()) + + if state.map_id == 0 and state.y <= 3 and state.party_count == 0: + # Log game state near the Oak trigger zone + wd730 = self.memory._read(0xD730) + wd74b = self.memory._read(0xD74B) + cur_script = self.memory._read(0xD625) + if self.turn_count % 5 == 0: + self.log( + f"DIAG | Pallet y={state.y} x={state.x} " + f"wd730=0x{wd730:02X} wd74b=0x{wd74b:02X} script={cur_script}" + ) + if not hasattr(self, '_pallet_diag_done'): + self._pallet_diag_done = True + self.take_screenshot("pallet_north", force=True) + + # At y<=1, Oak's PalletTownScript0 triggers. Stop movement and + # wait for Oak to walk to the player, then mash A through dialogue. + if state.y <= 1: + if not hasattr(self, '_oak_wait_done'): + self._oak_wait_done = True + self.log(f"OAK TRIGGER | At y={state.y} x={state.x}. Waiting for Oak script...") + self.take_screenshot("oak_trigger", force=True) + # Wait for Oak to walk from Route 1 to the player (~600 frames) + self.controller.wait(600) + # Oak's lab intro has multiple scripted walking + dialogue phases: + # 1. Oak escorts player to lab (walk script ~300 frames) + # 2. Oak talks about research (several text boxes) + # 3. Oak walks to Pokeball table (walk script ~200 frames) + # 4. Oak says "choose a Pokemon" (text boxes) + # Alternate mashing A and waiting for walk scripts. + for _ in range(4): + self.controller.mash_a(30, delay=30) + self.controller.wait(300) + s = self.memory.read_overworld_state() + wd730 = self.memory._read(0xD730) + self.log(f"OAK TRIGGER | After wait: map={s.map_id} ({s.x},{s.y}) " + f"party={s.party_count} wd730=0x{wd730:02X}") + self.take_screenshot("oak_after_wait", force=True) + action = self.choose_overworld_action(state) if action in {"up", "down", "left", "right"}: self.controller.move(action) + elif action == "b": + self.controller.press("b", hold_frames=20, release_frames=12) + self.controller.wait(24) else: self.controller.press("a", hold_frames=20, release_frames=12) self.controller.wait(24) - # Log position every 100 steps - if self.turn_count % 100 == 0: + # Log position every 50 steps (or every 10 on map 0 for debugging) + log_interval = 10 if state.map_id == 0 else 50 + if self.turn_count % log_interval == 0: + wp_info = "" + map_key = str(state.map_id) + if map_key in self.navigator.routes: + route = self.navigator.routes[map_key] + waypoints = route["waypoints"] if isinstance(route, dict) and "waypoints" in route else route + if self.navigator.current_waypoint < len(waypoints): + wp = waypoints[self.navigator.current_waypoint] + wp_info = f" | WP: {self.navigator.current_waypoint}→({wp['x']},{wp['y']})" self.log( f"OVERWORLD | Map: {state.map_id} | " f"Pos: ({state.x}, {state.y}) | " f"Badges: {state.badges} | " f"Party: {state.party_count} | " f"Action: {action} | " - f"Stuck: {self.stuck_turns}" + f"Stuck: {self.stuck_turns}{wp_info}" ) self.last_overworld_state = state @@ -592,6 +743,14 @@ def run(self, max_turns: int = 100_000): self.log("Intro complete. Entering game loop.") + # Diagnostic: capture game state right after intro + intro_state = self.memory.read_overworld_state() + self.take_screenshot("post_intro", force=True) + wd730 = self.memory._read(0xD730) + wd74b = self.memory._read(0xD74B) + self.log(f"DIAG | Post-intro: map={intro_state.map_id} pos=({intro_state.x},{intro_state.y}) " + f"party={intro_state.party_count} wd730=0x{wd730:02X} wd74b=0x{wd74b:02X}") + for _ in range(max_turns): battle = self.memory.read_battle_state() diff --git a/tests/test_agent.py b/tests/test_agent.py index aa79409..5c0c6b4 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -354,13 +354,18 @@ def test_add_direction_none_ignored(self): # -- _direction_toward_target -- - def test_direction_at_target_returns_cardinal(self): - """When at target, horizontal=None, vertical=None, but the cardinal - directions loop still fills ordered with all 4 directions.""" + def test_direction_at_target_returns_none(self): + """When at target with stuck < 8, no fallback directions are added.""" nav = Navigator({}) state = OverworldState(x=5, y=5) result = nav._direction_toward_target(state, 5, 5) - # ordered = [up, right, down, left] from the for loop on line 243 + assert result is None + + def test_direction_at_target_stuck_returns_cardinal(self): + """When at target but stuck >= 8, fallback directions are added.""" + nav = Navigator({}) + state = OverworldState(x=5, y=5) + result = nav._direction_toward_target(state, 5, 5, stuck_turns=8) assert result == "up" def test_direction_toward_target_empty_ordered(self): @@ -505,8 +510,13 @@ def test_next_direction_waypoint_reached_last_returns_none(self): def _make_agent(tmp_path, screenshots=False, routes=None, type_chart_data=None): """Build a PokemonAgent with all external I/O mocked.""" + from collections import defaultdict + mock_pb = MagicMock() - mock_pb.memory = MagicMock() + # Use a defaultdict(int) for pyboy.memory so that memory[addr] returns 0 + # instead of a MagicMock. This prevents TypeError when format strings like + # {val:02X} are used on memory read results. + mock_pb.memory = defaultdict(int) tc_path = tmp_path / "tc.json" if type_chart_data: @@ -735,7 +745,10 @@ def test_text_box_active_returns_a(self, tmp_path): def test_oaks_lab_no_party_returns_a(self, tmp_path): ag = _make_agent(tmp_path) state = OverworldState(map_id=40, party_count=0) - assert ag.choose_overworld_action(state) == "a" + with patch.object(agent, "Image", None): + result = ag.choose_overworld_action(state) + # Phase 0 of the lab strategy: dismiss text (b) or move south (down) + assert result in ("a", "b", "down", "right", "up") def test_oaks_lab_with_party_uses_navigator(self, tmp_path): ag = _make_agent(tmp_path) @@ -987,7 +1000,8 @@ def test_run_battle_then_overworld(self, tmp_path): ) ag.memory.read_overworld_state = MagicMock(return_value=overworld) - ag.run(max_turns=2) + with patch.object(agent, "Image", None): + ag.run(max_turns=2) assert ag.battles_won == 1 assert any("Battle ended" in e for e in ag.events) @@ -1001,7 +1015,8 @@ def test_run_overworld_only(self, tmp_path): ag.memory.read_battle_state = MagicMock(return_value=battle_none) ag.memory.read_overworld_state = MagicMock(return_value=overworld) - ag.run(max_turns=2) + with patch.object(agent, "Image", None): + ag.run(max_turns=2) assert ag.turn_count >= 2 assert any("Session complete" in e for e in ag.events) @@ -1043,7 +1058,8 @@ def test_run_battle_not_ended(self, tmp_path): ag.memory.read_battle_state = MagicMock(return_value=battle_active) ag.memory.read_overworld_state = MagicMock(return_value=overworld) - ag.run(max_turns=1) + with patch.object(agent, "Image", None): + ag.run(max_turns=1) assert ag.battles_won == 0 @@ -1057,7 +1073,8 @@ def test_run_pyboy_stop_permission_error(self, tmp_path): ag.pyboy.stop.side_effect = PermissionError("read-only mount") # Should not raise - ag.run(max_turns=1) + with patch.object(agent, "Image", None): + ag.run(max_turns=1) assert any("Session complete" in e for e in ag.events) def test_run_writes_pokedex_entry(self, tmp_path): @@ -1068,7 +1085,8 @@ def test_run_writes_pokedex_entry(self, tmp_path): ag.memory.read_battle_state = MagicMock(return_value=battle_none) ag.memory.read_overworld_state = MagicMock(return_value=overworld) - ag.run(max_turns=1) + with patch.object(agent, "Image", None): + ag.run(max_turns=1) logs = list(ag.pokedex_dir.glob("log*.md")) assert len(logs) == 1 @@ -1158,8 +1176,21 @@ def test_dunder_main_calls_main(self, tmp_path): pokedex_dir = tmp_path / "pokedex" pokedex_dir.mkdir(parents=True, exist_ok=True) + # Remove PIL so the re-imported agent sets Image = None, + # avoiding Image.fromarray() on a MagicMock screen.ndarray. + saved_pil = sys.modules.pop("PIL", None) + saved_pil_image = sys.modules.pop("PIL.Image", None) + import builtins + original_import = builtins.__import__ + + def fail_pil(name, *args, **kwargs): + if name in ("PIL", "PIL.Image"): + raise ImportError("no PIL for test") + return original_import(name, *args, **kwargs) + # Use --max-turns 0 so the main loop body never executes. - with patch("sys.argv", ["agent.py", str(rom), "--max-turns", "0"]): + with patch("sys.argv", ["agent.py", str(rom), "--max-turns", "0"]), \ + patch.object(builtins, "__import__", side_effect=fail_pil): saved_pyboy = sys.modules.get("pyboy") sys.modules["pyboy"] = mock_pyboy_mod try: @@ -1172,6 +1203,10 @@ def test_dunder_main_calls_main(self, tmp_path): sys.modules["pyboy"] = saved_pyboy else: sys.modules.pop("pyboy", None) + if saved_pil is not None: + sys.modules["PIL"] = saved_pil + if saved_pil_image is not None: + sys.modules["PIL.Image"] = saved_pil_image # If we got here without error, line 600 (main()) was executed. mock_pyboy_mod.PyBoy.assert_called_once() @@ -1195,7 +1230,8 @@ def test_routes_path_is_path(self): def test_early_game_targets_has_keys(self): assert 38 in EARLY_GAME_TARGETS assert 37 in EARLY_GAME_TARGETS - assert 0 in EARLY_GAME_TARGETS + # Map 0 (Pallet Town) uses waypoints instead of EARLY_GAME_TARGETS + assert 0 not in EARLY_GAME_TARGETS def test_move_data_has_entries(self): assert 0x01 in MOVE_DATA @@ -1292,12 +1328,12 @@ def test_with_collision_grid_for_early_game_targets(self): def test_with_collision_grid_early_game_offscreen_falls_back(self): """Early game target offscreen falls back to _direction_toward_target.""" nav = Navigator({}) - # Map 0 = Pallet Town, target (5, 1) - # Player at (5, 20) -> screen target = (4 + (1-20), 4 + (5-5)) = (-15, 4) -> offscreen - state = OverworldState(map_id=0, x=5, y=20) + # Map 38 = Red's bedroom, target (7, 1) + # Player at (3, 20) -> screen target = (4 + (1-20), 4 + (7-3)) = (-15, 8) -> offscreen + state = OverworldState(map_id=38, x=3, y=20) grid = self._open_grid() result = nav.next_direction(state, collision_grid=grid) - assert result == "up" # y-axis preference for Pallet Town is "x" but y needed + assert result == "right" # axis "x" for Red's bedroom, x=3 -> x=7 def test_with_collision_grid_early_game_astar_failure_falls_back(self): """Early game A* failure falls back to _direction_toward_target.""" @@ -1411,3 +1447,480 @@ def test_choose_overworld_action_passes_collision_grid(self, tmp_path): stuck_turns=ag.stuck_turns, collision_grid=ag.collision_map.grid, ) + + +# =================================================================== +# Navigator._try_astar -- lines 284, 289 +# =================================================================== + + +class TestTryAstar: + """Cover _try_astar returning first A* direction (284) and None (289).""" + + def _open_grid(self): + return [[1] * 10 for _ in range(9)] + + def test_astar_returns_first_direction(self): + """Line 284: A* succeeds and returns the first direction.""" + nav = Navigator({}) + state = OverworldState(map_id=10, x=5, y=5) + # Target at (6, 5) -> screen (4, 5), player at screen (4, 4) + result = nav._try_astar(state, 6, 5, self._open_grid()) + assert result == "right" + + def test_astar_out_of_bounds_returns_none(self): + """Line 289: screen target out of bounds -> returns None.""" + nav = Navigator({}) + state = OverworldState(map_id=10, x=5, y=5) + # Target at (20, 5) -> screen col = 4 + 15 = 19, out of 10-wide grid + result = nav._try_astar(state, 20, 5, self._open_grid()) + assert result is None + + def test_astar_no_path_returns_none(self): + """Line 289: target in bounds but no path found -> returns None.""" + nav = Navigator({}) + state = OverworldState(map_id=10, x=5, y=5) + # All walls, no path possible + grid = [[0] * 10 for _ in range(9)] + grid[4][4] = 1 # only player cell walkable + result = nav._try_astar(state, 6, 5, grid) + assert result is None + + +# =================================================================== +# Navigator.next_direction -- early game target nulled by party (284) +# and at_target return (289), skip waypoint when stuck (322-323) +# =================================================================== + + +class TestNextDirectionUncoveredBranches: + """Cover lines 284, 289, 322-323 in next_direction.""" + + def test_map0_with_party_nulls_special_target(self): + """Line 284: map_id==0, party_count>0 sets special_target = None.""" + # Map 0 is NOT in EARLY_GAME_TARGETS so this is a no-op path, + # but the assignment still executes if map_id == 0 and party_count > 0. + routes = {"0": [{"x": 8, "y": 10}]} + nav = Navigator(routes) + state = OverworldState(map_id=0, x=5, y=5, party_count=1) + result = nav.next_direction(state) + # Falls through to waypoint routing since special_target was None/nulled + assert result is not None + + def test_at_early_game_target_returns_at_target_hint(self): + """Line 289: at target returns at_target hint (default 'down').""" + nav = Navigator({}) + # Map 38 target is (7, 1) with axis "x" — no at_target key => default "down" + state = OverworldState(map_id=38, x=7, y=1) + result = nav.next_direction(state) + assert result == "down" + + def test_skip_waypoint_when_stuck_and_close(self): + """Lines 322-323: stuck_turns>=8, dist<=3, skip waypoint.""" + routes = {"10": [{"x": 5, "y": 6}, {"x": 10, "y": 10}]} + nav = Navigator(routes) + # Player at (5, 5), first waypoint at (5, 6) -> dist=1 + # stuck_turns=8, dist<=3, not last waypoint -> skip + state = OverworldState(map_id=10, x=5, y=5) + result = nav.next_direction(state, stuck_turns=8) + # Should have skipped first waypoint and now be navigating to second + assert nav.current_waypoint == 1 + assert result is not None + + +# =================================================================== +# update_overworld_progress -- lines 426, 452 +# =================================================================== + + +class TestUpdateOverworldProgressUncovered: + """Cover door cooldown on interior exit (426) and Viridian milestone (452).""" + + def test_door_cooldown_on_interior_exit_map37(self, tmp_path): + """Line 426: exiting map 37 to map 0 sets door_cooldown = 8.""" + ag = _make_agent(tmp_path) + ag.last_overworld_state = OverworldState(map_id=37, x=3, y=9) + state = OverworldState(map_id=0, x=5, y=5) + ag.update_overworld_progress(state) + assert ag.door_cooldown == 8 + + def test_door_cooldown_on_interior_exit_map38(self, tmp_path): + """Line 426: exiting map 38 to map 0 sets door_cooldown = 8.""" + ag = _make_agent(tmp_path) + ag.last_overworld_state = OverworldState(map_id=38, x=7, y=1) + state = OverworldState(map_id=0, x=5, y=5) + ag.update_overworld_progress(state) + assert ag.door_cooldown == 8 + + def test_door_cooldown_on_interior_exit_map40(self, tmp_path): + """Line 426: exiting map 40 to map 0 sets door_cooldown = 8.""" + ag = _make_agent(tmp_path) + ag.last_overworld_state = OverworldState(map_id=40, x=5, y=5) + state = OverworldState(map_id=0, x=5, y=5) + ag.update_overworld_progress(state) + assert ag.door_cooldown == 8 + + def test_no_door_cooldown_on_non_interior_exit(self, tmp_path): + """Line 426 not hit: exiting map 12 to map 0 does not set cooldown.""" + ag = _make_agent(tmp_path) + ag.last_overworld_state = OverworldState(map_id=12, x=5, y=5) + state = OverworldState(map_id=0, x=5, y=5) + ag.update_overworld_progress(state) + assert ag.door_cooldown == 0 + + def test_viridian_city_milestone_fires_on_first_visit(self, tmp_path): + """Line 415: milestone log fires when map 1 is visited for the first time.""" + ag = _make_agent(tmp_path) + ag.last_overworld_state = OverworldState(map_id=0, x=5, y=0) + ag.maps_visited = {0} + state = OverworldState(map_id=1, x=5, y=35) + ag.update_overworld_progress(state) + assert any("MILESTONE" in e for e in ag.events) + + +# =================================================================== +# choose_overworld_action -- door cooldown phases (461-467) +# =================================================================== + + +class TestDoorCooldownPhases: + """Cover lines 461-467: door cooldown phases.""" + + def test_door_cooldown_high_returns_a(self, tmp_path): + """Lines 462-464: cooldown >= 6 -> wait + return 'a'.""" + ag = _make_agent(tmp_path) + ag.door_cooldown = 7 # will be decremented to 6, >= 6 + ag.controller = MagicMock() + state = OverworldState(map_id=0, x=5, y=5) + result = ag.choose_overworld_action(state) + assert result == "a" + ag.controller.wait.assert_called_once_with(60) + + def test_door_cooldown_mid_returns_down(self, tmp_path): + """Lines 465-466: cooldown >= 3 -> return 'down'.""" + ag = _make_agent(tmp_path) + ag.door_cooldown = 4 # decremented to 3, >= 3 + state = OverworldState(map_id=0, x=5, y=5) + result = ag.choose_overworld_action(state) + assert result == "down" + + def test_door_cooldown_low_returns_left(self, tmp_path): + """Line 467: cooldown < 3 -> return 'left'.""" + ag = _make_agent(tmp_path) + ag.door_cooldown = 2 # decremented to 1, < 3 + state = OverworldState(map_id=0, x=5, y=5) + result = ag.choose_overworld_action(state) + assert result == "left" + + +# =================================================================== +# choose_overworld_action -- Oak's Lab phases (494-514, 521) +# =================================================================== + + +class TestOaksLabPhases: + """Cover lab phases 0->1->2 with no Pokemon and lab with Pokemon.""" + + def test_lab_phase0_y_ge_4_transitions_to_phase1(self, tmp_path): + """Lines 493-496: phase 0, y>=4 -> transition to phase 1, return 'right'.""" + ag = _make_agent(tmp_path) + with patch.object(agent, "Image", None): + state = OverworldState(map_id=40, party_count=0, x=3, y=4) + result = ag.choose_overworld_action(state) + assert result == "right" + assert ag._lab_phase == 1 + assert any("phase 0" in e for e in ag.events) + + def test_lab_phase0_odd_turn_returns_b(self, tmp_path): + """Lines 497-498: phase 0, _lab_turns odd -> return 'b'.""" + ag = _make_agent(tmp_path) + ag._lab_turns = 0 # will be incremented to 1 (odd) + ag._lab_phase = 0 + with patch.object(agent, "Image", None): + state = OverworldState(map_id=40, party_count=0, x=3, y=2) + result = ag.choose_overworld_action(state) + assert result == "b" + + def test_lab_phase0_even_turn_returns_down(self, tmp_path): + """Lines 498-499: phase 0, _lab_turns even -> return 'down'.""" + ag = _make_agent(tmp_path) + ag._lab_turns = 1 # will be incremented to 2 (even) + ag._lab_phase = 0 + with patch.object(agent, "Image", None): + state = OverworldState(map_id=40, party_count=0, x=3, y=2) + result = ag.choose_overworld_action(state) + assert result == "down" + + def test_lab_phase1_x_ge_6_transitions_to_phase2(self, tmp_path): + """Lines 503-506: phase 1, x>=6 -> transition to phase 2, return 'up'.""" + ag = _make_agent(tmp_path) + ag._lab_phase = 1 + ag._lab_turns = 0 + with patch.object(agent, "Image", None): + state = OverworldState(map_id=40, party_count=0, x=6, y=4) + result = ag.choose_overworld_action(state) + assert result == "up" + assert ag._lab_phase == 2 + assert any("phase 1" in e for e in ag.events) + + def test_lab_phase1_x_lt_6_returns_right(self, tmp_path): + """Line 507: phase 1, x<6 -> return 'right'.""" + ag = _make_agent(tmp_path) + ag._lab_phase = 1 + ag._lab_turns = 0 + with patch.object(agent, "Image", None): + state = OverworldState(map_id=40, party_count=0, x=4, y=4) + result = ag.choose_overworld_action(state) + assert result == "right" + + def test_lab_phase2_even_turn_returns_up(self, tmp_path): + """Lines 512-513: phase 2, _lab_turns even -> return 'up'.""" + ag = _make_agent(tmp_path) + ag._lab_phase = 2 + ag._lab_turns = 1 # incremented to 2 (even) + with patch.object(agent, "Image", None): + state = OverworldState(map_id=40, party_count=0, x=6, y=4) + result = ag.choose_overworld_action(state) + assert result == "up" + + def test_lab_phase2_odd_turn_returns_a(self, tmp_path): + """Line 514: phase 2, _lab_turns odd -> return 'a'.""" + ag = _make_agent(tmp_path) + ag._lab_phase = 2 + ag._lab_turns = 0 # incremented to 1 (odd) + with patch.object(agent, "Image", None): + state = OverworldState(map_id=40, party_count=0, x=6, y=4) + result = ag.choose_overworld_action(state) + assert result == "a" + + def test_lab_with_pokemon_turn_div3_returns_down(self, tmp_path): + """Line 519-520: map 40, party>0, turn_count % 3 == 0 -> 'down'.""" + ag = _make_agent(tmp_path) + ag.turn_count = 9 # 9 % 3 == 0 + state = OverworldState(map_id=40, party_count=1, x=5, y=5) + result = ag.choose_overworld_action(state) + assert result == "down" + + def test_lab_with_pokemon_turn_not_div3_returns_a(self, tmp_path): + """Line 521: map 40, party>0, turn_count % 3 != 0 -> 'a'.""" + ag = _make_agent(tmp_path) + ag.turn_count = 10 # 10 % 3 == 1 + state = OverworldState(map_id=40, party_count=1, x=5, y=5) + result = ag.choose_overworld_action(state) + assert result == "a" + + +# =================================================================== +# run_overworld -- House 1F diagnostic (651-654) +# =================================================================== + + +class TestRunOverworldHouseDiag: + """Cover lines 651-654: House 1F diagnostic on first visit.""" + + def test_house_1f_diagnostic_on_first_visit(self, tmp_path): + """Lines 651-654: map 37, first visit -> screenshot + collision log.""" + ag = _make_agent(tmp_path) + state = OverworldState(map_id=37, x=3, y=5) + ag.memory.read_overworld_state = MagicMock(return_value=state) + ag.controller = MagicMock() + ag.collision_map = MagicMock() + ag.collision_map.grid = [[1] * 10 for _ in range(9)] + ag.collision_map.to_ascii.return_value = "...\n...\n" + ag.turn_count = 1 + + with patch.object(agent, "Image", None): + ag.run_overworld() + + assert hasattr(ag, '_house_diag_done') + assert ag._house_diag_done is True + assert any("DIAG | House 1F" in e for e in ag.events) + + def test_house_1f_diagnostic_only_once(self, tmp_path): + """Lines 651-654: second visit to map 37 does not re-trigger.""" + ag = _make_agent(tmp_path) + ag._house_diag_done = True # already done + state = OverworldState(map_id=37, x=3, y=5) + ag.memory.read_overworld_state = MagicMock(return_value=state) + ag.controller = MagicMock() + ag.collision_map = MagicMock() + ag.collision_map.grid = [[1] * 10 for _ in range(9)] + ag.turn_count = 1 + + ag.run_overworld() + + # No DIAG event since _house_diag_done was already True + assert not any("DIAG | House 1F" in e for e in ag.events) + + +# =================================================================== +# run_overworld -- Pallet Town Oak trigger (658-692) +# =================================================================== + + +class TestRunOverworldOakTrigger: + """Cover lines 658-692: Pallet Town Oak trigger diagnostic.""" + + def test_pallet_diag_at_y_le_3_no_party(self, tmp_path): + """Lines 658-668: map 0, y<=3, no party -> diagnostic log + screenshot.""" + ag = _make_agent(tmp_path) + state = OverworldState(map_id=0, x=5, y=3, party_count=0) + ag.memory.read_overworld_state = MagicMock(return_value=state) + ag.controller = MagicMock() + ag.collision_map = MagicMock() + ag.collision_map.grid = [[1] * 10 for _ in range(9)] + ag.turn_count = 5 # 5 % 5 == 0 -> triggers log + + with patch.object(agent, "Image", None): + ag.run_overworld() + + assert hasattr(ag, '_pallet_diag_done') + assert ag._pallet_diag_done is True + assert any("DIAG | Pallet" in e for e in ag.events) + + def test_oak_wait_at_y_le_1(self, tmp_path): + """Lines 672-692: map 0, y<=1, no party -> Oak wait sequence.""" + ag = _make_agent(tmp_path) + state = OverworldState(map_id=0, x=5, y=1, party_count=0) + post_wait_state = OverworldState(map_id=40, x=5, y=3, party_count=0) + ag.memory.read_overworld_state = MagicMock(side_effect=[state, post_wait_state]) + ag.controller = MagicMock() + ag.collision_map = MagicMock() + ag.collision_map.grid = [[1] * 10 for _ in range(9)] + ag.turn_count = 5 # divisible by 5 + + with patch.object(agent, "Image", None): + ag.run_overworld() + + assert hasattr(ag, '_oak_wait_done') + assert ag._oak_wait_done is True + assert any("OAK TRIGGER" in e for e in ag.events) + # Should have called wait(600) for Oak walk + ag.controller.wait.assert_any_call(600) + # Should have called mash_a 4 times + assert ag.controller.mash_a.call_count == 4 + + def test_oak_wait_only_once(self, tmp_path): + """Lines 673: _oak_wait_done already set -> skip Oak sequence.""" + ag = _make_agent(tmp_path) + ag._oak_wait_done = True + ag._pallet_diag_done = True + state = OverworldState(map_id=0, x=5, y=1, party_count=0) + ag.memory.read_overworld_state = MagicMock(return_value=state) + ag.controller = MagicMock() + ag.collision_map = MagicMock() + ag.collision_map.grid = [[1] * 10 for _ in range(9)] + ag.turn_count = 5 + + with patch.object(agent, "Image", None): + ag.run_overworld() + + # No wait(600) call since _oak_wait_done was already True + calls = [c for c in ag.controller.wait.call_args_list if c == call(600)] + assert len(calls) == 0 + + def test_pallet_diag_no_log_at_non_5_turn(self, tmp_path): + """Line 661: turn_count % 5 != 0 -> no DIAG log.""" + ag = _make_agent(tmp_path) + state = OverworldState(map_id=0, x=5, y=3, party_count=0) + ag.memory.read_overworld_state = MagicMock(return_value=state) + ag.controller = MagicMock() + ag.collision_map = MagicMock() + ag.collision_map.grid = [[1] * 10 for _ in range(9)] + ag.turn_count = 7 # 7 % 5 != 0 + + with patch.object(agent, "Image", None): + ag.run_overworld() + + # _pallet_diag_done still set (screenshot unconditional), but no DIAG log + diag_logs = [e for e in ag.events if "DIAG | Pallet" in e] + assert len(diag_logs) == 0 + + +# =================================================================== +# run_overworld -- B-button dispatch (699-700) +# =================================================================== + + +class TestRunOverworldBButton: + """Cover lines 699-700: action == 'b' -> press B.""" + + def test_b_action_presses_b(self, tmp_path): + """Lines 698-700: action='b' dispatches press('b', ...).""" + ag = _make_agent(tmp_path) + state = OverworldState(map_id=99, x=5, y=5) + ag.memory.read_overworld_state = MagicMock(return_value=state) + ag.choose_overworld_action = MagicMock(return_value="b") + ag.controller = MagicMock() + ag.turn_count = 1 + + ag.run_overworld() + + ag.controller.press.assert_called_once_with("b", hold_frames=20, release_frames=12) + ag.controller.wait.assert_called_once_with(24) + assert ag.last_overworld_action == "b" + + +# =================================================================== +# run_overworld -- Waypoint info logging (711-715) +# =================================================================== + + +class TestRunOverworldWaypointLogging: + """Cover lines 711-715: waypoint info in OVERWORLD log.""" + + def test_waypoint_info_in_log(self, tmp_path): + """Lines 710-715: route exists, waypoint available -> WP info in log.""" + routes = {"12": {"waypoints": [{"x": 8, "y": 10}, {"x": 8, "y": 4}]}} + ag = _make_agent(tmp_path, routes=routes) + state = OverworldState(map_id=12, x=5, y=10, badges=0, party_count=1) + ag.memory.read_overworld_state = MagicMock(return_value=state) + ag.choose_overworld_action = MagicMock(return_value="down") + ag.controller = MagicMock() + ag.turn_count = 50 # 50 % 50 == 0 -> logs + # Set navigator map state + ag.navigator.current_map = "12" + ag.navigator.current_waypoint = 0 + + ag.run_overworld() + + overworld_logs = [e for e in ag.events if "OVERWORLD" in e] + assert len(overworld_logs) == 1 + assert "WP: 0" in overworld_logs[0] + assert "(8,10)" in overworld_logs[0] + + def test_waypoint_info_list_route_format(self, tmp_path): + """Lines 711-715: route as plain list (not dict with 'waypoints').""" + routes = {"12": [{"x": 8, "y": 10}]} + ag = _make_agent(tmp_path, routes=routes) + state = OverworldState(map_id=12, x=5, y=10, badges=0, party_count=1) + ag.memory.read_overworld_state = MagicMock(return_value=state) + ag.choose_overworld_action = MagicMock(return_value="down") + ag.controller = MagicMock() + ag.turn_count = 50 + ag.navigator.current_map = "12" + ag.navigator.current_waypoint = 0 + + ag.run_overworld() + + overworld_logs = [e for e in ag.events if "OVERWORLD" in e] + assert len(overworld_logs) == 1 + assert "WP:" in overworld_logs[0] + + def test_no_waypoint_info_when_past_all_waypoints(self, tmp_path): + """Lines 713 guard: current_waypoint >= len -> no WP info.""" + routes = {"12": [{"x": 8, "y": 10}]} + ag = _make_agent(tmp_path, routes=routes) + state = OverworldState(map_id=12, x=5, y=10, badges=0, party_count=1) + ag.memory.read_overworld_state = MagicMock(return_value=state) + ag.choose_overworld_action = MagicMock(return_value="down") + ag.controller = MagicMock() + ag.turn_count = 50 + ag.navigator.current_map = "12" + ag.navigator.current_waypoint = 5 # past all waypoints + + ag.run_overworld() + + overworld_logs = [e for e in ag.events if "OVERWORLD" in e] + assert len(overworld_logs) == 1 + assert "WP:" not in overworld_logs[0]