diff --git a/.DS_Store b/.DS_Store index eafd3eb..2069512 100644 Binary files a/.DS_Store and b/.DS_Store differ diff --git a/CTF/.DS_Store b/CTF/.DS_Store index f206f7d..652de56 100644 Binary files a/CTF/.DS_Store and b/CTF/.DS_Store differ diff --git a/CTF/backend/server.py b/CTF/backend/server.py index 77abd56..d5cc350 100644 --- a/CTF/backend/server.py +++ b/CTF/backend/server.py @@ -2,6 +2,8 @@ import asyncio from lib.game_engine import GameMap, run_game_server import threading import collections +import server0 +import server1 world = GameMap(show_gap_in_msec=1.0) lock = threading.Lock() @@ -179,15 +181,23 @@ class Map: chosen_enemy = pos return chosen_enemy,min_length myMap = Map() +rounds = 0 def start_game(req): - global player_to_flag_assign,my_side_is_left + server1.start_game(req) + server0.start_game(req) world.init(req) - print("Start Game!!") - player_to_flag_assign = {} - print(f"Game Started! Side: {'Left' if world.is_on_left(list(world.my_team_target)[0]) else 'Right'}") - my_side_is_left = world.is_on_left(list(world.my_team_target)[0]) + global rounds + rounds = 0 + def plan_next_actions(req): + global rounds + rounds+=1 + # if rounds < 50: + # return server0.plan_next_actions(req) + # else: + # return server1.plan_next_actions(req) + # return server0.plan_next_actions(req) if not world.update(req): return global player_to_flag_assign,myMap,my_side_is_left @@ -198,6 +208,17 @@ def plan_next_actions(req): enemy_flags = world.list_flags(mine=False, canPickup=True) my_flags = world.list_flags(mine=True, canPickup=True) my_targets = list(world.list_targets(mine=True)) + + my_in_prison = len(world.list_players(mine=True, inPrison=True, hasFlag=None)) + op_in_prison = len(world.list_players(False,True,None)) + if my_in_prison <= 0 and op_in_prison <= 0: + return server0.plan_next_actions(req) + elif my_in_prison > op_in_prison: + return server1.plan_next_actions(req) + else: + return server0.plan_next_actions(req) + + active_player_names = {p["name"] for p in my_players if not p["hasFlag"]} flags_list = [] regard_list = [] @@ -242,7 +263,6 @@ def plan_next_actions(req): f,lentime = myMap.closest_in_range(p["posX"],p["posY"],regard_list) if f is not None and lentime<14: dest = (f[0],f[1]) - regard_list.remove((f[0],f[1])) else : f,lentime = myMap.closest_in_range(p["posX"],p["posY"],pretend_list) if f is not None and lentime<8: @@ -252,7 +272,6 @@ def plan_next_actions(req): f,temp = myMap.closest_in_range(p["posX"],p["posY"],regard_list) if f is not None: dest = (f[0],f[1]) - regard_list.remove((f[0],f[1])) else: f = myMap.closest(p["posX"],p["posY"],pretend_list) if f is not None: @@ -265,15 +284,14 @@ def plan_next_actions(req): protect_list.remove((f[0],f[1])) else: continue - if dest is not None: - player_moves[p["name"]] = myMap.guideance(p["posX"],p["posY"],dest[0],dest[1]) - else: - player_moves[p["name"]] = myMap.guideance(p["posX"],p["posY"],p["posX"],p["posY"]) + player_moves[p["name"]] = myMap.guideance(p["posX"],p["posY"],dest[0],dest[1]) return player_moves def game_over(req): - print("Game Over!") - world.show(force=True) + server1.game_over(req) + server0.game_over(req) + # print("Game Over!") + # world.show(force=True) async def main(): import sys diff --git a/CTF/backend/server0.py b/CTF/backend/server0.py new file mode 100644 index 0000000..9fcb851 --- /dev/null +++ b/CTF/backend/server0.py @@ -0,0 +1,1147 @@ +import asyncio +import random +import heapq +import math +from lib.game_engine import GameMap, run_game_server +import threading +from collections import defaultdict + +# 全局世界模型初始化 +world = GameMap(show_gap_in_msec=1.0) +lock = threading.Lock() + +# 游戏状态管理 +player_roles = {} # 玩家角色分配 {'player_name': 'attacker'/'defender'} +flag_assignments = {} # flag分配 {'player_name': (x, y)} +attack_tasks = {} # 攻击任务状态 {'player_name': 'attacking'/'returning'/'hiding'} +defender_positions = {} # 防守位置分配 {'player_name': (x, y)} + + +def start_game(req): + """游戏开始初始化""" + global player_roles, flag_assignments, attack_tasks, defender_positions + world.init(req) + print("Start Game!!") + + # 重置所有全局状态 + player_roles = {} + flag_assignments = {} + attack_tasks = {} + defender_positions = {} + + print(f"Game Started! Side: {'Left' if world.is_on_left(list(world.my_team_target)[0]) else 'Right'}") + + +def a_star(start, goal, dangerous_positions=None, hard_obstacles=None, opponents=None): + """ + 基于权重的寻路算法,区分软限制和硬限制 + 危险位置:软限制(权重按距离opponent远近递增) + 对手位置和障碍物:硬限制(完全不能通过) + """ + if start == goal: + return [start] + + # 硬限制:墙壁、对手位置等完全不能通过的位置 + hard_blocked = set(world.walls) + if hard_obstacles: + hard_blocked.update(hard_obstacles) + + # 软限制:危险位置,权重按距离opponent远近递增 + dangerous_set = set(dangerous_positions) if dangerous_positions else set() + + # 获取opponents位置列表用于距离计算 + opponent_positions = [] + if opponents: + opponent_positions = [(o["posX"], o["posY"]) if isinstance(o, dict) else o for o in opponents] + elif hard_obstacles: + opponent_positions = list(hard_obstacles) + + # 4个方向移动:上下左右 + directions = [(0, -1), (0, 1), (-1, 0), (1, 0)] # 上, 下, 左, 右 + + def is_valid(pos): + """检查位置是否有效(不在在硬限制区域)""" + x, y = pos + return (0 <= x < world.width and 0 <= y < world.height and + pos not in hard_blocked) + + def get_move_cost(pos): + """获取移动到某位置的代价,危险区域权重按十字型阶梯递增,最小100""" + if pos in dangerous_set and opponent_positions: + # 计算到最近opponent的距离 + min_distance_to_opponent = min( + calculate_manhattan_distance(pos, opp_pos) + for opp_pos in opponent_positions + ) + # 十字型阶梯权重:以对手为中心,每层距离对应一个权重阶梯,最小100 + if min_distance_to_opponent == 0: + return 50000 # 对手位置:50000 + elif min_distance_to_opponent == 1: + return 10000 # 距离1:10000 + elif min_distance_to_opponent == 2: + return 5000 # 距离2:5000 + elif min_distance_to_opponent == 3: + return 2000 # 距离3:2000 + elif min_distance_to_opponent == 4: + return 1000 # 距离4:1000 + elif min_distance_to_opponent == 5: + return 500 # 距离5:500 + elif min_distance_to_opponent == 6: + return 300 # 距离6:300 + else: + return 100 # 距离7+:最小100 + elif pos in dangerous_set: + return 100 # 默认危险位置最小权重100 + + if is_in_my_territory(pos): + return 1 + else: + return 5 + # return 1 # 正常代价 + + def get_manhattan_distance(pos1, pos2): + """计算曼哈顿距离作为启发式函数""" + return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1]) + + # A*搜索主循环 + open_set = [] + heapq.heappush(open_set, (0, start)) # (f_score, pos) + came_from = {} + g_score = {start: 0} + f_score = {start: get_manhattan_distance(start, goal)} + closed_set = set() + + while open_set: + current_f, current_pos = heapq.heappop(open_set) + + if current_pos in closed_set: + continue + closed_set.add(current_pos) + + if current_pos == goal: + # 重构路径 + path = [] + pos = current_pos + while pos in came_from: + path.append(pos) + pos = came_from[pos] + path.append(start) + return path[::-1] + + # 探索所有方向 + for direction in directions: + next_pos = (current_pos[0] + direction[0], current_pos[1] + direction[1]) + + if is_valid(next_pos) and next_pos not in closed_set: + # 计算移动到下一位置的代价 + move_cost = get_move_cost(next_pos) + tentative_g = g_score[current_pos] + move_cost + + if next_pos not in g_score or tentative_g < g_score[next_pos]: + came_from[next_pos] = current_pos + g_score[next_pos] = tentative_g + f_score[next_pos] = tentative_g + get_manhattan_distance(next_pos, goal) + heapq.heappush(open_set, (f_score[next_pos], next_pos)) + + return [] # 无路径 + + +def calculate_manhattan_distance(pos1, pos2): + """计算曼哈顿距离""" + return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1]) + + +def is_in_my_territory(pos): + """检查位置是否在我方领域""" + my_targets = list(world.list_targets(mine=True)) + if not my_targets: + return True + my_side_is_left = world.is_on_left(my_targets[0]) + return world.is_on_left(pos) == my_side_is_left + + +def is_in_enemy_territory(pos): + """检查位置是否在敌方营地(敌方半场)""" + return not is_in_my_territory(pos) + + +def get_midline_x(): + """获取地图中线X坐标(独立函数,避免重复计算)""" + return world.width // 2 + + +def is_about_to_cross_midline(current_pos, next_pos): + """判断玩家是否即将越过中线(从己方半场进入敌方半场)""" + # 获取我方半场标识(增加异常处理,防止崩溃) + my_targets = list(world.list_targets(mine=True)) + if not my_targets: + return False + my_side_is_left = world.is_on_left(my_targets[0]) + midline_x = get_midline_x() + + # 我方在左侧:当前x < 中线,下一步x >= 中线 → 即将越界 + if my_side_is_left: + current_in_my_territory = current_pos[0] < midline_x + next_in_enemy_territory = next_pos[0] >= midline_x + # 我方在右侧:当前x > 中线,下一步x <= 中线 → 即将越界 + else: + current_in_my_territory = current_pos[0] > midline_x + next_in_enemy_territory = next_pos[0] <= midline_x + + return current_in_my_territory and next_in_enemy_territory + + +def has_enemy_opposite(current_pos, next_pos, opponents, offset=1): + """ + 【核心修改】仅检测中线对面「贴着中线」的位置是否有敌方玩家 + 贴着中线定义:敌方玩家的X坐标与中线X坐标的差值的绝对值 ≤ 1(即紧邻中线) + 同时保留Y轴小幅偏移(可调整offset),确保精准检测 + """ + # 前置校验:敌方玩家列表为空时,直接返回False + if not opponents: + return False + # 1. 获取中线X坐标,定义「贴着中线」的判定条件 + midline_x = get_midline_x() + STICK_TO_MIDLINE_THRESHOLD = 1 # 贴着中线:X坐标与中线差值≤1 + # 2. 即将越界位置的Y轴(判定基准) + target_y = next_pos[1] + + # 遍历敌方玩家(增加数据有效性判断) + for opponent in opponents: + # 校验敌方玩家数据完整性,防止KeyError + if not all(key in opponent for key in ["name", "posX", "posY"]): + continue + opp_pos = (opponent["posX"], opponent["posY"]) + + # 【关键判定1】敌方玩家必须「贴着中线」(X坐标紧邻中线) + if abs(opp_pos[0] - midline_x) > STICK_TO_MIDLINE_THRESHOLD: + continue # 非贴着中线的敌方,直接跳过 + # 【关键判定2】Y轴在允许偏移范围内(与即将越界位置的Y轴接近) + if abs(opp_pos[1] - target_y) > offset: + continue + # 【关键判定3】敌方玩家在敌方半场(与玩家即将进入的区域一致,中线两侧) + if is_in_my_territory(opp_pos): + continue + # 【关键判定4】敌方玩家与越界位置在中线两侧(严格对面) + opp_side_left = opp_pos[0] < midline_x + next_side_left = next_pos[0] < midline_x + if opp_side_left == next_side_left: + continue # 同一侧,非对面 + + # 满足所有条件(贴着中线+Y轴接近+敌方半场+中线两侧),判定为有风险 + print(f"[越界安全检测] 发现中线对面贴着中线的敌方玩家 {opponent['name']} 位置 {opp_pos},禁止越界") + return True + return False + + +# 寻找敌方营地内的安全躲避位置 +def find_safe_hide_position_in_enemy_territory(player_pos, opponents, enemy_flags): + """ + 在敌方营地内寻找安全躲避位置(远离所有敌方玩家,优先靠近目标旗帜) + :param player_pos: 玩家当前位置 + :param opponents: 敌方玩家列表 + :param enemy_flags: 敌方旗帜列表 + :return: 安全躲避位置 / None + """ + if not opponents: + return player_pos # 无敌方时,当前位置即为安全 + + # 定义安全距离:与所有敌方玩家的曼哈顿距离≥3格 + SAFE_DISTANCE = 3 + enemy_territory_positions = [] + opponent_positions = [(o["posX"], o["posY"]) for o in opponents if all(key in o for key in ["posX", "posY"])] + + # 遍历敌方营地内所有有效位置(非墙壁) + for x in range(world.width): + for y in range(world.height): + pos = (x, y) + # 仅保留敌方营地+非墙壁位置 + if is_in_enemy_territory(pos) and pos not in world.walls: + # 检查是否与所有敌方玩家保持安全距离 + is_safe = True + for opp_pos in opponent_positions: + if calculate_manhattan_distance(pos, opp_pos) < SAFE_DISTANCE: + is_safe = False + break + if is_safe: + enemy_territory_positions.append(pos) + + if not enemy_territory_positions: + print("[躲避逻辑] 敌方营地无安全位置,优先向中线撤退") + # 无安全位置时,返回靠近中线的位置 + midline_x = get_midline_x() + retreat_pos = min( + [(x, y) for x in range(world.width) for y in range(world.height) if + is_in_enemy_territory((x, y)) and (x, y) not in world.walls], + key=lambda p: abs(p[0] - midline_x) + ) + return retreat_pos + + # 优先选择靠近目标旗帜的安全位置(为后续拿旗做准备) + if enemy_flags: + target_flag_positions = [(f["posX"], f["posY"]) for f in enemy_flags] + # 按「距离最近目标旗帜」排序安全位置 + enemy_territory_positions.sort( + key=lambda p: min(calculate_manhattan_distance(p, flag_pos) for flag_pos in target_flag_positions) + ) + else: + # 无旗帜时,按「距离当前玩家最近」排序 + enemy_territory_positions.sort( + key=lambda p: calculate_manhattan_distance(p, player_pos) + ) + + # 返回最优安全躲避位置 + best_hide_pos = enemy_territory_positions[0] + print(f"[躲避逻辑] 找到最优安全躲避位置: {best_hide_pos}(远离敌方,靠近目标旗帜)") + return best_hide_pos + + +# 判断是否需要躲避敌方(进攻者在敌方营地时) +def need_hide_from_enemy(player_pos, opponents): + """ + 判断进攻者在敌方营地时是否需要躲避敌方 + :param player_pos: 玩家当前位置 + :param opponents: 敌方玩家列表 + :return: True=需要躲避(敌方过近), False=无需躲避(安全状态) + """ + if not opponents or not is_in_enemy_territory(player_pos): + return False # 无敌方 / 不在敌方营地,无需躲避 + + # 危险距离:与任意敌方玩家曼哈顿距离<3格,判定为需要躲避 + DANGER_DISTANCE = 3 + opponent_positions = [(o["posX"], o["posY"]) for o in opponents if all(key in o for key in ["posX", "posY"])] + for opp_pos in opponent_positions: + if calculate_manhattan_distance(player_pos, opp_pos) < DANGER_DISTANCE: + print(f"[躲避逻辑] 检测到敌方玩家过近(距离<{DANGER_DISTANCE}格),需要优先躲避") + return True + + return False + + +def calculate_safe_and_dangerous_positions(my_pos, opponents): + """ + 计算安全位置和危险位置 + 安全位置:己方player能在任意对方player之前提前一轮到达的位置 + 己方半场的位置都是安全位置 + 其余位置都是危险位置(包括周围一格范围) + """ + safe_positions = set() + base_dangerous_positions = set() + + print(f"正在计算从位置 {my_pos} 出发的安全/危险位置...") + + # 遍历地图上所有有效位置 + for x in range(world.width): + for y in range(world.height): + pos = (x, y) + + # 跳过墙壁 + if pos in world.walls: + continue + + # 己方半场的位置都是安全位置 + if is_in_my_territory(pos): + safe_positions.add(pos) + continue + + # 计算己方到达该位置的距离 + my_distance = calculate_manhattan_distance(my_pos, pos) + + # 检查是否能比所有对手提前一轮到达 + is_safe = True + for opponent in opponents: + opponent_pos = (opponent["posX"], opponent["posY"]) + opponent_distance = calculate_manhattan_distance(opponent_pos, pos) + + # 如果对手能同时或更快到达,则不安全 + if opponent_distance <= my_distance: + is_safe = False + break + + if is_safe: + safe_positions.add(pos) + else: + base_dangerous_positions.add(pos) + + # 将危险位置范围扩大一格 + expanded_dangerous_positions = set(base_dangerous_positions) + for dangerous_pos in base_dangerous_positions: + x, y = dangerous_pos + # 检查周围8个方向的位置 + for dx in [-1, 0, 1]: + for dy in [-1, 0, 1]: + if dx == 0 and dy == 0: + continue # 跳过自身位置 + + neighbor_pos = (x + dx, y + dy) + nx, ny = neighbor_pos + + # 检查边界和墙壁 + if (0 <= nx < world.width and 0 <= ny < world.height and + neighbor_pos not in world.walls and + not is_in_my_territory(neighbor_pos)): # 己方半场仍然安全 + expanded_dangerous_positions.add(neighbor_pos) + + # 从安全位置中移除被扩展为危险的位置(除了己方半场) + final_safe_positions = set() + for pos in safe_positions: + if is_in_my_territory(pos) or pos not in expanded_dangerous_positions: + final_safe_positions.add(pos) + + print( + f"计算完成:基础危险位置 {len(base_dangerous_positions)} 个,扩展后危险位置 {len(expanded_dangerous_positions)} 个") + print(f"最终安全位置 {len(final_safe_positions)} 个") + + return final_safe_positions, expanded_dangerous_positions + + +def find_closest_safe_position_to_target(my_pos, target_pos, safe_positions): + """ + 找到距离目标最近的安全位置 + 这个位置必须能通过安全路径到达 + """ + if not safe_positions: + return None + + print(f"寻找从 {my_pos} 到目标 {target_pos} 的最近安全位置...") + + # 如果目标本身就是安全位置,直接返回 + if target_pos in safe_positions: + print(f"目标位置 {target_pos} 本身就是安全位置") + return target_pos + + # 按距离目标的远近排序所有安全位置 + safe_positions_list = list(safe_positions) + safe_positions_list.sort(key=lambda pos: calculate_manhattan_distance(pos, target_pos)) + + # 找到第一个能通过安全路径到达的安全位置 + for safe_pos in safe_positions_list: + # 检查从当前位置到这个安全位置的路径是否只经过安全位置 + path = a_star(my_pos, safe_pos, []) + if path: + # 检查路径上的所有位置是否都是安全的 + path_is_safe = all(pos in safe_positions for pos in path) + if path_is_safe: + print(f"找到最近安全位置: {safe_pos}, 距离目标 {calculate_manhattan_distance(safe_pos, target_pos)}") + return safe_pos + + print("未找到可通过安全路径到达的安全位置") + return None + + +def assign_player_roles(my_players, enemy_flags, my_flags, full_defense_mode=False): + """ + 动态分配玩家角色(攻击者/防守者) + 至少保持1个防守者,其余为攻击者 + 当full_defense_mode=True时,所有玩家都转为防守者 + """ + global player_roles + + total_players = len(my_players) + if total_players == 0: + return + + # 完全防守模式:所有玩家都转为防守者 + if full_defense_mode: + for p in my_players: + player_roles[p["name"]] = "defender" + print("进入完全防守模式,所有玩家均转为防守者") + return + + # 计算需要的防守者数量 + min_defenders = 1 + enemy_threats = len([f for f in my_flags if f["canPickup"]]) + needed_defenders = max(min_defenders, min(enemy_threats, total_players // 2)) + + # 清理已不存在的玩家角色 + current_players = {p["name"] for p in my_players} + player_roles = {name: role for name, role in player_roles.items() + if name in current_players} + + # 为新玩家分配角色 + current_defenders = [name for name, role in player_roles.items() + if role == "defender"] + current_attackers = [name for name, role in player_roles.items() + if role == "attacker"] + + unassigned = [p["name"] for p in my_players if p["name"] not in player_roles] + + # 如果防守者不足,从攻击者中转换 + while len(current_defenders) < needed_defenders and current_attackers: + # 选择最适合防守的攻击者(距离我方目标最近的) + my_targets = list(world.list_targets(mine=True)) + if my_targets: + best_candidate = min(current_attackers, + key=lambda name: min(calculate_manhattan_distance( + (next(p for p in my_players if p["name"] == name)["posX"], + next(p for p in my_players if p["name"] == name)["posY"]), target) + for target in my_targets)) + player_roles[best_candidate] = "defender" + current_defenders.append(best_candidate) + current_attackers.remove(best_candidate) + + # 分配未分配的玩家 + for name in unassigned: + if len(current_defenders) < needed_defenders: + player_roles[name] = "defender" + current_defenders.append(name) + else: + player_roles[name] = "attacker" + current_attackers.append(name) + + +def assign_defender_positions(defenders, my_players, my_flags, opponents, full_defense_mode=False): + """ + 为防守者分配防守位置 - 重构版本 + 每轮前往距离任意opponent最近的flag,绝对不能到对方半场 + 当full_defense_mode=True且防守者数量≥剩余旗帜数时,所有人员分开站在旗帜上不动 + """ + global defender_positions + + if not defenders: + return + + my_targets = list(world.list_targets(mine=True)) + if not my_targets: + return + + # 清空之前的分配 + defender_positions = {} + + # 只考虑我方半场的flags(绝对不能到对方半场) + my_territory_flags = [] + for flag in my_flags: + flag_pos = (flag["posX"], flag["posY"]) + if is_in_my_territory(flag_pos) and flag["canPickup"]: + my_territory_flags.append(flag) + + print(f"我方半场可防守的flags: {[(f['posX'], f['posY']) for f in my_territory_flags]}") + + # 完全防守模式特殊处理 + if full_defense_mode and len(defenders) >= len(my_territory_flags) and my_territory_flags: + print("完全防守模式:所有防守者分开站在旗帜上不动防守") + # 为每个旗帜分配一个防守者 + assigned_flags = set() + for i, defender_name in enumerate(defenders): + # 循环分配旗帜 + flag_index = i % len(my_territory_flags) + flag = my_territory_flags[flag_index] + flag_pos = (flag["posX"], flag["posY"]) + + # 确保每个旗帜只分配一个防守者 + if flag_pos not in assigned_flags: + defender_positions[defender_name] = flag_pos + assigned_flags.add(flag_pos) + print(f"defender {defender_name} 被分配驻守flag {flag_pos}") + else: + # 如果旗帜已被分配,分配到目标区域巡逻 + target = my_targets[i % len(my_targets)] + defender_positions[defender_name] = target + print(f"defender {defender_name} 被分配到巡逻点 {target} (旗帜已被驻守)") + return + + if not my_territory_flags: + # 如果没有需要防守的flags,defenders在我方target区域巡逻 + print("没有需要防守的flags,defenders在我方领域巡逻") + for i, defender_name in enumerate(defenders): + target = my_targets[i % len(my_targets)] + defender_positions[defender_name] = target + print(f"defender {defender_name} 被分配到巡逻点 {target}") + return + + # 为每个opponent找到距离它最近的我方flag(只检测不持有己方flag的opponent) + threatened_flags = [] + for opponent in opponents: + opponent_pos = (opponent["posX"], opponent["posY"]) + + # 检查opponent是否持有我方flag + if opponent.get("hasFlag", False): + print(f"opponent {opponent['name']} 持有我方flag,跳过威胁检测") + continue + + if my_territory_flags: + # 找到距离这个opponent最近的我方flag + closest_flag = min(my_territory_flags, + key=lambda f: calculate_manhattan_distance( + (f["posX"], f["posY"]), opponent_pos)) + closest_flag_pos = (closest_flag["posX"], closest_flag["posY"]) + distance = calculate_manhattan_distance(closest_flag_pos, opponent_pos) + + # 检查opponent是否在我方半场 + is_opponent_in_my_territory = is_in_my_territory(opponent_pos) + + # 在己方半场的opponent优先级为0(最高优先级),否则按距离排序 + priority = 0 if is_opponent_in_my_territory else distance + + threatened_flags.append({ + 'flag_pos': closest_flag_pos, + 'opponent': opponent, + 'distance': distance, + 'priority': priority, + 'in_my_territory': is_opponent_in_my_territory + }) + + territory_status = "己方半场" if is_opponent_in_my_territory else "敌方半场" + print( + f"opponent {opponent['name']} 在 {opponent_pos} ({territory_status}) 威胁flag {closest_flag_pos}, 距离: {distance}, 优先级: {priority}") + + # 按威胁优先级排序(距离最近的威胁优先处理) + threatened_flags.sort(key=lambda x: x['priority']) + + # 为威胁最大的flags分配defenders,考虑时间优势 + assigned_flags = set() + for i, defender_name in enumerate(defenders): + if i < len(threatened_flags): + threat = threatened_flags[i] + new_flag_pos = threat['flag_pos'] + + # 获取defender当前位置 + defender = next((p for p in my_players if p["name"] == defender_name), None) + if not defender: + continue + + defender_pos = (defender["posX"], defender["posY"]) + + # 检查是否应该切换到新的flag + should_switch = True + + # 如果defender之前已有目标,检查是否值得切换 + if defender_name in defender_positions: + current_target = defender_positions[defender_name] + + # 计算defender到当前目标和新目标的距离 + distance_to_current = calculate_manhattan_distance(defender_pos, current_target) + distance_to_new = calculate_manhattan_distance(defender_pos, new_flag_pos) + + # 计算opponent到新flag的距离 + opponent_to_new = threat['distance'] + + # 只有当defender能在opponent前到达新flag,且新威胁更紧急时才切换 + if distance_to_new >= opponent_to_new: + should_switch = False + print( + f"defender {defender_name} 保持当前目标 {current_target},因为无法及时赶到新威胁 {new_flag_pos}") + print( + f" 当前距离: {distance_to_current}, 新目标距离: {distance_to_new}, opponent距离: {opponent_to_new}") + else: + print(f"defender {defender_name} 切换到新威胁 {new_flag_pos},能够及时拦截") + print(f" defender距离: {distance_to_new}, opponent距离: {opponent_to_new}") + + if should_switch: + # 避免多个defender保护同一个flag(除非defender太多) + if new_flag_pos not in assigned_flags or len(assigned_flags) >= len(my_territory_flags): + defender_positions[defender_name] = new_flag_pos + assigned_flags.add(new_flag_pos) + print( + f"defender {defender_name} 被分配保护flag {new_flag_pos}, 对抗opponent {threat['opponent']['name']} (距离: {threat['distance']})") + else: + # 如果新flag已有人守护,保持当前目标或选择其他flag + if defender_name in defender_positions: + print( + f"defender {defender_name} 保持当前目标 {defender_positions[defender_name]},新flag已有其他defender") + else: + remaining_flags = [f for f in my_territory_flags + if (f["posX"], f["posY"]) not in assigned_flags] + if remaining_flags: + flag = remaining_flags[0] + flag_pos = (flag["posX"], flag["posY"]) + defender_positions[defender_name] = flag_pos + assigned_flags.add(flag_pos) + print(f"defender {defender_name} 被分配到备选flag {flag_pos}") + else: + target = my_targets[i % len(my_targets)] + defender_positions[defender_name] = target + print(f"defender {defender_name} 被分配到巡逻点 {target}") + else: + defender_positions[defender_name] = current_target + else: + # 多余的defenders在target区域巡逻 + target = my_targets[i % len(my_targets)] + defender_positions[defender_name] = target + print(f"额外defender {defender_name} 被分配到巡逻点 {target}") + + print(f"最终defender分配: {defender_positions}") + + +def assign_attacker_targets(attackers, my_players, enemy_flags, opponents): + """为攻击者分配攻击目标""" + global flag_assignments, attack_tasks + + if not attackers or not enemy_flags: + return + + my_targets = list(world.list_targets(mine=True)) + if not my_targets: + return + + # 获取opponent位置集合 + opponent_positions_set = {(o["posX"], o["posY"]) for o in opponents} + + # 清理无效的分配 + active_attackers = set(attackers) + flag_assignments = {name: pos for name, pos in flag_assignments.items() + if name in active_attackers} + attack_tasks = {name: status for name, status in attack_tasks.items() + if name in active_attackers} + + # 检查并更新现有的flag分配 + attackers_need_reassign = [] + for attacker_name in attackers: + player = next((p for p in my_players if p["name"] == attacker_name), None) + if not player: + continue + + # 如果玩家带flag,更新为返回状态 + if player["hasFlag"]: + attack_tasks[attacker_name] = "returning" + if attacker_name in flag_assignments: + del flag_assignments[attacker_name] + continue + + # 检查当前分配的flag上是否站着opponent + if attacker_name in flag_assignments: + assigned_flag = flag_assignments[attacker_name] + if assigned_flag in opponent_positions_set: + print(f"攻击者 {attacker_name} 的目标flag {assigned_flag} 上站着opponent,需要重新分配") + del flag_assignments[attacker_name] + if attacker_name in attack_tasks: + del attack_tasks[attacker_name] + attackers_need_reassign.append(attacker_name) + else: + print(f"攻击者 {attacker_name} 的目标flag {assigned_flag} 安全") + else: + # 没有分配目标的攻击者 + attackers_need_reassign.append(attacker_name) + + # 为需要重新分配的攻击者选择new flag + assigned_flags = set(flag_assignments.values()) + available_flags = [(f["posX"], f["posY"]) for f in enemy_flags + if (f["posX"], f["posY"]) not in assigned_flags] + + # 筛选出没有opponent站着的flag + safe_flags = [flag_pos for flag_pos in available_flags + if flag_pos not in opponent_positions_set] + + print(f"可用flags: {available_flags}") + print(f"安全flags(无opponent): {safe_flags}") + + for attacker_name in attackers_need_reassign: + player = next((p for p in my_players if p["name"] == attacker_name), None) + if not player: + continue + + player_pos = (player["posX"], player["posY"]) + + if safe_flags: + # 选择寻路分数最低的安全flag + print(f"为攻击者 {attacker_name} 在 {player_pos} 从安全flags中选择目标...") + + # 计算当前玩家的危险位置 + safe_positions, dangerous_positions = calculate_safe_and_dangerous_positions(player_pos, opponents) + + best_flag = None + best_score = float('inf') + + for flag_pos in safe_flags: + # 使用权重寻路计算到达该flag的实际代价 + opponent_positions_list = [(o["posX"], o["posY"]) for o in opponents] + path = a_star(player_pos, flag_pos, dangerous_positions, opponent_positions_list) + + if path: + # 计算路径的总权重代价 + total_cost = 0 + dangerous_set = set(dangerous_positions) + + for pos in path: + if pos in dangerous_set: + total_cost += 10000 # 危险位置代价 + else: + total_cost += 1 # 正常位置代价 + + print(f" 安全Flag {flag_pos}: 路径长度={len(path)}, 总代价={total_cost}") + + if total_cost < best_score: + best_score = total_cost + best_flag = flag_pos + else: + print(f" 安全Flag {flag_pos}: 无法到达") + + if best_flag: + flag_assignments[attacker_name] = best_flag + attack_tasks[attacker_name] = "attacking" + safe_flags.remove(best_flag) + print(f" 选择最优安全目标: {best_flag}, 代价: {best_score}") + else: + print(f" WARNING: 攻击者 {attacker_name} 无法到达任何安全flag") + else: + # 没有安全flag,尝试选择有opponent占据的flag + print(f"攻击者 {attacker_name} 没有安全flag可选,尝试选择有opponent占据的flag") + + occupied_flags = [flag_pos for flag_pos in available_flags + if flag_pos in opponent_positions_set] + + if occupied_flags: + # 检查当前是否已经在攻击某个被占据的flag + current_assignment = flag_assignments.get(attacker_name) + if current_assignment in occupied_flags: + # 检查距离,如果靠近3格opponent还没让开就换目标 + distance_to_flag = calculate_manhattan_distance(player_pos, current_assignment) + if distance_to_flag <= 3: + print( + f" 攻击者 {attacker_name} 距离被占据flag {current_assignment} 仅{distance_to_flag}格,opponent未让开,寻找替代目标") + + # 寻找其他可选的flag + other_flags = [f for f in occupied_flags if f != current_assignment] + other_safe_flags = [f for f in available_flags if + f not in opponent_positions_set and f != current_assignment] + + if other_safe_flags: + # 优先选择安全flag + best_alternative = min(other_safe_flags, + key=lambda f: calculate_manhattan_distance(player_pos, f)) + flag_assignments[attacker_name] = best_alternative + attack_tasks[attacker_name] = "attacking" + print(f" 切换到安全备选flag: {best_alternative}") + elif other_flags: + # 选择其他被占据的flag + best_alternative = min(other_flags, + key=lambda f: calculate_manhattan_distance(player_pos, f)) + flag_assignments[attacker_name] = best_alternative + attack_tasks[attacker_name] = "attacking" + print(f" 切换到其他被占据flag: {best_alternative}") + else: + # 没有其他选择,返回己方半场 + print(f" 无其他flag可选,攻击者 {attacker_name} 返回己方半场") + _assign_retreat_position(attacker_name, player_pos, opponent_positions_set) + else: + print( + f" 攻击者 {attacker_name} 继续攻击被占据flag {current_assignment}, 距离: {distance_to_flag}") + else: + # 为新攻击者分配被占据的flag + best_occupied_flag = min(occupied_flags, + key=lambda f: calculate_manhattan_distance(player_pos, f)) + flag_assignments[attacker_name] = best_occupied_flag + attack_tasks[attacker_name] = "attacking" + print(f" 分配被占据flag: {best_occupied_flag}") + else: + # 没有任何可用flag,向己方半场撤退 + print(f"攻击者 {attacker_name} 没有任何flag可选,向己方半场撤退") + _assign_retreat_position(attacker_name, player_pos, opponent_positions_set) + + +def _assign_retreat_position(attacker_name, player_pos, opponent_positions_set): + """为攻击者分配撤退位置""" + # 在己方半场选择一个随机安全位置 + my_territory_positions = [] + for x in range(world.width): + for y in range(world.height): + pos = (x, y) + if (is_in_my_territory(pos) and pos not in world.walls and + pos not in opponent_positions_set): + my_territory_positions.append(pos) + + if my_territory_positions: + random_target = random.choice(my_territory_positions) + flag_assignments[attacker_name] = random_target + attack_tasks[attacker_name] = "retreating" + print(f" 攻击者 {attacker_name} 选择随机撤退位置: {random_target}") + else: + print(f" WARNING: 攻击者 {attacker_name} 无法找到安全撤退位置") + + +def plan_next_actions(req): + """主要的游戏策略规划函数(优化:进攻者在敌方营地优先躲敌,再拿旗)""" + if not world.update(req): + return {} + + print(f"\n=== 游戏回合 {req.get('time', 'unknown')} ===") + + global player_roles, flag_assignments, attack_tasks, defender_positions + + # 获取游戏状态 + my_players = world.list_players(mine=True, inPrison=False, hasFlag=None) + opponents = world.list_players(mine=False, inPrison=False, hasFlag=None) + enemy_flags = world.list_flags(mine=False, canPickup=True) + my_flags = world.list_flags(mine=True, canPickup=True) + my_targets = list(world.list_targets(mine=True)) + + # 计算双方剩余旗帜数量 + my_remaining_flags = len(my_flags) + enemy_remaining_flags = len(enemy_flags) + print(f"我方剩余旗帜: {my_remaining_flags}, 敌方剩余旗帜: {enemy_remaining_flags}") + + # 检查是否需要进入完全防守模式 + full_defense_mode = False + if my_remaining_flags <= 3 and enemy_remaining_flags <= my_remaining_flags: + full_defense_mode = True + print( + f"触发完全防守模式: 我方剩余旗帜={my_remaining_flags} ≤3 且 敌方剩余旗帜={enemy_remaining_flags} ≤ 我方旗帜数") + + print(f"我方玩家: {[(p['name'], p['posX'], p['posY'], p['hasFlag']) for p in my_players]}") + print(f"敌方玩家: {[(p['name'], p['posX'], p['posY']) for p in opponents]}") + print(f"敌方flags: {[(f['posX'], f['posY']) for f in enemy_flags]}") + print(f"我方targets: {my_targets}") + + if not my_players or not my_targets: + print("没有我方玩家或目标,返回空移动") + return {} + + # 1. 动态角色分配(支持完全防守模式) + assign_player_roles(my_players, enemy_flags, my_flags, full_defense_mode) + print(f"角色分配: {player_roles}") + + # 2. 获取当前角色分配 + defenders = [name for name, role in player_roles.items() if role == "defender"] + attackers = [name for name, role in player_roles.items() if role == "attacker"] + print(f"防守者: {defenders}, 攻击者: {attackers}") + + # 3. 分配防守和攻击任务(防守任务支持完全防守模式) + assign_defender_positions(defenders, my_players, my_flags, opponents, full_defense_mode) + # 如果不是完全防守模式才分配攻击任务 + if not full_defense_mode: + assign_attacker_targets(attackers, my_players, enemy_flags, opponents) + else: + # 完全防守模式下清空攻击任务 + flag_assignments = {} + attack_tasks = {} + print("完全防守模式:清空所有攻击任务") + + print(f"防守位置: {defender_positions}") + print(f"攻击目标: {flag_assignments}") + print(f"攻击任务状态: {attack_tasks}") + + # 4. 为每个玩家计算移动 + player_moves = {} + + for player in my_players: + player_name = player["name"] + current_pos = (player["posX"], player["posY"]) + is_attacker = player_name in attackers + is_in_enemy = is_in_enemy_territory(current_pos) + has_flag = player["hasFlag"] + + print(f"\n--- 处理玩家 {player_name} 位置 {current_pos} ---") + print( + f"[{player_name}] 身份: {'攻击者' if is_attacker else '防守者'}, 所在区域: {'敌方营地' if is_in_enemy else '己方营地'}, 持有旗帜: {has_flag}") + + # 【最高优先级】强制攻击逻辑:如果距离opponent仅1格且opponent在我方半场,无论什么状态都必须向opponent移动 + adjacent_opponent = None + closest_distance = float('inf') + + for opponent in opponents: + opponent_pos = (opponent["posX"], opponent["posY"]) + distance = calculate_manhattan_distance(current_pos, opponent_pos) + + # 检查是否相邻(距离为1)且opponent在我方半场 + if distance == 1 and is_in_my_territory(opponent_pos): + # 如果有多个相邻入侵者,选择最近的一个 + if distance < closest_distance: + closest_distance = distance + adjacent_opponent = opponent + print(f"{player_name} 发现距离{distance}格的入侵者 {opponent['name']} 在我方半场 {opponent_pos}") + + # 强制攻击入侵者 - 无论玩家当前状态是什么(攻击者、防守者、带flag等) + if adjacent_opponent: + opponent_pos = (adjacent_opponent["posX"], adjacent_opponent["posY"]) + move = world.get_direction(current_pos, opponent_pos) + if move: + player_moves[player_name] = move + print( + f"【强制攻击】{player_name} 无论当前状态如何,必须攻击入侵者 {adjacent_opponent['name']}: {current_pos} -> {opponent_pos}, 方向: {move}") + continue + else: + print(f"ERROR: {player_name} 无法向入侵者移动 - 这不应该发生!") + + # 【优化核心逻辑】进攻者在敌方营地:优先判断是否需要躲避敌方 + if is_attacker and is_in_enemy and not has_flag: + if need_hide_from_enemy(current_pos, opponents): + # 1. 需要躲避:更新任务状态为躲避 + attack_tasks[player_name] = "hiding" + # 2. 寻找敌方营地内的安全躲避位置 + hide_pos = find_safe_hide_position_in_enemy_territory(current_pos, opponents, enemy_flags) + if hide_pos: + # 3. 规划前往躲避位置的路径 + opponent_positions = [(o["posX"], o["posY"]) for o in opponents] + path = a_star(current_pos, hide_pos, [], opponent_positions) + if path and len(path) > 1: + next_pos = path[1] + move = world.get_direction(current_pos, next_pos) + if move: + player_moves[player_name] = move + print( + f"【优先躲避】{player_name} 前往安全躲避位置: {current_pos} -> {next_pos} (最终目标: {hide_pos})") + continue + print(f"【躲避失败】{player_name} 无法找到躲避路径,执行原拿旗逻辑") + + # 确定目标位置(原有逻辑,躲避优先级高于此) + target_pos = None + task_type = "unknown" + + if player["hasFlag"]: + # 带flag回家 - 选择加权分值最低的target + print(f"{player_name} 正在选择最优回程目标(基于加权分值)...") + + # 计算当前位置的危险区域(用于加权计算) + safe_positions, dangerous_positions = calculate_safe_and_dangerous_positions(current_pos, opponents) + dangerous_set = set(dangerous_positions) + + best_target = None + lowest_weighted_score = float('inf') + + for target in my_targets: + # 计算到每个target的加权路径 + opponent_positions = [(o["posX"], o["posY"]) for o in opponents] + path = a_star(current_pos, target, dangerous_positions, opponent_positions) + + if path: + # 计算路径的总加权代价 + total_weighted_cost = 0 + for pos in path: + if pos in dangerous_set: + total_weighted_cost += 10000 # 危险位置代价 + else: + total_weighted_cost += 1 # 正常位置代价 + + print(f" Target {target}: 路径长度={len(path)}, 加权代价={total_weighted_cost}") + + if total_weighted_cost < lowest_weighted_score: + lowest_weighted_score = total_weighted_cost + best_target = target + else: + print(f" Target {target}: 无法到达") + + if best_target: + target_pos = best_target + print(f"{player_name} 选择最优回程目标: {target_pos}, 加权代价: {lowest_weighted_score}") + else: + # 如果所有target都无法到达,选择第一个作为备选 + target_pos = my_targets[0] + print(f"WARNING: {player_name} 无法到达任何target,使用默认目标: {target_pos}") + + task_type = "returning_flag" + elif player_name in defender_positions: + # 防守者前往防守位置 + target_pos = defender_positions[player_name] + task_type = "defending" + + # 完全防守模式且已到达旗帜位置则不移动 + if (full_defense_mode and + len(defenders) >= len(my_flags) and + current_pos == target_pos): + print(f"{player_name} 已在防守位置 {target_pos},完全防守模式下不移动") + continue + elif player_name in flag_assignments: + # 攻击者前往目标flag(仅当无需躲避时执行) + target_pos = flag_assignments[player_name] + task_type = "attacking" + + print(f"{player_name} 任务类型: {task_type}, 目标位置: {target_pos}") + + if not target_pos or current_pos == target_pos: + print(f"{player_name} 没有目标或已到达目标,跳过") + continue + + # 检查是否在我方领域 + in_my_territory = is_in_my_territory(current_pos) + print(f"{player_name} 是否在我方领域: {in_my_territory}") + + # 对于attacker,使用权重寻路策略(软性约束危险位置,硬性约束对手位置) + if task_type == "attacking": + print(f"攻击者 {player_name} 使用权重寻路策略") + + # 计算当前的危险位置 + safe_positions, dangerous_positions = calculate_safe_and_dangerous_positions(current_pos, opponents) + print(f"{player_name} 检测到 {len(dangerous_positions)} 个危险位置") + + # 对手位置作为硬限制(完全不能通过) + opponent_positions = [(o["posX"], o["posY"]) for o in opponents] + print(f"{player_name} 硬限制对手位置: {opponent_positions}") + + # 直接向目标flag移动:危险位置软限制,对手位置硬限制 + path = a_star(current_pos, target_pos, dangerous_positions, opponent_positions, opponents) + print(f"{player_name} 权重寻路路径: {path}") + + if not path: + # 如果权重寻路失败,使用传统寻路作为备选 + print(f"{player_name} 权重寻路失败,尝试传统寻路") + path = world.route_to(current_pos, target_pos, opponent_positions) + else: + # 对于defender或带flag的玩家,使用传统策略 + print(f"{player_name} 使用传统寻路策略") + + extra_obstacles = [] + if not in_my_territory: + # 在敌方领域时避免与对手直接冲突 + extra_obstacles = [(o["posX"], o["posY"]) for o in opponents] + print(f"{player_name} 在敌方领域,避开 {len(extra_obstacles)} 个对手位置") + + # 使用A*算法寻路 + path = a_star(current_pos, target_pos, extra_obstacles) + + # 如果A*失败,使用原始寻路算法作为备选 + if not path: + print(f"{player_name} A*寻路失败,尝试原始BFS算法") + path = world.route_to(current_pos, target_pos, extra_obstacles) + + print(f"{player_name} 最终路径: {path}") + + if len(path) > 1: + next_pos = path[1] + current_pos = (player["posX"], player["posY"]) + player_name = player["name"] + + # ===================== 嵌入:越界安全检测(无bug) ===================== + try: + # 1. 检测是否即将越界 + about_to_cross = is_about_to_cross_midline(current_pos, next_pos) + if about_to_cross: + # 2. 检测中线对面贴着中线的敌方 + enemy_opposite = has_enemy_opposite(current_pos, next_pos, opponents) + if enemy_opposite: + # 3. 有敌方则禁止移动,原地待命 + print(f"[{player_name}] 即将越界遇中线对面贴着中线的敌方,停止移动(安全防护)") + continue + except Exception as e: + # 捕获异常不影响程序运行 + print(f"[{player_name}] 越界检测临时异常:{e},继续正常移动") + # ===================================================================== + + # 原有移动逻辑 + move = world.get_direction(current_pos, next_pos) + print(f"{player_name} 下一步: {current_pos} -> {next_pos}, 方向: {move}") + if move: + player_moves[player_name] = move + else: + print(f"WARNING: {player_name} 无法确定移动方向!") + else: + print(f"WARNING: {player_name} 没有找到有效路径!") + + print(f"最终移动指令: {player_moves}") + print("=" * 50) + return player_moves + + +def game_over(req): + print("Game Over!") + world.show(force=True) + + +async def main(): + import sys + if len(sys.argv) != 2: + print(f"Usage: python3 {sys.argv[0]} ") + print(f"Example: python3 {sys.argv[0]} 8080") + sys.exit(1) + + port = int(sys.argv[1]) + print(f"AI backend running on port {port} ...") + + try: + await run_game_server(port, start_game, plan_next_actions, game_over) + except Exception as e: + print(f"Server Stopped: {e}") + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/CTF/backend/server1.py b/CTF/backend/server1.py new file mode 100644 index 0000000..77abd56 --- /dev/null +++ b/CTF/backend/server1.py @@ -0,0 +1,293 @@ +import asyncio +from lib.game_engine import GameMap, run_game_server +import threading +import collections + +world = GameMap(show_gap_in_msec=1.0) +lock = threading.Lock() +last_updated_time = -1 +update_threshold = 1 +player_to_flag_assign = {} +my_side_is_left = None + +class Map: + def __init__(self): + self.width = world.width + self.height = world.height + self.grid = [0] * (self.width * self.height) + self.edge = [[] for _ in range(self.width * self.height)] + self.in_safe_zone = None + + def convert_pos_to_index(self, x, y): + return y * self.width + x + + def update(self,posx,posy): + global my_side_is_left + self.width = world.width + self.height = world.height + self.edge = [[] for _ in range(self.width * self.height)] + self.grid = [0] * (self.width * self.height) + walls = world.walls + for wall in walls: + x, y = wall + idx = self.convert_pos_to_index(x, y) + self.grid[idx] = 1 + if my_side_is_left: + self.in_safe_zone = world.is_on_left((min(posx+2,self.width),posy)) == my_side_is_left + else: + self.in_safe_zone = world.is_on_left((max(posx-2,0),posy)) == my_side_is_left + enemy_players = world.list_players(mine=False, inPrison=False, hasFlag=None) + ally_players = world.list_players(mine=True, inPrison=False, hasFlag=None) + my_pos = self.convert_pos_to_index(posx, posy) + for ally in ally_players: + x, y = ally["posX"], ally["posY"] + idx = self.convert_pos_to_index(x, y) + self.grid[idx] = 3 + for enemy in enemy_players: + x, y = enemy["posX"], enemy["posY"] + idx = self.convert_pos_to_index(x, y) + if my_side_is_left: + if not world.is_on_left((x+1,y)): + self.grid[idx] = 2 + for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]: + nx, ny = x + dx, y + dy + self.grid[self.convert_pos_to_index(nx,ny)] = 2 + else: + if world.is_on_left((x-1,y)): + self.grid[idx] = 2 + for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]: + nx, ny = x + dx, y + dy + self.grid[self.convert_pos_to_index(nx,ny)] = 2 + for y in range(self.height): + for x in range(self.width): + idx = self.convert_pos_to_index(x, y) + if self.in_safe_zone: + if (self.grid[idx] in (1,)) and idx != my_pos: + continue + else: + if (self.grid[idx] in (1, 2)) and idx != my_pos: + continue + for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]: + nx, ny = x + dx, y + dy + if 0 <= nx < self.width and 0 <= ny < self.height: + n_idx = self.convert_pos_to_index(nx, ny) + if self.in_safe_zone: + if self.grid[n_idx] not in (1, ): + self.edge[idx].append(n_idx) + else: + if self.grid[n_idx] not in (1, 2): + self.edge[idx].append(n_idx) + def guideance(self, posx_start, posy_start, posx_end, posy_end): + self.update(posx_start,posy_start) + src_idx = self.convert_pos_to_index(posx_start, posy_start) + dst_idx = self.convert_pos_to_index(posx_end, posy_end) + n = self.width * self.height + dist = [float('inf')] * n + prev = [None] * n + dist[src_idx] = 0 + queue = collections.deque([src_idx]) + while queue: + u = queue.popleft() + if u == dst_idx: + break + for v in self.edge[u]: + if dist[v] == float('inf'): + dist[v] = dist[u] + 1 + prev[v] = u + queue.append(v) + if dist[dst_idx] == float('inf'): + return "" + path = [] + cur = dst_idx + while cur is not None: + path.append(cur) + cur = prev[cur] + path.reverse() # 现在是 [src, ..., dst] + if len(path) < 2: + return "" + # ---- 计算第一步坐标并返回方向 ---- + next_idx = path[1] + next_x = next_idx % (self.width) + next_y = next_idx // (self.height) + return world.get_direction((posx_start, posy_start), (next_x, next_y)) + def length(self, posx_start, posy_start, posx_end, posy_end): + self.update(posx_start,posy_start) + src_idx = self.convert_pos_to_index(posx_start, posy_start) + dst_idx = self.convert_pos_to_index(posx_end, posy_end) + n = self.width * self.height + dist = [float('inf')] * n # 最短距离,初始为无穷大 + prev = [None] * n # 前驱节点,用于路径回溯 + dist[src_idx] = 0 + queue = collections.deque([src_idx]) + while queue: + u = queue.popleft() + if u == dst_idx: + break + for v in self.edge[u]: + if dist[v] == float('inf'): + dist[v] = dist[u] + 1 + prev[v] = u + queue.append(v) + + # ---- 若不可达,返回空字符串 ---- + if dist[dst_idx] == float('inf'): + return -1 + return dist[dst_idx] + def closest(self, posx_start, posy_start, positions): + closest_pos = None + min_length = float('inf') + for pos in positions: + temp = self.length(posx_start, posy_start, pos[0], pos[1]) + if temp != -1 and temp < min_length: + min_length = temp + closest_pos = pos + return closest_pos + def closest_in_range(self, posx_start, posy_start, positions): + closest_pos = None + min_length = float('inf') + for pos in positions: + if my_side_is_left: + if pos[0] >= self.width // 2: + continue + else: + if pos[0] < self.width // 2: + continue + temp = self.length(posx_start, posy_start, pos[0], pos[1]) + if temp != -1 and temp < min_length: + min_length = temp + closest_pos = pos + return closest_pos,min_length + def enemy_to_walls(self, enemy_posX,enemy_posY): + idx = self.convert_pos_to_index(enemy_posX, enemy_posY) + lengths = abs(enemy_posX-self.width//2) + return lengths + def choose_enemy(self, my_posX, my_posY,enemy_positions): + chosen_enemy = None + min_length = -1 + for pos in enemy_positions: + if my_side_is_left: + if pos[0] >= self.width // 2: + continue + else: + if pos[0] < self.width // 2: + continue + temp = self.enemy_to_walls(pos[0], pos[1]) + length = self.length(my_posX, my_posY, pos[0], pos[1]) + if length > temp*2+4 : continue + if temp < length: + length = temp + chosen_enemy = pos + return chosen_enemy,min_length +myMap = Map() +def start_game(req): + global player_to_flag_assign,my_side_is_left + world.init(req) + print("Start Game!!") + player_to_flag_assign = {} + print(f"Game Started! Side: {'Left' if world.is_on_left(list(world.my_team_target)[0]) else 'Right'}") + my_side_is_left = world.is_on_left(list(world.my_team_target)[0]) + +def plan_next_actions(req): + if not world.update(req): + return + global player_to_flag_assign,myMap,my_side_is_left + + my_players = world.list_players(mine=True, inPrison=False, hasFlag=None) + enemy_players_flags = world.list_players(mine=False, inPrison=False, hasFlag=True) + enemy_players = world.list_players(mine=False, inPrison=False, hasFlag=False) + enemy_flags = world.list_flags(mine=False, canPickup=True) + my_flags = world.list_flags(mine=True, canPickup=True) + my_targets = list(world.list_targets(mine=True)) + active_player_names = {p["name"] for p in my_players if not p["hasFlag"]} + flags_list = [] + regard_list = [] + pretend_list = [] + protect_list = [] + targets_list = [] + for flags in enemy_flags: + flags_list.append((flags["posX"],flags["posY"])) + for p in enemy_players_flags: + regard_list.append((p["posX"],p["posY"])) + for p in enemy_players: + pretend_list.append((p["posX"],p["posY"])) + for flags in my_flags: + protect_list.append((flags["posX"],flags["posY"])) + for t in my_targets: + targets_list.append((t[0],t[1])) + player_to_flag_assign = { + name: pos for name, pos in player_to_flag_assign.items() + if name in active_player_names + } + if enemy_flags: + for p in my_players: + if p["name"] not in active_player_names: + continue + if p["name"] in player_to_flag_assign and player_to_flag_assign[p["name"]] in flags_list: + flags_list.remove(player_to_flag_assign[p["name"]]) + continue + f = myMap.closest(p["posX"],p["posY"],flags_list) + if f is not None: + player_to_flag_assign[p["name"]] = (f[0],f[1]) + flags_list.remove((f[0],f[1])) + player_moves = {} + + for p in my_players: + if p["hasFlag"]: + dest = myMap.closest(p["posX"],p["posY"],my_targets) + f,lentime = myMap.closest_in_range(p["posX"],p["posY"],regard_list) + if f is not None and lentime<4: + dest = (f[0],f[1]) + elif p["name"] in player_to_flag_assign: + dest = player_to_flag_assign[p["name"]] + f,lentime = myMap.closest_in_range(p["posX"],p["posY"],regard_list) + if f is not None and lentime<14: + dest = (f[0],f[1]) + regard_list.remove((f[0],f[1])) + else : + f,lentime = myMap.closest_in_range(p["posX"],p["posY"],pretend_list) + if f is not None and lentime<8: + dest = (f[0],f[1]) + pretend_list.remove((f[0],f[1])) + else: + f,temp = myMap.closest_in_range(p["posX"],p["posY"],regard_list) + if f is not None: + dest = (f[0],f[1]) + regard_list.remove((f[0],f[1])) + else: + f = myMap.closest(p["posX"],p["posY"],pretend_list) + if f is not None: + dest = (f[0],f[1]) + pretend_list.remove((f[0],f[1])) + else: + f= myMap.closest(p["posX"],p["posY"],protect_list) + if f is not None: + dest = (f[0],f[1]) + protect_list.remove((f[0],f[1])) + else: + continue + if dest is not None: + player_moves[p["name"]] = myMap.guideance(p["posX"],p["posY"],dest[0],dest[1]) + else: + player_moves[p["name"]] = myMap.guideance(p["posX"],p["posY"],p["posX"],p["posY"]) + return player_moves + +def game_over(req): + print("Game Over!") + world.show(force=True) + +async def main(): + import sys + if len(sys.argv) != 2: + print(f"Usage: python3 {sys.argv[0]} ") + print(f"Example: python3 {sys.argv[0]} 8080") + sys.exit(1) + port = int(sys.argv[1]) + print(f"AI backend running on port {port} ...") + try: + await run_game_server(port, start_game, plan_next_actions, game_over) + except Exception as e: + print(f"Server Stopped: {e}") + sys.exit(1) + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file