Files
zhichun-project/CTF/backend/server0.py
ydy0615 fc9f98aa3d feat(backend): integrate dual AI strategies with conditional switching and A* pathfinding
- Add imports for server0 and server1 modules
- Modify start_game, plan_next_actions, and game_over to invoke both strategies
- Implement dynamic strategy selection based on player prison counts
- Add A* pathfinding with danger weights for attacking players, including safe/dangerous position calculations and opponent avoidance
- Update traditional pathfinding for defenders and flag carriers in enemy territory
- Include binary updates to .DS_Store files (likely system artifacts)
2025-12-28 13:39:06 +08:00

1147 lines
50 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import random
import heapq
import math
from lib.game_engine import GameMap, run_game_server
import threading
from collections import defaultdict
# 全局世界模型初始化
world = GameMap(show_gap_in_msec=1.0)
lock = threading.Lock()
# 游戏状态管理
player_roles = {} # 玩家角色分配 {'player_name': 'attacker'/'defender'}
flag_assignments = {} # flag分配 {'player_name': (x, y)}
attack_tasks = {} # 攻击任务状态 {'player_name': 'attacking'/'returning'/'hiding'}
defender_positions = {} # 防守位置分配 {'player_name': (x, y)}
def start_game(req):
"""游戏开始初始化"""
global player_roles, flag_assignments, attack_tasks, defender_positions
world.init(req)
print("Start Game!!")
# 重置所有全局状态
player_roles = {}
flag_assignments = {}
attack_tasks = {}
defender_positions = {}
print(f"Game Started! Side: {'Left' if world.is_on_left(list(world.my_team_target)[0]) else 'Right'}")
def a_star(start, goal, dangerous_positions=None, hard_obstacles=None, opponents=None):
"""
基于权重的寻路算法,区分软限制和硬限制
危险位置软限制权重按距离opponent远近递增
对手位置和障碍物:硬限制(完全不能通过)
"""
if start == goal:
return [start]
# 硬限制:墙壁、对手位置等完全不能通过的位置
hard_blocked = set(world.walls)
if hard_obstacles:
hard_blocked.update(hard_obstacles)
# 软限制危险位置权重按距离opponent远近递增
dangerous_set = set(dangerous_positions) if dangerous_positions else set()
# 获取opponents位置列表用于距离计算
opponent_positions = []
if opponents:
opponent_positions = [(o["posX"], o["posY"]) if isinstance(o, dict) else o for o in opponents]
elif hard_obstacles:
opponent_positions = list(hard_obstacles)
# 4个方向移动上下左右
directions = [(0, -1), (0, 1), (-1, 0), (1, 0)] # 上, 下, 左, 右
def is_valid(pos):
"""检查位置是否有效(不在在硬限制区域)"""
x, y = pos
return (0 <= x < world.width and 0 <= y < world.height and
pos not in hard_blocked)
def get_move_cost(pos):
"""获取移动到某位置的代价危险区域权重按十字型阶梯递增最小100"""
if pos in dangerous_set and opponent_positions:
# 计算到最近opponent的距离
min_distance_to_opponent = min(
calculate_manhattan_distance(pos, opp_pos)
for opp_pos in opponent_positions
)
# 十字型阶梯权重以对手为中心每层距离对应一个权重阶梯最小100
if min_distance_to_opponent == 0:
return 50000 # 对手位置50000
elif min_distance_to_opponent == 1:
return 10000 # 距离110000
elif min_distance_to_opponent == 2:
return 5000 # 距离25000
elif min_distance_to_opponent == 3:
return 2000 # 距离32000
elif min_distance_to_opponent == 4:
return 1000 # 距离41000
elif min_distance_to_opponent == 5:
return 500 # 距离5500
elif min_distance_to_opponent == 6:
return 300 # 距离6300
else:
return 100 # 距离7+最小100
elif pos in dangerous_set:
return 100 # 默认危险位置最小权重100
if is_in_my_territory(pos):
return 1
else:
return 5
# return 1 # 正常代价
def get_manhattan_distance(pos1, pos2):
"""计算曼哈顿距离作为启发式函数"""
return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
# A*搜索主循环
open_set = []
heapq.heappush(open_set, (0, start)) # (f_score, pos)
came_from = {}
g_score = {start: 0}
f_score = {start: get_manhattan_distance(start, goal)}
closed_set = set()
while open_set:
current_f, current_pos = heapq.heappop(open_set)
if current_pos in closed_set:
continue
closed_set.add(current_pos)
if current_pos == goal:
# 重构路径
path = []
pos = current_pos
while pos in came_from:
path.append(pos)
pos = came_from[pos]
path.append(start)
return path[::-1]
# 探索所有方向
for direction in directions:
next_pos = (current_pos[0] + direction[0], current_pos[1] + direction[1])
if is_valid(next_pos) and next_pos not in closed_set:
# 计算移动到下一位置的代价
move_cost = get_move_cost(next_pos)
tentative_g = g_score[current_pos] + move_cost
if next_pos not in g_score or tentative_g < g_score[next_pos]:
came_from[next_pos] = current_pos
g_score[next_pos] = tentative_g
f_score[next_pos] = tentative_g + get_manhattan_distance(next_pos, goal)
heapq.heappush(open_set, (f_score[next_pos], next_pos))
return [] # 无路径
def calculate_manhattan_distance(pos1, pos2):
"""计算曼哈顿距离"""
return abs(pos1[0] - pos2[0]) + abs(pos1[1] - pos2[1])
def is_in_my_territory(pos):
"""检查位置是否在我方领域"""
my_targets = list(world.list_targets(mine=True))
if not my_targets:
return True
my_side_is_left = world.is_on_left(my_targets[0])
return world.is_on_left(pos) == my_side_is_left
def is_in_enemy_territory(pos):
"""检查位置是否在敌方营地(敌方半场)"""
return not is_in_my_territory(pos)
def get_midline_x():
"""获取地图中线X坐标独立函数避免重复计算"""
return world.width // 2
def is_about_to_cross_midline(current_pos, next_pos):
"""判断玩家是否即将越过中线(从己方半场进入敌方半场)"""
# 获取我方半场标识(增加异常处理,防止崩溃)
my_targets = list(world.list_targets(mine=True))
if not my_targets:
return False
my_side_is_left = world.is_on_left(my_targets[0])
midline_x = get_midline_x()
# 我方在左侧当前x < 中线下一步x >= 中线 → 即将越界
if my_side_is_left:
current_in_my_territory = current_pos[0] < midline_x
next_in_enemy_territory = next_pos[0] >= midline_x
# 我方在右侧当前x > 中线下一步x <= 中线 → 即将越界
else:
current_in_my_territory = current_pos[0] > midline_x
next_in_enemy_territory = next_pos[0] <= midline_x
return current_in_my_territory and next_in_enemy_territory
def has_enemy_opposite(current_pos, next_pos, opponents, offset=1):
"""
【核心修改】仅检测中线对面「贴着中线」的位置是否有敌方玩家
贴着中线定义敌方玩家的X坐标与中线X坐标的差值的绝对值 ≤ 1即紧邻中线
同时保留Y轴小幅偏移可调整offset确保精准检测
"""
# 前置校验敌方玩家列表为空时直接返回False
if not opponents:
return False
# 1. 获取中线X坐标定义「贴着中线」的判定条件
midline_x = get_midline_x()
STICK_TO_MIDLINE_THRESHOLD = 1 # 贴着中线X坐标与中线差值≤1
# 2. 即将越界位置的Y轴判定基准
target_y = next_pos[1]
# 遍历敌方玩家(增加数据有效性判断)
for opponent in opponents:
# 校验敌方玩家数据完整性防止KeyError
if not all(key in opponent for key in ["name", "posX", "posY"]):
continue
opp_pos = (opponent["posX"], opponent["posY"])
# 【关键判定1】敌方玩家必须「贴着中线」X坐标紧邻中线
if abs(opp_pos[0] - midline_x) > STICK_TO_MIDLINE_THRESHOLD:
continue # 非贴着中线的敌方,直接跳过
# 【关键判定2】Y轴在允许偏移范围内与即将越界位置的Y轴接近
if abs(opp_pos[1] - target_y) > offset:
continue
# 【关键判定3】敌方玩家在敌方半场与玩家即将进入的区域一致中线两侧
if is_in_my_territory(opp_pos):
continue
# 【关键判定4】敌方玩家与越界位置在中线两侧严格对面
opp_side_left = opp_pos[0] < midline_x
next_side_left = next_pos[0] < midline_x
if opp_side_left == next_side_left:
continue # 同一侧,非对面
# 满足所有条件(贴着中线+Y轴接近+敌方半场+中线两侧),判定为有风险
print(f"[越界安全检测] 发现中线对面贴着中线的敌方玩家 {opponent['name']} 位置 {opp_pos},禁止越界")
return True
return False
# 寻找敌方营地内的安全躲避位置
def find_safe_hide_position_in_enemy_territory(player_pos, opponents, enemy_flags):
"""
在敌方营地内寻找安全躲避位置(远离所有敌方玩家,优先靠近目标旗帜)
:param player_pos: 玩家当前位置
:param opponents: 敌方玩家列表
:param enemy_flags: 敌方旗帜列表
:return: 安全躲避位置 / None
"""
if not opponents:
return player_pos # 无敌方时,当前位置即为安全
# 定义安全距离与所有敌方玩家的曼哈顿距离≥3格
SAFE_DISTANCE = 3
enemy_territory_positions = []
opponent_positions = [(o["posX"], o["posY"]) for o in opponents if all(key in o for key in ["posX", "posY"])]
# 遍历敌方营地内所有有效位置(非墙壁)
for x in range(world.width):
for y in range(world.height):
pos = (x, y)
# 仅保留敌方营地+非墙壁位置
if is_in_enemy_territory(pos) and pos not in world.walls:
# 检查是否与所有敌方玩家保持安全距离
is_safe = True
for opp_pos in opponent_positions:
if calculate_manhattan_distance(pos, opp_pos) < SAFE_DISTANCE:
is_safe = False
break
if is_safe:
enemy_territory_positions.append(pos)
if not enemy_territory_positions:
print("[躲避逻辑] 敌方营地无安全位置,优先向中线撤退")
# 无安全位置时,返回靠近中线的位置
midline_x = get_midline_x()
retreat_pos = min(
[(x, y) for x in range(world.width) for y in range(world.height) if
is_in_enemy_territory((x, y)) and (x, y) not in world.walls],
key=lambda p: abs(p[0] - midline_x)
)
return retreat_pos
# 优先选择靠近目标旗帜的安全位置(为后续拿旗做准备)
if enemy_flags:
target_flag_positions = [(f["posX"], f["posY"]) for f in enemy_flags]
# 按「距离最近目标旗帜」排序安全位置
enemy_territory_positions.sort(
key=lambda p: min(calculate_manhattan_distance(p, flag_pos) for flag_pos in target_flag_positions)
)
else:
# 无旗帜时,按「距离当前玩家最近」排序
enemy_territory_positions.sort(
key=lambda p: calculate_manhattan_distance(p, player_pos)
)
# 返回最优安全躲避位置
best_hide_pos = enemy_territory_positions[0]
print(f"[躲避逻辑] 找到最优安全躲避位置: {best_hide_pos}(远离敌方,靠近目标旗帜)")
return best_hide_pos
# 判断是否需要躲避敌方(进攻者在敌方营地时)
def need_hide_from_enemy(player_pos, opponents):
"""
判断进攻者在敌方营地时是否需要躲避敌方
:param player_pos: 玩家当前位置
:param opponents: 敌方玩家列表
:return: True=需要躲避(敌方过近), False=无需躲避(安全状态)
"""
if not opponents or not is_in_enemy_territory(player_pos):
return False # 无敌方 / 不在敌方营地,无需躲避
# 危险距离:与任意敌方玩家曼哈顿距离<3格判定为需要躲避
DANGER_DISTANCE = 3
opponent_positions = [(o["posX"], o["posY"]) for o in opponents if all(key in o for key in ["posX", "posY"])]
for opp_pos in opponent_positions:
if calculate_manhattan_distance(player_pos, opp_pos) < DANGER_DISTANCE:
print(f"[躲避逻辑] 检测到敌方玩家过近(距离<{DANGER_DISTANCE}格),需要优先躲避")
return True
return False
def calculate_safe_and_dangerous_positions(my_pos, opponents):
"""
计算安全位置和危险位置
安全位置己方player能在任意对方player之前提前一轮到达的位置
己方半场的位置都是安全位置
其余位置都是危险位置(包括周围一格范围)
"""
safe_positions = set()
base_dangerous_positions = set()
print(f"正在计算从位置 {my_pos} 出发的安全/危险位置...")
# 遍历地图上所有有效位置
for x in range(world.width):
for y in range(world.height):
pos = (x, y)
# 跳过墙壁
if pos in world.walls:
continue
# 己方半场的位置都是安全位置
if is_in_my_territory(pos):
safe_positions.add(pos)
continue
# 计算己方到达该位置的距离
my_distance = calculate_manhattan_distance(my_pos, pos)
# 检查是否能比所有对手提前一轮到达
is_safe = True
for opponent in opponents:
opponent_pos = (opponent["posX"], opponent["posY"])
opponent_distance = calculate_manhattan_distance(opponent_pos, pos)
# 如果对手能同时或更快到达,则不安全
if opponent_distance <= my_distance:
is_safe = False
break
if is_safe:
safe_positions.add(pos)
else:
base_dangerous_positions.add(pos)
# 将危险位置范围扩大一格
expanded_dangerous_positions = set(base_dangerous_positions)
for dangerous_pos in base_dangerous_positions:
x, y = dangerous_pos
# 检查周围8个方向的位置
for dx in [-1, 0, 1]:
for dy in [-1, 0, 1]:
if dx == 0 and dy == 0:
continue # 跳过自身位置
neighbor_pos = (x + dx, y + dy)
nx, ny = neighbor_pos
# 检查边界和墙壁
if (0 <= nx < world.width and 0 <= ny < world.height and
neighbor_pos not in world.walls and
not is_in_my_territory(neighbor_pos)): # 己方半场仍然安全
expanded_dangerous_positions.add(neighbor_pos)
# 从安全位置中移除被扩展为危险的位置(除了己方半场)
final_safe_positions = set()
for pos in safe_positions:
if is_in_my_territory(pos) or pos not in expanded_dangerous_positions:
final_safe_positions.add(pos)
print(
f"计算完成:基础危险位置 {len(base_dangerous_positions)} 个,扩展后危险位置 {len(expanded_dangerous_positions)}")
print(f"最终安全位置 {len(final_safe_positions)}")
return final_safe_positions, expanded_dangerous_positions
def find_closest_safe_position_to_target(my_pos, target_pos, safe_positions):
"""
找到距离目标最近的安全位置
这个位置必须能通过安全路径到达
"""
if not safe_positions:
return None
print(f"寻找从 {my_pos} 到目标 {target_pos} 的最近安全位置...")
# 如果目标本身就是安全位置,直接返回
if target_pos in safe_positions:
print(f"目标位置 {target_pos} 本身就是安全位置")
return target_pos
# 按距离目标的远近排序所有安全位置
safe_positions_list = list(safe_positions)
safe_positions_list.sort(key=lambda pos: calculate_manhattan_distance(pos, target_pos))
# 找到第一个能通过安全路径到达的安全位置
for safe_pos in safe_positions_list:
# 检查从当前位置到这个安全位置的路径是否只经过安全位置
path = a_star(my_pos, safe_pos, [])
if path:
# 检查路径上的所有位置是否都是安全的
path_is_safe = all(pos in safe_positions for pos in path)
if path_is_safe:
print(f"找到最近安全位置: {safe_pos}, 距离目标 {calculate_manhattan_distance(safe_pos, target_pos)}")
return safe_pos
print("未找到可通过安全路径到达的安全位置")
return None
def assign_player_roles(my_players, enemy_flags, my_flags, full_defense_mode=False):
"""
动态分配玩家角色(攻击者/防守者)
至少保持1个防守者其余为攻击者
当full_defense_mode=True时所有玩家都转为防守者
"""
global player_roles
total_players = len(my_players)
if total_players == 0:
return
# 完全防守模式:所有玩家都转为防守者
if full_defense_mode:
for p in my_players:
player_roles[p["name"]] = "defender"
print("进入完全防守模式,所有玩家均转为防守者")
return
# 计算需要的防守者数量
min_defenders = 1
enemy_threats = len([f for f in my_flags if f["canPickup"]])
needed_defenders = max(min_defenders, min(enemy_threats, total_players // 2))
# 清理已不存在的玩家角色
current_players = {p["name"] for p in my_players}
player_roles = {name: role for name, role in player_roles.items()
if name in current_players}
# 为新玩家分配角色
current_defenders = [name for name, role in player_roles.items()
if role == "defender"]
current_attackers = [name for name, role in player_roles.items()
if role == "attacker"]
unassigned = [p["name"] for p in my_players if p["name"] not in player_roles]
# 如果防守者不足,从攻击者中转换
while len(current_defenders) < needed_defenders and current_attackers:
# 选择最适合防守的攻击者(距离我方目标最近的)
my_targets = list(world.list_targets(mine=True))
if my_targets:
best_candidate = min(current_attackers,
key=lambda name: min(calculate_manhattan_distance(
(next(p for p in my_players if p["name"] == name)["posX"],
next(p for p in my_players if p["name"] == name)["posY"]), target)
for target in my_targets))
player_roles[best_candidate] = "defender"
current_defenders.append(best_candidate)
current_attackers.remove(best_candidate)
# 分配未分配的玩家
for name in unassigned:
if len(current_defenders) < needed_defenders:
player_roles[name] = "defender"
current_defenders.append(name)
else:
player_roles[name] = "attacker"
current_attackers.append(name)
def assign_defender_positions(defenders, my_players, my_flags, opponents, full_defense_mode=False):
"""
为防守者分配防守位置 - 重构版本
每轮前往距离任意opponent最近的flag绝对不能到对方半场
当full_defense_mode=True且防守者数量≥剩余旗帜数时所有人员分开站在旗帜上不动
"""
global defender_positions
if not defenders:
return
my_targets = list(world.list_targets(mine=True))
if not my_targets:
return
# 清空之前的分配
defender_positions = {}
# 只考虑我方半场的flags绝对不能到对方半场
my_territory_flags = []
for flag in my_flags:
flag_pos = (flag["posX"], flag["posY"])
if is_in_my_territory(flag_pos) and flag["canPickup"]:
my_territory_flags.append(flag)
print(f"我方半场可防守的flags: {[(f['posX'], f['posY']) for f in my_territory_flags]}")
# 完全防守模式特殊处理
if full_defense_mode and len(defenders) >= len(my_territory_flags) and my_territory_flags:
print("完全防守模式:所有防守者分开站在旗帜上不动防守")
# 为每个旗帜分配一个防守者
assigned_flags = set()
for i, defender_name in enumerate(defenders):
# 循环分配旗帜
flag_index = i % len(my_territory_flags)
flag = my_territory_flags[flag_index]
flag_pos = (flag["posX"], flag["posY"])
# 确保每个旗帜只分配一个防守者
if flag_pos not in assigned_flags:
defender_positions[defender_name] = flag_pos
assigned_flags.add(flag_pos)
print(f"defender {defender_name} 被分配驻守flag {flag_pos}")
else:
# 如果旗帜已被分配,分配到目标区域巡逻
target = my_targets[i % len(my_targets)]
defender_positions[defender_name] = target
print(f"defender {defender_name} 被分配到巡逻点 {target} (旗帜已被驻守)")
return
if not my_territory_flags:
# 如果没有需要防守的flagsdefenders在我方target区域巡逻
print("没有需要防守的flagsdefenders在我方领域巡逻")
for i, defender_name in enumerate(defenders):
target = my_targets[i % len(my_targets)]
defender_positions[defender_name] = target
print(f"defender {defender_name} 被分配到巡逻点 {target}")
return
# 为每个opponent找到距离它最近的我方flag只检测不持有己方flag的opponent
threatened_flags = []
for opponent in opponents:
opponent_pos = (opponent["posX"], opponent["posY"])
# 检查opponent是否持有我方flag
if opponent.get("hasFlag", False):
print(f"opponent {opponent['name']} 持有我方flag跳过威胁检测")
continue
if my_territory_flags:
# 找到距离这个opponent最近的我方flag
closest_flag = min(my_territory_flags,
key=lambda f: calculate_manhattan_distance(
(f["posX"], f["posY"]), opponent_pos))
closest_flag_pos = (closest_flag["posX"], closest_flag["posY"])
distance = calculate_manhattan_distance(closest_flag_pos, opponent_pos)
# 检查opponent是否在我方半场
is_opponent_in_my_territory = is_in_my_territory(opponent_pos)
# 在己方半场的opponent优先级为0最高优先级否则按距离排序
priority = 0 if is_opponent_in_my_territory else distance
threatened_flags.append({
'flag_pos': closest_flag_pos,
'opponent': opponent,
'distance': distance,
'priority': priority,
'in_my_territory': is_opponent_in_my_territory
})
territory_status = "己方半场" if is_opponent_in_my_territory else "敌方半场"
print(
f"opponent {opponent['name']}{opponent_pos} ({territory_status}) 威胁flag {closest_flag_pos}, 距离: {distance}, 优先级: {priority}")
# 按威胁优先级排序(距离最近的威胁优先处理)
threatened_flags.sort(key=lambda x: x['priority'])
# 为威胁最大的flags分配defenders考虑时间优势
assigned_flags = set()
for i, defender_name in enumerate(defenders):
if i < len(threatened_flags):
threat = threatened_flags[i]
new_flag_pos = threat['flag_pos']
# 获取defender当前位置
defender = next((p for p in my_players if p["name"] == defender_name), None)
if not defender:
continue
defender_pos = (defender["posX"], defender["posY"])
# 检查是否应该切换到新的flag
should_switch = True
# 如果defender之前已有目标检查是否值得切换
if defender_name in defender_positions:
current_target = defender_positions[defender_name]
# 计算defender到当前目标和新目标的距离
distance_to_current = calculate_manhattan_distance(defender_pos, current_target)
distance_to_new = calculate_manhattan_distance(defender_pos, new_flag_pos)
# 计算opponent到新flag的距离
opponent_to_new = threat['distance']
# 只有当defender能在opponent前到达新flag且新威胁更紧急时才切换
if distance_to_new >= opponent_to_new:
should_switch = False
print(
f"defender {defender_name} 保持当前目标 {current_target},因为无法及时赶到新威胁 {new_flag_pos}")
print(
f" 当前距离: {distance_to_current}, 新目标距离: {distance_to_new}, opponent距离: {opponent_to_new}")
else:
print(f"defender {defender_name} 切换到新威胁 {new_flag_pos},能够及时拦截")
print(f" defender距离: {distance_to_new}, opponent距离: {opponent_to_new}")
if should_switch:
# 避免多个defender保护同一个flag除非defender太多
if new_flag_pos not in assigned_flags or len(assigned_flags) >= len(my_territory_flags):
defender_positions[defender_name] = new_flag_pos
assigned_flags.add(new_flag_pos)
print(
f"defender {defender_name} 被分配保护flag {new_flag_pos}, 对抗opponent {threat['opponent']['name']} (距离: {threat['distance']})")
else:
# 如果新flag已有人守护保持当前目标或选择其他flag
if defender_name in defender_positions:
print(
f"defender {defender_name} 保持当前目标 {defender_positions[defender_name]}新flag已有其他defender")
else:
remaining_flags = [f for f in my_territory_flags
if (f["posX"], f["posY"]) not in assigned_flags]
if remaining_flags:
flag = remaining_flags[0]
flag_pos = (flag["posX"], flag["posY"])
defender_positions[defender_name] = flag_pos
assigned_flags.add(flag_pos)
print(f"defender {defender_name} 被分配到备选flag {flag_pos}")
else:
target = my_targets[i % len(my_targets)]
defender_positions[defender_name] = target
print(f"defender {defender_name} 被分配到巡逻点 {target}")
else:
defender_positions[defender_name] = current_target
else:
# 多余的defenders在target区域巡逻
target = my_targets[i % len(my_targets)]
defender_positions[defender_name] = target
print(f"额外defender {defender_name} 被分配到巡逻点 {target}")
print(f"最终defender分配: {defender_positions}")
def assign_attacker_targets(attackers, my_players, enemy_flags, opponents):
"""为攻击者分配攻击目标"""
global flag_assignments, attack_tasks
if not attackers or not enemy_flags:
return
my_targets = list(world.list_targets(mine=True))
if not my_targets:
return
# 获取opponent位置集合
opponent_positions_set = {(o["posX"], o["posY"]) for o in opponents}
# 清理无效的分配
active_attackers = set(attackers)
flag_assignments = {name: pos for name, pos in flag_assignments.items()
if name in active_attackers}
attack_tasks = {name: status for name, status in attack_tasks.items()
if name in active_attackers}
# 检查并更新现有的flag分配
attackers_need_reassign = []
for attacker_name in attackers:
player = next((p for p in my_players if p["name"] == attacker_name), None)
if not player:
continue
# 如果玩家带flag更新为返回状态
if player["hasFlag"]:
attack_tasks[attacker_name] = "returning"
if attacker_name in flag_assignments:
del flag_assignments[attacker_name]
continue
# 检查当前分配的flag上是否站着opponent
if attacker_name in flag_assignments:
assigned_flag = flag_assignments[attacker_name]
if assigned_flag in opponent_positions_set:
print(f"攻击者 {attacker_name} 的目标flag {assigned_flag} 上站着opponent需要重新分配")
del flag_assignments[attacker_name]
if attacker_name in attack_tasks:
del attack_tasks[attacker_name]
attackers_need_reassign.append(attacker_name)
else:
print(f"攻击者 {attacker_name} 的目标flag {assigned_flag} 安全")
else:
# 没有分配目标的攻击者
attackers_need_reassign.append(attacker_name)
# 为需要重新分配的攻击者选择new flag
assigned_flags = set(flag_assignments.values())
available_flags = [(f["posX"], f["posY"]) for f in enemy_flags
if (f["posX"], f["posY"]) not in assigned_flags]
# 筛选出没有opponent站着的flag
safe_flags = [flag_pos for flag_pos in available_flags
if flag_pos not in opponent_positions_set]
print(f"可用flags: {available_flags}")
print(f"安全flags无opponent: {safe_flags}")
for attacker_name in attackers_need_reassign:
player = next((p for p in my_players if p["name"] == attacker_name), None)
if not player:
continue
player_pos = (player["posX"], player["posY"])
if safe_flags:
# 选择寻路分数最低的安全flag
print(f"为攻击者 {attacker_name}{player_pos} 从安全flags中选择目标...")
# 计算当前玩家的危险位置
safe_positions, dangerous_positions = calculate_safe_and_dangerous_positions(player_pos, opponents)
best_flag = None
best_score = float('inf')
for flag_pos in safe_flags:
# 使用权重寻路计算到达该flag的实际代价
opponent_positions_list = [(o["posX"], o["posY"]) for o in opponents]
path = a_star(player_pos, flag_pos, dangerous_positions, opponent_positions_list)
if path:
# 计算路径的总权重代价
total_cost = 0
dangerous_set = set(dangerous_positions)
for pos in path:
if pos in dangerous_set:
total_cost += 10000 # 危险位置代价
else:
total_cost += 1 # 正常位置代价
print(f" 安全Flag {flag_pos}: 路径长度={len(path)}, 总代价={total_cost}")
if total_cost < best_score:
best_score = total_cost
best_flag = flag_pos
else:
print(f" 安全Flag {flag_pos}: 无法到达")
if best_flag:
flag_assignments[attacker_name] = best_flag
attack_tasks[attacker_name] = "attacking"
safe_flags.remove(best_flag)
print(f" 选择最优安全目标: {best_flag}, 代价: {best_score}")
else:
print(f" WARNING: 攻击者 {attacker_name} 无法到达任何安全flag")
else:
# 没有安全flag尝试选择有opponent占据的flag
print(f"攻击者 {attacker_name} 没有安全flag可选尝试选择有opponent占据的flag")
occupied_flags = [flag_pos for flag_pos in available_flags
if flag_pos in opponent_positions_set]
if occupied_flags:
# 检查当前是否已经在攻击某个被占据的flag
current_assignment = flag_assignments.get(attacker_name)
if current_assignment in occupied_flags:
# 检查距离如果靠近3格opponent还没让开就换目标
distance_to_flag = calculate_manhattan_distance(player_pos, current_assignment)
if distance_to_flag <= 3:
print(
f" 攻击者 {attacker_name} 距离被占据flag {current_assignment}{distance_to_flag}opponent未让开寻找替代目标")
# 寻找其他可选的flag
other_flags = [f for f in occupied_flags if f != current_assignment]
other_safe_flags = [f for f in available_flags if
f not in opponent_positions_set and f != current_assignment]
if other_safe_flags:
# 优先选择安全flag
best_alternative = min(other_safe_flags,
key=lambda f: calculate_manhattan_distance(player_pos, f))
flag_assignments[attacker_name] = best_alternative
attack_tasks[attacker_name] = "attacking"
print(f" 切换到安全备选flag: {best_alternative}")
elif other_flags:
# 选择其他被占据的flag
best_alternative = min(other_flags,
key=lambda f: calculate_manhattan_distance(player_pos, f))
flag_assignments[attacker_name] = best_alternative
attack_tasks[attacker_name] = "attacking"
print(f" 切换到其他被占据flag: {best_alternative}")
else:
# 没有其他选择,返回己方半场
print(f" 无其他flag可选攻击者 {attacker_name} 返回己方半场")
_assign_retreat_position(attacker_name, player_pos, opponent_positions_set)
else:
print(
f" 攻击者 {attacker_name} 继续攻击被占据flag {current_assignment}, 距离: {distance_to_flag}")
else:
# 为新攻击者分配被占据的flag
best_occupied_flag = min(occupied_flags,
key=lambda f: calculate_manhattan_distance(player_pos, f))
flag_assignments[attacker_name] = best_occupied_flag
attack_tasks[attacker_name] = "attacking"
print(f" 分配被占据flag: {best_occupied_flag}")
else:
# 没有任何可用flag向己方半场撤退
print(f"攻击者 {attacker_name} 没有任何flag可选向己方半场撤退")
_assign_retreat_position(attacker_name, player_pos, opponent_positions_set)
def _assign_retreat_position(attacker_name, player_pos, opponent_positions_set):
"""为攻击者分配撤退位置"""
# 在己方半场选择一个随机安全位置
my_territory_positions = []
for x in range(world.width):
for y in range(world.height):
pos = (x, y)
if (is_in_my_territory(pos) and pos not in world.walls and
pos not in opponent_positions_set):
my_territory_positions.append(pos)
if my_territory_positions:
random_target = random.choice(my_territory_positions)
flag_assignments[attacker_name] = random_target
attack_tasks[attacker_name] = "retreating"
print(f" 攻击者 {attacker_name} 选择随机撤退位置: {random_target}")
else:
print(f" WARNING: 攻击者 {attacker_name} 无法找到安全撤退位置")
def plan_next_actions(req):
"""主要的游戏策略规划函数(优化:进攻者在敌方营地优先躲敌,再拿旗)"""
if not world.update(req):
return {}
print(f"\n=== 游戏回合 {req.get('time', 'unknown')} ===")
global player_roles, flag_assignments, attack_tasks, defender_positions
# 获取游戏状态
my_players = world.list_players(mine=True, inPrison=False, hasFlag=None)
opponents = world.list_players(mine=False, inPrison=False, hasFlag=None)
enemy_flags = world.list_flags(mine=False, canPickup=True)
my_flags = world.list_flags(mine=True, canPickup=True)
my_targets = list(world.list_targets(mine=True))
# 计算双方剩余旗帜数量
my_remaining_flags = len(my_flags)
enemy_remaining_flags = len(enemy_flags)
print(f"我方剩余旗帜: {my_remaining_flags}, 敌方剩余旗帜: {enemy_remaining_flags}")
# 检查是否需要进入完全防守模式
full_defense_mode = False
if my_remaining_flags <= 3 and enemy_remaining_flags <= my_remaining_flags:
full_defense_mode = True
print(
f"触发完全防守模式: 我方剩余旗帜={my_remaining_flags} ≤3 且 敌方剩余旗帜={enemy_remaining_flags} ≤ 我方旗帜数")
print(f"我方玩家: {[(p['name'], p['posX'], p['posY'], p['hasFlag']) for p in my_players]}")
print(f"敌方玩家: {[(p['name'], p['posX'], p['posY']) for p in opponents]}")
print(f"敌方flags: {[(f['posX'], f['posY']) for f in enemy_flags]}")
print(f"我方targets: {my_targets}")
if not my_players or not my_targets:
print("没有我方玩家或目标,返回空移动")
return {}
# 1. 动态角色分配(支持完全防守模式)
assign_player_roles(my_players, enemy_flags, my_flags, full_defense_mode)
print(f"角色分配: {player_roles}")
# 2. 获取当前角色分配
defenders = [name for name, role in player_roles.items() if role == "defender"]
attackers = [name for name, role in player_roles.items() if role == "attacker"]
print(f"防守者: {defenders}, 攻击者: {attackers}")
# 3. 分配防守和攻击任务(防守任务支持完全防守模式)
assign_defender_positions(defenders, my_players, my_flags, opponents, full_defense_mode)
# 如果不是完全防守模式才分配攻击任务
if not full_defense_mode:
assign_attacker_targets(attackers, my_players, enemy_flags, opponents)
else:
# 完全防守模式下清空攻击任务
flag_assignments = {}
attack_tasks = {}
print("完全防守模式:清空所有攻击任务")
print(f"防守位置: {defender_positions}")
print(f"攻击目标: {flag_assignments}")
print(f"攻击任务状态: {attack_tasks}")
# 4. 为每个玩家计算移动
player_moves = {}
for player in my_players:
player_name = player["name"]
current_pos = (player["posX"], player["posY"])
is_attacker = player_name in attackers
is_in_enemy = is_in_enemy_territory(current_pos)
has_flag = player["hasFlag"]
print(f"\n--- 处理玩家 {player_name} 位置 {current_pos} ---")
print(
f"[{player_name}] 身份: {'攻击者' if is_attacker else '防守者'}, 所在区域: {'敌方营地' if is_in_enemy else '己方营地'}, 持有旗帜: {has_flag}")
# 【最高优先级】强制攻击逻辑如果距离opponent仅1格且opponent在我方半场无论什么状态都必须向opponent移动
adjacent_opponent = None
closest_distance = float('inf')
for opponent in opponents:
opponent_pos = (opponent["posX"], opponent["posY"])
distance = calculate_manhattan_distance(current_pos, opponent_pos)
# 检查是否相邻距离为1且opponent在我方半场
if distance == 1 and is_in_my_territory(opponent_pos):
# 如果有多个相邻入侵者,选择最近的一个
if distance < closest_distance:
closest_distance = distance
adjacent_opponent = opponent
print(f"{player_name} 发现距离{distance}格的入侵者 {opponent['name']} 在我方半场 {opponent_pos}")
# 强制攻击入侵者 - 无论玩家当前状态是什么攻击者、防守者、带flag等
if adjacent_opponent:
opponent_pos = (adjacent_opponent["posX"], adjacent_opponent["posY"])
move = world.get_direction(current_pos, opponent_pos)
if move:
player_moves[player_name] = move
print(
f"【强制攻击】{player_name} 无论当前状态如何,必须攻击入侵者 {adjacent_opponent['name']}: {current_pos} -> {opponent_pos}, 方向: {move}")
continue
else:
print(f"ERROR: {player_name} 无法向入侵者移动 - 这不应该发生!")
# 【优化核心逻辑】进攻者在敌方营地:优先判断是否需要躲避敌方
if is_attacker and is_in_enemy and not has_flag:
if need_hide_from_enemy(current_pos, opponents):
# 1. 需要躲避:更新任务状态为躲避
attack_tasks[player_name] = "hiding"
# 2. 寻找敌方营地内的安全躲避位置
hide_pos = find_safe_hide_position_in_enemy_territory(current_pos, opponents, enemy_flags)
if hide_pos:
# 3. 规划前往躲避位置的路径
opponent_positions = [(o["posX"], o["posY"]) for o in opponents]
path = a_star(current_pos, hide_pos, [], opponent_positions)
if path and len(path) > 1:
next_pos = path[1]
move = world.get_direction(current_pos, next_pos)
if move:
player_moves[player_name] = move
print(
f"【优先躲避】{player_name} 前往安全躲避位置: {current_pos} -> {next_pos} (最终目标: {hide_pos})")
continue
print(f"【躲避失败】{player_name} 无法找到躲避路径,执行原拿旗逻辑")
# 确定目标位置(原有逻辑,躲避优先级高于此)
target_pos = None
task_type = "unknown"
if player["hasFlag"]:
# 带flag回家 - 选择加权分值最低的target
print(f"{player_name} 正在选择最优回程目标(基于加权分值)...")
# 计算当前位置的危险区域(用于加权计算)
safe_positions, dangerous_positions = calculate_safe_and_dangerous_positions(current_pos, opponents)
dangerous_set = set(dangerous_positions)
best_target = None
lowest_weighted_score = float('inf')
for target in my_targets:
# 计算到每个target的加权路径
opponent_positions = [(o["posX"], o["posY"]) for o in opponents]
path = a_star(current_pos, target, dangerous_positions, opponent_positions)
if path:
# 计算路径的总加权代价
total_weighted_cost = 0
for pos in path:
if pos in dangerous_set:
total_weighted_cost += 10000 # 危险位置代价
else:
total_weighted_cost += 1 # 正常位置代价
print(f" Target {target}: 路径长度={len(path)}, 加权代价={total_weighted_cost}")
if total_weighted_cost < lowest_weighted_score:
lowest_weighted_score = total_weighted_cost
best_target = target
else:
print(f" Target {target}: 无法到达")
if best_target:
target_pos = best_target
print(f"{player_name} 选择最优回程目标: {target_pos}, 加权代价: {lowest_weighted_score}")
else:
# 如果所有target都无法到达选择第一个作为备选
target_pos = my_targets[0]
print(f"WARNING: {player_name} 无法到达任何target使用默认目标: {target_pos}")
task_type = "returning_flag"
elif player_name in defender_positions:
# 防守者前往防守位置
target_pos = defender_positions[player_name]
task_type = "defending"
# 完全防守模式且已到达旗帜位置则不移动
if (full_defense_mode and
len(defenders) >= len(my_flags) and
current_pos == target_pos):
print(f"{player_name} 已在防守位置 {target_pos},完全防守模式下不移动")
continue
elif player_name in flag_assignments:
# 攻击者前往目标flag仅当无需躲避时执行
target_pos = flag_assignments[player_name]
task_type = "attacking"
print(f"{player_name} 任务类型: {task_type}, 目标位置: {target_pos}")
if not target_pos or current_pos == target_pos:
print(f"{player_name} 没有目标或已到达目标,跳过")
continue
# 检查是否在我方领域
in_my_territory = is_in_my_territory(current_pos)
print(f"{player_name} 是否在我方领域: {in_my_territory}")
# 对于attacker使用权重寻路策略软性约束危险位置硬性约束对手位置
if task_type == "attacking":
print(f"攻击者 {player_name} 使用权重寻路策略")
# 计算当前的危险位置
safe_positions, dangerous_positions = calculate_safe_and_dangerous_positions(current_pos, opponents)
print(f"{player_name} 检测到 {len(dangerous_positions)} 个危险位置")
# 对手位置作为硬限制(完全不能通过)
opponent_positions = [(o["posX"], o["posY"]) for o in opponents]
print(f"{player_name} 硬限制对手位置: {opponent_positions}")
# 直接向目标flag移动危险位置软限制对手位置硬限制
path = a_star(current_pos, target_pos, dangerous_positions, opponent_positions, opponents)
print(f"{player_name} 权重寻路路径: {path}")
if not path:
# 如果权重寻路失败,使用传统寻路作为备选
print(f"{player_name} 权重寻路失败,尝试传统寻路")
path = world.route_to(current_pos, target_pos, opponent_positions)
else:
# 对于defender或带flag的玩家使用传统策略
print(f"{player_name} 使用传统寻路策略")
extra_obstacles = []
if not in_my_territory:
# 在敌方领域时避免与对手直接冲突
extra_obstacles = [(o["posX"], o["posY"]) for o in opponents]
print(f"{player_name} 在敌方领域,避开 {len(extra_obstacles)} 个对手位置")
# 使用A*算法寻路
path = a_star(current_pos, target_pos, extra_obstacles)
# 如果A*失败,使用原始寻路算法作为备选
if not path:
print(f"{player_name} A*寻路失败尝试原始BFS算法")
path = world.route_to(current_pos, target_pos, extra_obstacles)
print(f"{player_name} 最终路径: {path}")
if len(path) > 1:
next_pos = path[1]
current_pos = (player["posX"], player["posY"])
player_name = player["name"]
# ===================== 嵌入越界安全检测无bug =====================
try:
# 1. 检测是否即将越界
about_to_cross = is_about_to_cross_midline(current_pos, next_pos)
if about_to_cross:
# 2. 检测中线对面贴着中线的敌方
enemy_opposite = has_enemy_opposite(current_pos, next_pos, opponents)
if enemy_opposite:
# 3. 有敌方则禁止移动,原地待命
print(f"[{player_name}] 即将越界遇中线对面贴着中线的敌方,停止移动(安全防护)")
continue
except Exception as e:
# 捕获异常不影响程序运行
print(f"[{player_name}] 越界检测临时异常:{e},继续正常移动")
# =====================================================================
# 原有移动逻辑
move = world.get_direction(current_pos, next_pos)
print(f"{player_name} 下一步: {current_pos} -> {next_pos}, 方向: {move}")
if move:
player_moves[player_name] = move
else:
print(f"WARNING: {player_name} 无法确定移动方向!")
else:
print(f"WARNING: {player_name} 没有找到有效路径!")
print(f"最终移动指令: {player_moves}")
print("=" * 50)
return player_moves
def game_over(req):
print("Game Over!")
world.show(force=True)
async def main():
import sys
if len(sys.argv) != 2:
print(f"Usage: python3 {sys.argv[0]} <port>")
print(f"Example: python3 {sys.argv[0]} 8080")
sys.exit(1)
port = int(sys.argv[1])
print(f"AI backend running on port {port} ...")
try:
await run_game_server(port, start_game, plan_next_actions, game_over)
except Exception as e:
print(f"Server Stopped: {e}")
sys.exit(1)
if __name__ == "__main__":
asyncio.run(main())