Introduce a new `CTF/backend/mypolicy` Python module that implements the game policy for the capture‑the‑flag bot. The file defines a global world model, a module‑level lock (declared, but not yet acquired anywhere in the visible code), and a `Map` class that builds a grid representation of the game map, generates traversable edges, and provides two BFS‑based methods: `guideance` (the direction of the next step towards a destination; the spelling matches the method name in the code) and `length` (the shortest‑path distance). The policy uses these to make movement decisions based on safe zones, walls, allies, and enemies. Additionally, macOS `.DS_Store` placeholder files were added to the repository.
213 lines
7.9 KiB
Python
213 lines
7.9 KiB
Python
import asyncio
|
|
import random
|
|
from lib.game_engine import GameMap, run_game_server
|
|
import threading
|
|
import collections
|
|
|
|
# 1. Initialize the global world model
world = GameMap(show_gap_in_msec=10.0)

# Module-level lock; none of the code visible in this file acquires it.
lock = threading.Lock()

# Update bookkeeping values; not referenced by the code visible in this file.
last_updated_time: int = -1
update_threshold: int = 10

# Player name -> (x, y) of the enemy flag that player is currently chasing.
player_to_flag_assign: dict = {}

# Result of world.is_on_left() for our home target; set by start_game().
my_side_is_left = None
|
|
class Map:
    """Grid and adjacency-list view of the global ``world`` for BFS planning.

    Cells are stored row-major: the cell at ``(x, y)`` lives at index
    ``y * width + x``.  ``grid`` holds one code per cell (see the constants
    below) and ``edge[i]`` lists the cell indices that can be entered from
    cell ``i`` in a single step.  Both are rebuilt from ``world`` by
    ``update``, which ``guideance`` and ``length`` call on every query.
    """

    # Cell codes stored in ``grid``.
    FREE = 0
    WALL = 1
    DANGER = 2  # an enemy's cell or one of its four neighbours
    ALLY = 3

    # Neighbour offsets; the order fixes how BFS breaks ties between
    # equally short paths, so it must not be reordered casually.
    _STEPS = ((0, 1), (0, -1), (1, 0), (-1, 0))

    def __init__(self):
        self.width = world.width
        self.height = world.height
        self.grid = [self.FREE] * (self.width * self.height)
        self.edge = [[] for _ in range(self.width * self.height)]
        # True/False once update() has run; None before the first update.
        self.in_safe_zone = None

    def convert_pos_to_index(self, x, y):
        """Return the row-major cell index of position ``(x, y)``."""
        return y * self.width + x

    def _index_to_pos(self, idx):
        """Return the ``(x, y)`` position of row-major cell index ``idx``."""
        # Inverse of convert_pos_to_index: both coordinates are decoded with
        # the row length (width), never with the height.
        return idx % self.width, idx // self.width

    def _in_bounds(self, x, y):
        """Return True when ``(x, y)`` lies inside the current grid."""
        return 0 <= x < self.width and 0 <= y < self.height

    def update(self, posx, posy):
        """Rebuild ``grid`` and ``edge`` from ``world`` for a unit at (posx, posy).

        ``in_safe_zone`` is set from the side of the map the given position
        is on, and decides which cells are treated as blocked when the edges
        are generated (see the comments in the body).
        """
        self.width = world.width
        self.height = world.height
        cell_count = self.width * self.height
        self.edge = [[] for _ in range(cell_count)]
        self.grid = [self.FREE] * cell_count

        for x, y in world.walls:
            self.grid[self.convert_pos_to_index(x, y)] = self.WALL

        self.in_safe_zone = world.is_on_left((posx, posy)) == my_side_is_left
        enemy_players = world.list_players(mine=False, inPrison=False, hasFlag=None)
        ally_players = world.list_players(mine=True, inPrison=False, hasFlag=None)
        my_pos = self.convert_pos_to_index(posx, posy)

        for ally in ally_players:
            self.grid[self.convert_pos_to_index(ally["posX"], ally["posY"])] = self.ALLY

        for enemy in enemy_players:
            ex, ey = enemy["posX"], enemy["posY"]
            self.grid[self.convert_pos_to_index(ex, ey)] = self.DANGER
            for dx, dy in self._STEPS:
                nx, ny = ex + dx, ey + dy
                # Off-grid neighbours must be skipped: a negative index would
                # silently wrap to an unrelated cell and a too-large one
                # would raise IndexError.
                if not self._in_bounds(nx, ny):
                    continue
                n_idx = self.convert_pos_to_index(nx, ny)
                # Never downgrade a wall to "danger", otherwise the wall
                # would stop being treated as a wall by the edge rules below.
                if self.grid[n_idx] != self.WALL:
                    self.grid[n_idx] = self.DANGER

        # Edge rules, depending on which side the unit is on:
        #   own side   -> edges leave any non-wall cell and never enter a
        #                 wall or danger cell;
        #   other side -> edges leave only cells that are neither wall nor
        #                 danger, and never enter a wall.
        # The unit's own cell always keeps its outgoing edges.
        if self.in_safe_zone:
            blocked_sources = (self.WALL,)
            blocked_targets = (self.WALL, self.DANGER)
        else:
            blocked_sources = (self.WALL, self.DANGER)
            blocked_targets = (self.WALL,)

        for y in range(self.height):
            for x in range(self.width):
                idx = self.convert_pos_to_index(x, y)
                if self.grid[idx] in blocked_sources and idx != my_pos:
                    continue
                for dx, dy in self._STEPS:
                    nx, ny = x + dx, y + dy
                    if not self._in_bounds(nx, ny):
                        continue
                    n_idx = self.convert_pos_to_index(nx, ny)
                    if self.grid[n_idx] not in blocked_targets:
                        self.edge[idx].append(n_idx)

    def _bfs(self, src_idx, dst_idx):
        """Breadth-first search over ``edge`` from ``src_idx``.

        The search stops as soon as ``dst_idx`` is dequeued.

        Returns:
            ``(dist, prev)`` where ``dist[i]`` is the number of steps from
            the source to cell ``i`` (``-1`` if it was not reached) and
            ``prev[i]`` is the predecessor of ``i`` on that path (``None``
            for the source and for cells that were not reached).
        """
        cell_count = self.width * self.height
        dist = [-1] * cell_count
        prev = [None] * cell_count
        dist[src_idx] = 0
        queue = collections.deque([src_idx])
        while queue:
            u = queue.popleft()
            if u == dst_idx:
                break
            for v in self.edge[u]:
                if dist[v] == -1:
                    dist[v] = dist[u] + 1
                    prev[v] = u
                    queue.append(v)
        return dist, prev

    def guideance(self, posx_start, posy_start, posx_end, posy_end):
        """Return the direction of the first step of a shortest path.

        The map is refreshed from ``world`` first.  Returns ``""`` when either
        position is off the grid, when the destination is unreachable, or
        when start and destination are the same cell; otherwise returns
        whatever ``world.get_direction`` yields for the start position and
        the first cell of the path.
        """
        self.update(posx_start, posy_start)
        if not (self._in_bounds(posx_start, posy_start)
                and self._in_bounds(posx_end, posy_end)):
            return ""

        src_idx = self.convert_pos_to_index(posx_start, posy_start)
        dst_idx = self.convert_pos_to_index(posx_end, posy_end)
        dist, prev = self._bfs(src_idx, dst_idx)

        # Unreachable destination, or nothing to do because we are already there.
        if dist[dst_idx] == -1 or dst_idx == src_idx:
            return ""

        # Walk the predecessor chain back from the destination until we hit
        # the cell whose predecessor is the source: that is the first step.
        step_idx = dst_idx
        while prev[step_idx] != src_idx:
            step_idx = prev[step_idx]

        next_x, next_y = self._index_to_pos(step_idx)
        return world.get_direction((posx_start, posy_start), (next_x, next_y))

    def length(self, posx_start, posy_start, posx_end, posy_end):
        """Return the shortest-path length in steps, or ``-1`` if unreachable.

        The map is refreshed from ``world`` first.  ``-1`` is also returned
        when either position is off the grid.
        """
        self.update(posx_start, posy_start)
        if not (self._in_bounds(posx_start, posy_start)
                and self._in_bounds(posx_end, posy_end)):
            return -1

        src_idx = self.convert_pos_to_index(posx_start, posy_start)
        dst_idx = self.convert_pos_to_index(posx_end, posy_end)
        dist, _ = self._bfs(src_idx, dst_idx)
        # _bfs already uses -1 for "not reached".
        return dist[dst_idx]
|
|
|
|
# Shared planner instance; guideance() and length() rebuild its grid from `world` on every call.
myMap = Map()
|
|
def start_game(req):
    """Handle the start-of-game request.

    Initialises ``world`` from ``req``, clears the flag assignments and
    records in ``my_side_is_left`` what ``world.is_on_left`` reports for the
    first entry of ``world.my_team_target``.
    """
    global player_to_flag_assign, my_side_is_left

    world.init(req)
    print("Start Game!!")
    player_to_flag_assign = {}

    # Look the home target up once and reuse the answer for both the log
    # line and the stored side.
    home_target = list(world.my_team_target)[0]
    on_left = world.is_on_left(home_target)
    print(f"Game Started! Side: {'Left' if on_left else 'Right'}")
    my_side_is_left = on_left
|
|
|
|
def plan_next_actions(req):
    """Plan one move for each of our free players.

    Players that are not carrying a flag are assigned the enemy flag with the
    shortest BFS distance (a random available flag when none is reachable).
    An assignment is kept for as long as a pickable enemy flag is still at
    the assigned position.  Flag carriers head for the first of our targets.

    Args:
        req: the update request, passed to ``world.update``.

    Returns:
        ``None`` when ``world.update(req)`` is falsy; otherwise a dict mapping
        player name to the value returned by ``myMap.guideance`` (``""`` when
        no step could be found).  Players without a destination are omitted.
    """
    global player_to_flag_assign

    if not world.update(req):
        return

    my_players = world.list_players(mine=True, inPrison=False, hasFlag=None)
    enemy_flags = world.list_flags(mine=False, canPickup=True)
    my_targets = list(world.list_targets(mine=True))

    active_player_names = {p["name"] for p in my_players if not p["hasFlag"]}
    # Positions of the flags that can still be picked up.  An existing
    # assignment is only worth keeping while its flag is in this set.
    available_flag_positions = {(f["posX"], f["posY"]) for f in enemy_flags}

    # Forget assignments of players that are gone, imprisoned or now carrying.
    player_to_flag_assign = {
        name: pos
        for name, pos in player_to_flag_assign.items()
        if name in active_player_names
    }

    if enemy_flags:
        for p in my_players:
            name = p["name"]
            if name not in active_player_names:
                continue
            # .get() yields None for unassigned players, which is never in the set.
            if player_to_flag_assign.get(name) in available_flag_positions:
                continue

            closest_flag = None
            min_length = float('inf')
            for f in enemy_flags:
                path_length = myMap.length(p["posX"], p["posY"], f["posX"], f["posY"])
                # length() reports -1 for unreachable flags.
                if path_length != -1 and path_length < min_length:
                    min_length = path_length
                    closest_flag = f

            # No reachable flag: still commit to one so the player has a goal.
            chosen = closest_flag if closest_flag is not None else random.choice(enemy_flags)
            player_to_flag_assign[name] = (chosen["posX"], chosen["posY"])

    # 3. Plan moves for each player
    player_moves = {}
    for p in my_players:
        name = p["name"]

        # Determine Target: Either the assigned flag or the home target
        if p["hasFlag"]:
            if not my_targets:
                # Nowhere to deliver the flag; skip instead of raising IndexError.
                continue
            dest = my_targets[0]
        elif name in player_to_flag_assign:
            dest = player_to_flag_assign[name]
        else:
            continue

        player_moves[name] = myMap.guideance(p["posX"], p["posY"], dest[0], dest[1])

    return player_moves
|
|
|
|
def game_over(req):
    """Handle the end-of-game request: log it and force a final ``world.show``.

    ``req`` is accepted to match the callback signature but is not used.
    """
    print("Game Over!")
    world.show(force=True)
|
|
|
|
async def main():
    """Parse the port from the command line and run the game server.

    Exits with status 1 after printing a usage message when the argument
    count is wrong, and also when the server stops with an exception.
    """
    import sys

    cli_args = sys.argv[1:]
    if len(cli_args) == 1:
        port = int(cli_args[0])
    else:
        print(f"Usage: python3 {sys.argv[0]} <port>")
        print(f"Example: python3 {sys.argv[0]} 8080")
        sys.exit(1)

    print(f"AI backend running on port {port} ...")

    try:
        await run_game_server(port, start_game, plan_next_actions, game_over)
    except Exception as e:
        # Report the failure and signal it through the exit status.
        print(f"Server Stopped: {e}")
        sys.exit(1)
|
|
|
|
# Script entry point: drive the async main() when the file is run directly.
if __name__ == "__main__":
    asyncio.run(main())