diff --git a/CTF/backend/__pycache__/mypolicy.cpython-312.pyc b/CTF/backend/__pycache__/mypolicy.cpython-312.pyc new file mode 100644 index 0000000..913dfd7 Binary files /dev/null and b/CTF/backend/__pycache__/mypolicy.cpython-312.pyc differ diff --git a/CTF/backend/mypolicy.py b/CTF/backend/mypolicy.py index 91df086..d33c5e1 100644 --- a/CTF/backend/mypolicy.py +++ b/CTF/backend/mypolicy.py @@ -2,41 +2,139 @@ import asyncio import random from lib.game_engine import GameMap, run_game_server import threading +import collections # 1. Initialize the global world model -world = GameMap(show_gap_in_msec=1000.0) +world = GameMap(show_gap_in_msec=10.0) lock = threading.Lock() last_updated_time = -1 update_threshold = 100 player_to_flag_assignments = {} +my_side_is_left = None +class Map: + def __init__(self): + self.width = world.width + self.height = world.height + self.grid = [0] * (self.width * self.height) + self.edge = [[] for _ in range(self.width * self.height)] + self.in_safe_zone = None + def convert_pos_to_index(self, x, y): + return y * self.width + x + def update(self,posx,posy): + self.edge = [[] for _ in range(self.width * self.height)] + self.grid = [0] * (self.width * self.height) + walls = world.walls + for wall in walls: + x, y = wall["posX"], wall["posY"] + idx = self.convert_pos_to_index(x, y) + self.grid[idx] = 1 # Mark wall positions + + self.in_safe_zone = world.is_on_left((posx,posy)) == my_side_is_left + + enemy_players_with_flags = world.list_players(mine=False, inPrison=False, hasFlag=None) + ally_players = world.list_players(mine=True, inPrison=False, hasFlag=None) + my_pos = self.convert_pos_to_index(posx, posy) + for enemy in enemy_players_with_flags: + x, y = enemy["posX"], enemy["posY"] + idx = self.convert_pos_to_index(x, y) + self.grid[idx] = 2 # Mark enemy players with flags as obstacles + for ally in ally_players: + x, y = ally["posX"], ally["posY"] + idx = self.convert_pos_to_index(x, y) + self.grid[idx] = 3 # Mark ally players as free space + for y in range(self.height): + for x in range(self.width): + idx = self.convert_pos_to_index(x, y) + if self.grid[idx] in (1, 2): + continue + for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]: + nx, ny = x + dx, y + dy + if 0 <= nx < self.width and 0 <= ny < self.height: + n_idx = self.convert_pos_to_index(nx, ny) + if self.in_safe_zone: + if (self.grid[n_idx] not in (1, 2, 3)) or n_idx == my_pos: + self.edge[idx].append(n_idx) + else: + if (self.grid[n_idx] not in (1, 3)) or n_idx == my_pos: + self.edge[idx].append(n_idx) + def guideance(self, posx_start, posy_start, posx_end, posy_end): + """ + Compute the shortest path from the start position to the end position + using BFS (equivalent to Dijkstra with unit edge weight) on the pre‑built + adjacency list `self.edge`. Returns the first move direction via + `world.get_direction`. If no path exists or inputs are invalid, returns an + empty string. + """ + self.update(posx_start,posy_start) + # ---- 参数合法性检查 ---- + if not (0 <= posx_start < self.width and 0 <= posy_start < self.height): + return "" + if not (0 <= posx_end < self.width and 0 <= posy_end < self.height): + return "" + src_idx = self.convert_pos_to_index(posx_start, posy_start) + dst_idx = self.convert_pos_to_index(posx_end, posy_end) + n = self.width * self.height + dist = [float('inf')] * n # 最短距离,初始为无穷大 + prev = [None] * n # 前驱节点,用于路径回溯 + dist[src_idx] = 0 + queue = collections.deque([src_idx]) + + while queue: + u = queue.popleft() + if u == dst_idx: + break + for v in self.edge[u]: + if dist[v] == float('inf'): + dist[v] = dist[u] + 1 + prev[v] = u + queue.append(v) + + # ---- 若不可达,返回空字符串 ---- + if dist[dst_idx] == float('inf'): + return "" + + # ---- 重建路径(逆序) ---- + path = [] + cur = dst_idx + while cur is not None: + path.append(cur) + cur = prev[cur] + path.reverse() # 现在是 [src, ..., dst] + + if len(path) < 2: + return "" + + # ---- 计算第一步坐标并返回方向 ---- + next_idx = path[1] + next_x = next_idx % (self.width + 1) + next_y = next_idx // (self.width + 1) + + return world.get_direction((posx_start, posy_start), (next_x, next_y)) + +myMap = Map() def start_game(req): - global player_to_flag_assignments + global player_to_flag_assignments,my_side_is_left world.init(req) print("Start Game!!") player_to_flag_assignments = {} print(f"Game Started! Side: {'Left' if world.is_on_left(list(world.my_team_target)[0]) else 'Right'}") + my_side_is_left = world.is_on_left(list(world.my_team_target)[0]) def plan_next_actions(req): if not world.update(req): return - - global player_to_flag_assignments - - # Render the map - # world.show(do_not_clear=False) + global player_to_flag_assignments,myMap,my_side_is_left my_players = world.list_players(mine=True, inPrison=False, hasFlag=None) opponents = world.list_players(mine=False, inPrison=False, hasFlag=None) enemy_flags = world.list_flags(mine=False, canPickup=True) my_targets = list(world.list_targets(mine=True)) + - # 2. Logic: Assign flags to players without flags - # We maintain the original logic of matching players to specific flag coordinates active_player_names = {p["name"] for p in my_players if not p["hasFlag"]} - # Cleanup assignments for players captured or flags already taken player_to_flag_assignments = { name: pos for name, pos in player_to_flag_assignments.items() if name in active_player_names @@ -51,7 +149,6 @@ def plan_next_actions(req): # 3. Plan moves for each player player_moves = {} - my_side_is_left = world.is_on_left(my_targets[0]) for p in my_players: curr_pos = (p["posX"], p["posY"]) @@ -67,13 +164,10 @@ def plan_next_actions(req): # Determine Obstacles: Avoid opponents if we are in enemy territory is_safe = world.is_on_left(curr_pos) == my_side_is_left blockers = [] if is_safe else [(o["posX"], o["posY"]) for o in opponents] - # Calculate Path - path = world.route_to(curr_pos, dest, extra_obstacles=blockers) - - if len(path) > 1: - move = world.get_direction(curr_pos, path[1]) - player_moves[p["name"]] = move + # path = world.route_to(curr_pos, dest, extra_obstacles=blockers) + + player_moves[p["name"]] = myMap.guideance(p["posX"],p["posY"],dest[0],dest[1]) return player_moves @@ -97,6 +191,5 @@ async def main(): print(f"Server Stopped: {e}") sys.exit(1) - if __name__ == "__main__": asyncio.run(main())