qlpycon/state.py
2025-12-29 08:41:07 +01:00

199 lines
6.6 KiB
Python

#!/usr/bin/env python3
"""
Game state management for QLPyCon
Tracks server info, players, and teams
"""
import re
import logging
from config import TEAM_MODES, TEAM_MAP, MAX_RECENT_EVENTS
logger = logging.getLogger('state')
class ServerInfo:
"""Tracks current server information"""
def __init__(self):
self.hostname = 'Unknown'
self.map = 'Unknown'
self.gametype = 'Unknown'
self.timelimit = '0'
self.fraglimit = '0'
self.roundlimit = '0'
self.capturelimit = '0'
self.maxclients = '0'
self.curclients = '0'
self.red_score = 0
self.blue_score = 0
self.players = []
self.last_update = 0
self.warmup = False
def is_team_mode(self):
"""Check if current gametype is a team mode"""
return self.gametype in TEAM_MODES
def update_from_cvar(self, cvar_name, value):
"""Update server info from a cvar response"""
# Normalize cvar name to lowercase for case-insensitive matching
cvar_lower = cvar_name.lower()
mapping = {
'qlx_serverbrandname': 'hostname',
'g_factorytitle': 'gametype',
'mapname': 'map',
'timelimit': 'timelimit',
'fraglimit': 'fraglimit',
'roundlimit': 'roundlimit',
'capturelimit': 'capturelimit',
'sv_maxclients': 'maxclients'
}
attr = mapping.get(cvar_lower)
if attr:
# Only strip color codes for non-hostname fields
if attr != 'hostname':
value = re.sub(r'\^\d', '', value)
setattr(self, attr, value)
logger.info(f'Updated {attr}: {value}')
return True
return False
class PlayerTracker:
"""Tracks player teams and information"""
def __init__(self, server_info):
self.server_info = server_info
self.player_teams = {}
def update_team(self, name, team):
"""Update player team. Team can be int or string"""
# Convert numeric team to string
if isinstance(team, int):
team = TEAM_MAP.get(team, 'FREE')
if team not in ['RED', 'BLUE', 'FREE', 'SPECTATOR']:
team = 'FREE'
# Store both original name and color-stripped version
self.player_teams[name] = team
clean_name = re.sub(r'\^\d', '', name)
if clean_name != name:
self.player_teams[clean_name] = team
logger.debug(f'Updated team for {name} (clean: {clean_name}): {team}')
def get_team(self, name):
"""Get player's team"""
return self.player_teams.get(name)
def add_player(self, name, score='0', ping='0'):
"""Add player to server's player list if not exists"""
clean_name = re.sub(r'\^\d', '', name)
# Check if player already exists (by either name or clean name)
for existing in self.server_info.players:
existing_clean = re.sub(r'\^\d', '', existing['name'])
if existing['name'] == name or existing_clean == clean_name:
return # Already exists
self.server_info.players.append({
'name': name,
'score': score,
'ping': ping
})
def get_players_by_team(self):
"""Get players organized by team"""
teams = {'RED': [], 'BLUE': [], 'SPECTATOR': [], 'FREE': []}
for player in self.server_info.players:
name = player['name']
team = self.player_teams.get(name, 'FREE')
if team not in teams:
team = 'FREE'
teams[team].append(name)
return teams
def remove_player(self, name):
"""Remove player from tracking"""
clean_name = re.sub(r'\^\d', '', name)
# Count before removal
before_count = len(self.server_info.players)
# Remove from player list - check both original and clean names
self.server_info.players = [
p for p in self.server_info.players
if p['name'] != name and re.sub(r'\^\d', '', p['name']) != clean_name
]
# Log if anything was actually removed
after_count = len(self.server_info.players)
if before_count != after_count:
logger.info(f'Removed player: {name} (clean: {clean_name}) - {before_count} -> {after_count}')
else:
logger.warning(f'Player not found for removal: {name} (clean: {clean_name})')
# Remove from team tracking - both versions
self.player_teams.pop(name, None)
self.player_teams.pop(clean_name, None)
def rename_player(self, old_name, new_name):
"""Rename a player while maintaining their team"""
old_clean = re.sub(r'\^\d', '', old_name)
# Get current team (try both names)
team = self.player_teams.get(old_name) or self.player_teams.get(old_clean, 'SPECTATOR')
# Find and update player in server list
for player in self.server_info.players:
if player['name'] == old_name or re.sub(r'\^\d', '', player['name']) == old_clean:
player['name'] = new_name
break
# Remove old team entries
self.player_teams.pop(old_name, None)
self.player_teams.pop(old_clean, None)
# Add new team entries with color codes preserved
self.update_team(new_name, team)
logger.debug(f'Renamed player: {old_name} -> {new_name} (team: {team})')
class EventDeduplicator:
"""Prevents duplicate kill/death events"""
def __init__(self):
self.recent_events = []
def is_duplicate(self, event_type, time_val, killer_name, victim_name):
"""Check if this kill/death event is a duplicate"""
if event_type not in ('PLAYER_DEATH', 'PLAYER_KILL'):
return False
signature = f"KILL:{time_val}:{killer_name}:{victim_name}"
if signature in self.recent_events:
logger.debug(f'Duplicate event: {signature}')
return True
# Add to recent events
self.recent_events.append(signature)
if len(self.recent_events) > MAX_RECENT_EVENTS:
self.recent_events.pop(0)
return False
class GameState:
"""Main game state container"""
def __init__(self):
self.server_info = ServerInfo()
self.player_tracker = PlayerTracker(self.server_info)
self.event_deduplicator = EventDeduplicator()
self.pending_background_commands = set()
self.status_buffer = []