qlpycon/state.py
2026-01-09 13:17:30 +01:00

231 lines
7.9 KiB
Python

#!/usr/bin/env python3
"""
Game state management for QLPyCon
Tracks server info, players, and teams
"""
import re
import logging
from config import TEAM_MODES, TEAM_MAP, MAX_RECENT_EVENTS
# Module-level logger (named 'state') used by every class in this file.
logger = logging.getLogger('state')
class ServerInfo:
    """Snapshot of what is currently known about the server.

    Holds the cvar-derived settings (kept as strings), the red/blue score
    and round counters, the player table and a few match-progress fields.
    """

    # Lower-cased cvar name -> name of the attribute that stores its value.
    _CVAR_ATTRIBUTES = {
        'qlx_serverbrandname': 'hostname',
        'g_factorytitle': 'gametype',
        'mapname': 'map',
        'timelimit': 'timelimit',
        'fraglimit': 'fraglimit',
        'roundlimit': 'roundlimit',
        'capturelimit': 'capturelimit',
        'sv_maxclients': 'maxclients'
    }

    def __init__(self):
        # Descriptive fields start out as placeholders.
        self.hostname = self.map = self.gametype = 'Unknown'
        # Limits and client counts are stored as strings.
        self.timelimit = self.fraglimit = self.roundlimit = self.capturelimit = '0'
        self.maxclients = self.curclients = '0'
        # Team scoreboards.
        self.red_score = self.red_rounds = 0
        self.blue_score = self.blue_rounds = 0
        # Player table: {name: {'score': str, 'ping': str}}
        self.players = {}
        self.last_update = 0
        self.warmup = False
        self.dead_players = {}
        self.round_end_time = None
        self.match_time = 0

    def is_team_mode(self):
        """Return True when the current gametype is listed in TEAM_MODES."""
        return self.gametype in TEAM_MODES

    def reset_round_scores(self):
        """Zero both teams' round counters and forget all dead players."""
        self.red_rounds = self.blue_rounds = 0
        self.dead_players.clear()

    def update_from_cvar(self, cvar_name, value):
        """Store a cvar response on the matching attribute.

        The cvar name is matched case-insensitively. Color codes (a caret
        followed by a digit) are stripped from every value except the
        hostname.

        Returns:
            True if the cvar is one this class tracks and the attribute was
            updated, False otherwise.
        """
        attr = self._CVAR_ATTRIBUTES.get(cvar_name.lower())
        if not attr:
            return False

        # The hostname keeps its color codes; everything else is cleaned.
        if attr != 'hostname':
            value = re.sub(r'\^\d', '', value)

        setattr(self, attr, value)
        logger.info(f'Updated {attr}: {value}')
        return True
class PlayerTracker:
    """Tracks player teams and the server's player table.

    Player names may carry color codes (a caret followed by a digit).
    Different callers may report the same player with or without those
    codes, so lookups try the exact name first and then fall back to a
    color-stripped comparison.

    Attributes:
        server_info: object whose ``players`` dict
            ({name: {'score': str, 'ping': str}}) is read and mutated here.
        player_teams: {name: team}; each player is stored under the name as
            given and, when different, under the color-stripped name too.
    """

    # Color code: a caret followed by a single digit.
    _COLOR_CODE = re.compile(r'\^\d')
    _VALID_TEAMS = ('RED', 'BLUE', 'FREE', 'SPECTATOR')

    def __init__(self, server_info):
        self.server_info = server_info
        self.player_teams = {}

    @classmethod
    def _strip_colors(cls, name):
        """Return name with every color code removed."""
        return cls._COLOR_CODE.sub('', name)

    def _find_player_key(self, name):
        """Return the key in server_info.players that refers to name.

        An exact match wins; otherwise the first key whose color-stripped
        form equals the color-stripped name is returned. Returns None when
        nothing matches.
        """
        players = self.server_info.players
        if name in players:
            return name
        clean_name = self._strip_colors(name)
        for key in players:
            if self._strip_colors(key) == clean_name:
                return key
        return None

    def update_team(self, name, team):
        """Record the team of a player.

        Args:
            name: player name, with or without color codes.
            team: team name, or an int that is translated through TEAM_MAP.
                Anything that does not resolve to RED, BLUE, FREE or
                SPECTATOR is stored as 'FREE'.
        """
        # Convert numeric team to string
        if isinstance(team, int):
            team = TEAM_MAP.get(team, 'FREE')
        if team not in self._VALID_TEAMS:
            team = 'FREE'

        # Store both the name as given and its color-stripped version so
        # either spelling can be looked up later.
        self.player_teams[name] = team
        clean_name = self._strip_colors(name)
        if clean_name != name:
            self.player_teams[clean_name] = team
        logger.debug(f'Updated team for {name} (clean: {clean_name}): {team}')

    def get_team(self, name):
        """Return the player's team, or None if the player is unknown.

        Falls back to the color-stripped name when the exact name has no
        entry.
        """
        team = self.player_teams.get(name)
        if team is None and isinstance(name, str):
            team = self.player_teams.get(self._strip_colors(name))
        return team

    def add_player(self, name, score='0', ping='0'):
        """Add a player to the server's player dict if not already present.

        The name is used as the key exactly as given (color codes kept).
        An existing entry is left untouched.
        """
        if name not in self.server_info.players:
            self.server_info.players[name] = {
                'score': score,
                'ping': ping,
            }
            logger.debug(f'Added player: {name}')

    def get_players_by_team(self):
        """Return {'RED': [...], 'BLUE': [...], 'SPECTATOR': [...], 'FREE': [...]}.

        Every key of server_info.players appears in exactly one list;
        players without a recognised team are listed under 'FREE'.
        """
        teams = {'RED': [], 'BLUE': [], 'SPECTATOR': [], 'FREE': []}
        for name in self.server_info.players:
            team = self.get_team(name)
            teams[team if team in teams else 'FREE'].append(name)
        return teams

    def remove_player(self, name):
        """Remove a player from the player table and from team tracking.

        The player is matched by exact name first, then by color-stripped
        name. A warning is logged when no player matches; team entries for
        the given name are dropped either way.
        """
        clean_name = self._strip_colors(name)
        matched = self._find_player_key(name)

        if matched is None:
            logger.warning(f'Player not found for removal: {name} (clean: {clean_name})')
        else:
            del self.server_info.players[matched]
            if matched == name:
                logger.info(f'Removed player: {name}')
            else:
                logger.info(f'Removed player: {matched} (matched clean name: {clean_name})')

        # Drop every team entry that refers to this player, including the
        # color-coded key they were actually stored under.
        for key in (name, clean_name, matched):
            if key is not None:
                self.player_teams.pop(key, None)

    def rename_player(self, old_name, new_name):
        """Rename a player while keeping their team and score data.

        The old player is matched by exact name first, then by
        color-stripped name. If no team is known for the old name the
        player is recorded as 'SPECTATOR'. Team entries are moved even when
        the player is not present in the player table.
        """
        old_clean = self._strip_colors(old_name)
        matched = self._find_player_key(old_name)
        old_keys = (old_name, old_clean, matched)

        # Current team: first known entry among the old spellings.
        team = 'SPECTATOR'
        for key in old_keys:
            known = self.player_teams.get(key) if key is not None else None
            if known:
                team = known
                break

        # Move the player data to the new key.
        if matched is not None:
            self.server_info.players[new_name] = self.server_info.players.pop(matched)

        # Remove old team entries, then add new ones with color codes preserved.
        for key in old_keys:
            if key is not None:
                self.player_teams.pop(key, None)
        self.update_team(new_name, team)
        logger.debug(f'Renamed player: {old_name} -> {new_name} (team: {team})')

    def update_score(self, name, delta):
        """Add delta to a player's score (+1 for kill, -1 for death/suicide).

        Scores are stored as strings; the stored value is parsed with int(),
        adjusted, and written back as a string. Logs a warning and does
        nothing when the player cannot be found.
        """
        key = self._find_player_key(name)
        if key is None:
            logger.warning(f"Could not update score for {name} - player not found")
            return

        entry = self.server_info.players[key]
        entry['score'] = str(int(entry.get('score', 0)) + delta)
        logger.debug(f"Score update: {key} {delta:+d} -> {entry['score']}")
class EventDeduplicator:
    """Filters out kill/death events that have already been seen.

    Remembers the signatures of the most recent kill events, keeping at
    most MAX_RECENT_EVENTS of them (oldest dropped first).
    """

    # Event types that take part in de-duplication.
    _KILL_EVENTS = ('PLAYER_DEATH', 'PLAYER_KILL')

    def __init__(self):
        self.recent_events = []

    def is_duplicate(self, event_type, time_val, killer_name, victim_name):
        """Report whether this kill/death event was already recorded.

        Events of any other type are never duplicates and are not
        recorded. PLAYER_DEATH and PLAYER_KILL share one signature built
        from the time, killer and victim, so either counts as a repeat of
        the other. An event not seen before is remembered.

        Returns:
            True if an identical signature is already in recent_events,
            False otherwise.
        """
        if event_type not in self._KILL_EVENTS:
            return False

        signature = f"KILL:{time_val}:{killer_name}:{victim_name}"
        history = self.recent_events
        seen = signature in history

        if seen:
            logger.debug(f'Duplicate event: {signature}')
        else:
            # Remember the new event and evict the oldest once over capacity.
            history.append(signature)
            if len(history) > MAX_RECENT_EVENTS:
                history.pop(0)
        return seen
class GameState:
    """Top-level container that bundles all game state objects.

    Owns one ServerInfo, a PlayerTracker bound to that same ServerInfo, an
    EventDeduplicator, plus an initially empty set of pending background
    commands and an initially empty status buffer.
    """

    def __init__(self):
        info = ServerInfo()
        self.server_info = info
        # The tracker shares the ServerInfo instance rather than copying it.
        self.player_tracker = PlayerTracker(info)
        self.event_deduplicator = EventDeduplicator()
        self.pending_background_commands = set()
        self.status_buffer = []