From a889898694ba0bec0e7141494ea9d8e871a3e5fa Mon Sep 17 00:00:00 2001 From: xbl Date: Tue, 23 Dec 2025 09:37:42 +0100 Subject: [PATCH] refactor --- .gitignore | 3 + README.md | 144 +++++++++++++ config.py | 101 +++++++++ formatter.py | 202 ++++++++++++++++++ main.py | 589 +++++++++++++++++++++++++++++++++++++++++++++++++++ network.py | 167 +++++++++++++++ parser.py | 247 +++++++++++++++++++++ state.py | 180 ++++++++++++++++ ui.py | 258 ++++++++++++++++++++++ 9 files changed, 1891 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 config.py create mode 100644 formatter.py create mode 100644 main.py create mode 100644 network.py create mode 100644 parser.py create mode 100644 state.py create mode 100644 ui.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cf8e9aa --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +venv3/ +*.log +__pycache__/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..caae500 --- /dev/null +++ b/README.md @@ -0,0 +1,144 @@ +# QLPyCon - Quake Live Python Console + +A modular, refactored terminal-based client for monitoring Quake Live servers via ZMQ. + +## Version 0.8.0 - Modular Architecture + +### Features + +- **Real-time game monitoring** - Watch kills, deaths, team switches, medals, and more +- **Server info display** - Shows hostname, map, gametype, limits, and player count +- **Team-aware chat** - Color-coded messages with team prefixes +- **Powerup tracking** - Formatted pickup and carrier kill messages +- **JSON event logging** - Capture all game events to file for analysis +- **Colorized output** - Quake color code support (^0-^7) + +### Module Structure + +``` +qlpycon/ +├── __init__.py # Package initialization +├── main.py # Entry point and main loop +├── config.py # Constants and configuration +├── state.py # Game state management (ServerInfo, PlayerTracker, etc) +├── network.py # ZMQ connections (RCON and stats stream) +├── parser.py # JSON event parsing +├── formatter.py # Message formatting and colorization +└── ui.py # Curses interface (windows, display, input) +``` + +### Installation + +```bash +# Requirements +pip install pyzmq + +# Run directly +python -m qlpycon.main --host tcp://127.0.0.1:27961 --password YOUR_PASSWORD +``` + +### Usage + +```bash +# Basic connection +python main.py --host tcp://SERVER_IP:PORT --password RCON_PASSWORD + +# Verbose mode (show all communications) +python main.py --host tcp://SERVER_IP:PORT --password PASS -v + +# Debug mode (detailed logging) +python main.py --host tcp://SERVER_IP:PORT --password PASS -vv + +# Capture all JSON events to file +python main.py --host tcp://SERVER_IP:PORT --password PASS --json events.log + +# Custom unknown events log +python main.py --unknown-log my_unknown.log +``` + +### Command Line Options + +- `--host` - ZMQ URI (default: tcp://127.0.0.1:27961) +- `--password` - RCON password +- `--identity` - Socket identity (random UUID by default) +- `-v, --verbose` - Increase verbosity (use -v for INFO, -vv for DEBUG) +- `--json FILE` - Log all JSON events to FILE +- `--unknown-log FILE` - Log unknown JSON events (default: unknown_events.log) + +### Architecture Overview + +#### State Management (`state.py`) +- **ServerInfo** - Tracks server configuration and metadata +- **PlayerTracker** - Manages player teams and information +- **EventDeduplicator** - Prevents duplicate kill/death events +- **GameState** - Main container for all state + +#### Network Layer (`network.py`) +- **RconConnection** - Handles DEALER socket for RCON commands +- **StatsConnection** - Handles SUB socket for game event stream + +#### Event Parsing (`parser.py`) +- **EventParser** - Parses JSON game events into formatted messages +- Modular handlers for each event type (deaths, medals, team switches, etc) + +#### Message Formatting (`formatter.py`) +- Color code handling (Quake's ^N system) +- Team prefix generation +- Timestamp logic +- Chat message formatting + +#### UI Layer (`ui.py`) +- **UIManager** - Manages all curses windows +- Three-panel layout: server info, output, input +- Threaded input queue for non-blocking commands +- Color rendering with curses + +### Key Improvements Over v0.7.0 + +1. **Modular Design** - Separated concerns into focused modules +2. **Clean Classes** - OOP design for state and connections +3. **Better Maintainability** - Each module has a single responsibility +4. **Easier Testing** - Components can be tested independently +5. **Clear Interfaces** - Well-defined APIs between modules +6. **Improved Comments** - Every module, class, and function documented + +### Event Types Supported + +- `PLAYER_SWITCHTEAM` - Team changes +- `PLAYER_DEATH` / `PLAYER_KILL` - Frag events (deduplicated) +- `PLAYER_MEDAL` - Medal awards +- `PLAYER_STATS` - End-game statistics with weapon accuracy +- `MATCH_STARTED` - Match initialization +- `MATCH_REPORT` - Final scores +- `PLAYER_CONNECT` / `PLAYER_DISCONNECT` - Connection events +- `ROUND_OVER` - Round completion + +### Color Codes + +Quake Live uses `^N` color codes where N is 0-7: +- `^0` - Black +- `^1` - Red +- `^2` - Green +- `^3` - Yellow +- `^4` - Blue +- `^5` - Cyan +- `^6` - Magenta +- `^7` - White (default) + +### Development + +To extend QLPyCon: + +1. **Add new event types** - Edit `parser.py`, add handler method +2. **Change UI layout** - Edit `ui.py`, modify window creation +3. **Add new commands** - Edit `main.py`, handle in main loop +4. **Modify formatting** - Edit `formatter.py`, adjust color/timestamp logic +5. **Add configuration** - Edit `config.py`, add constants + +### License + +MIT License - See original qlpycon.py for details + +### Credits + +Refactored from qlpycon.py v0.7.0 diff --git a/config.py b/config.py new file mode 100644 index 0000000..2a2ab4b --- /dev/null +++ b/config.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +""" +Configuration and constants for QLPyCon +""" + +VERSION = "0.8.0" + +# Network defaults +DEFAULT_HOST = 'tcp://127.0.0.1:27961' +POLL_TIMEOUT = 100 + +# UI dimensions +INFO_WINDOW_HEIGHT = 10 +INFO_WINDOW_Y = 2 +OUTPUT_WINDOW_Y = 12 +INPUT_WINDOW_HEIGHT = 1 + +# Event deduplication +MAX_RECENT_EVENTS = 10 + +# Team game modes +TEAM_MODES = [ + "Team Deathmatch", + "Clan Arena", + "Capture The Flag", + "One Flag CTF", + "Overload", + "Harvester", + "Freeze Tag" +] + +# Team mappings +TEAM_MAP = { + 0: 'FREE', + 1: 'RED', + 2: 'BLUE', + 3: 'SPECTATOR' +} + +TEAM_COLORS = { + 'RED': '^1(RED)^7', + 'BLUE': '^4(BLUE)^7', + 'FREE': '', + 'SPECTATOR': '^3(SPEC)^7' +} + +# Weapon names +WEAPON_NAMES = { + 'ROCKET': 'Rocket Launcher', + 'LIGHTNING': 'Lightning Gun', + 'RAILGUN': 'Railgun', + 'SHOTGUN': 'Shotgun', + 'GAUNTLET': 'Gauntlet', + 'GRENADE': 'Grenade Launcher', + 'PLASMA': 'Plasma Gun', + 'MACHINEGUN': 'Machine Gun' +} + +# Weapon names for kill messages +WEAPON_KILL_NAMES = { + 'ROCKET': 'the Rocket Launcher', + 'LIGHTNING': 'the Lightning Gun', + 'RAILGUN': 'the Railgun', + 'SHOTGUN': 'the Shotgun', + 'GAUNTLET': 'the Gauntlet', + 'GRENADE': 'the Grenade Launcher', + 'PLASMA': 'the Plasma Gun', + 'MACHINEGUN': 'the Machine Gun' +} + +# Death messages +DEATH_MESSAGES = { + 'FALLING': "%s%s ^7cratered.", + 'HURT': "%s%s ^7was in the wrong place.", + 'LAVA': "%s%s ^7does a backflip into the lava.", + 'WATER': "%s%s ^7sank like a rock.", + 'SLIME': "%s%s ^7melted.", + 'CRUSH': "%s%s ^7was crushed." +} + +# Powerup names and colors +POWERUP_COLORS = { + 'Quad Damage': '^5Quad Damage^7', + 'Battle Suit': '^3Battle Suit^7', + 'Regeneration': '^1Regeneration^7', + 'Haste': '^3Haste^7', + 'Invisibility': '^5Invisibility^7', + 'Flight': '^5Flight^7', + 'Medkit': '^1Medkit^7', + 'MegaHealth': '^4MegaHealth^7' +} + +# Curses color pairs +COLOR_PAIRS = { + 1: 1, # Red + 2: 2, # Green + 3: 3, # Yellow + 4: 4, # Blue + 5: 6, # Cyan (swapped) + 6: 5 # Magenta (swapped) +} diff --git a/formatter.py b/formatter.py new file mode 100644 index 0000000..37ed5ec --- /dev/null +++ b/formatter.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +""" +Message formatting and colorization for QLPyCon +Handles Quake color codes and team prefixes +""" + +import re +import time +from config import TEAM_COLORS + + +def strip_color_codes(text): + """Remove Quake color codes (^N) from text""" + return re.sub(r'\^\d', '', text) + + +def get_team_prefix(player_name, player_tracker): + """Get color-coded team prefix for a player""" + if not player_tracker.server_info.is_team_mode(): + return '' + + team = player_tracker.get_team(player_name) + if not team: + return '' + + return TEAM_COLORS.get(team, '') + + +def should_add_timestamp(message): + """Determine if a message should get a timestamp""" + # Skip status command output + skip_keywords = ['map:', 'num score', '---', 'bot', 'status'] + if any(kw in message for kw in skip_keywords): + # But allow "zmq RCON" lines (command echoes) + if 'zmq RCON' not in message: + return False + + # Skip very short messages or fragments + stripped = message.strip() + if len(stripped) <= 2: + return False + + # Skip messages with leading spaces (status fragments) + if message.startswith(' ') and len(stripped) < 50: + return False + + # Skip pure numbers + if stripped.isdigit(): + return False + + # Skip short single words + if len(stripped) < 20 and ' ' not in stripped: + return False + + # Skip IP addresses (xxx.xxx.xxx.xxx or xxx.xxx.xxx.xxx:port) + allowed_chars = set('0123456789.:') + if stripped.count('.') == 3 and all(c in allowed_chars for c in stripped): + return False + + # Skip messages starting with *** + if message.startswith('***'): + return False + + return True + + +def format_message(message, add_timestamp=True): + """ + Format a message for display + - Strips special characters + - Adds timestamp if appropriate + - Handles broadcast formatting + """ + # Clean up message + message = message.replace("\\n", "") + message = message.replace(chr(25), "") + + # Handle broadcast messages + attributes = 0 + if message[:10] == "broadcast:": + message = message[11:] + attributes = 1 # Bold + + # Handle print messages + if message[:7] == "print \"": + message = message[7:-2] + "\n" + + # Add timestamp if requested and appropriate + if add_timestamp and should_add_timestamp(message): + timestamp = time.strftime('%H:%M:%S') + message = f"^3[^7{timestamp}^3]^7 {message}" + + return message, attributes + + +def format_chat_message(message, player_tracker): + """ + Format chat messages with team prefixes and colors + Handles both regular chat (Name: msg) and team chat ((Name): msg or (Name) (Location): msg) + """ + # Strip special character + clean_msg = message.replace(chr(25), '') + + # Team chat with location: (PlayerName) (Location): message + # Location can have nested parens like (Lower Floor (Near Yellow Armour)) + if clean_msg.strip().startswith('(') and ')' in clean_msg: + # Extract player name (first parenthetical) + player_match = re.match(r'^(\([^)]+\))', clean_msg) + if not player_match: + return message + + player_part = player_match.group(1) + rest = clean_msg[len(player_part):].lstrip() + + # Check for location (another parenthetical) + if rest.startswith('('): + # Count parens to handle nesting + paren_count = 0 + location_end = -1 + for i, char in enumerate(rest): + if char == '(': + paren_count += 1 + elif char == ')': + paren_count -= 1 + if paren_count == 0: + location_end = i + 1 + break + + # Check if location ends with colon + if location_end > 0 and location_end < len(rest) and rest[location_end] == ':': + location_part = rest[:location_end] + message_part = rest[location_end + 1:] + + # Get team prefix + name_match = re.match(r'\(([^)]+)\)', player_part) + if name_match: + player_name = strip_color_codes(name_match.group(1).strip()) + team_prefix = get_team_prefix(player_name, player_tracker) + location_clean = strip_color_codes(location_part) + return f"{team_prefix}{player_part} ^3{location_clean}^7:^5{message_part}" + + # Team chat without location: (PlayerName): message + colon_match = re.match(r'^(\([^)]+\)):(\s*.*)', clean_msg) + if colon_match: + player_part = colon_match.group(1) + ':' + message_part = colon_match.group(2) + + name_match = re.match(r'\(([^)]+)\)', player_part) + if name_match: + player_name = strip_color_codes(name_match.group(1).strip()) + team_prefix = get_team_prefix(player_name, player_tracker) + return f"{team_prefix}{player_part}^5{message_part}\n" + + # Regular chat: PlayerName: message + parts = clean_msg.split(':', 1) + if len(parts) == 2: + player_name = strip_color_codes(parts[0].strip()) + team_prefix = get_team_prefix(player_name, player_tracker) + + # Preserve original color-coded name + original_parts = message.replace(chr(25), '').split(':', 1) + if len(original_parts) == 2: + return f"{team_prefix}{original_parts[0]}:^2{original_parts[1]}" + + return message + + +def format_powerup_message(message, player_tracker): + """ + Format powerup pickup and carrier kill messages + Returns formatted message or None if not a powerup message + """ + from config import POWERUP_COLORS + + if message.startswith("broadcast:"): + message = message[11:].strip() + + # Powerup pickup: "PlayerName got the PowerupName!" + pickup_match = re.match(r'^(.+?)\s+got the\s+(.+?)!', message) + if pickup_match: + player_name = pickup_match.group(1).strip() + powerup_name = pickup_match.group(2).strip() + + player_clean = strip_color_codes(player_name) + team_prefix = get_team_prefix(player_clean, player_tracker) + + colored_powerup = POWERUP_COLORS.get(powerup_name, f'^6{powerup_name}^7') + return f"{team_prefix}{player_name} ^7got the {colored_powerup}!\n" + + # Powerup carrier kill: "PlayerName killed the PowerupName carrier!" + carrier_match = re.match(r'^(.+?)\s+killed the\s+(.+?)\s+carrier!', message) + if carrier_match: + player_name = carrier_match.group(1).strip() + powerup_name = carrier_match.group(2).strip() + + player_clean = strip_color_codes(player_name) + team_prefix = get_team_prefix(player_clean, player_tracker) + + colored_powerup = POWERUP_COLORS.get(powerup_name, f'^6{powerup_name}^7') + return f"{team_prefix}{player_name} ^7killed the {colored_powerup} ^7carrier!\n" + + return None diff --git a/main.py b/main.py new file mode 100644 index 0000000..62b2025 --- /dev/null +++ b/main.py @@ -0,0 +1,589 @@ +#!/usr/bin/env python3 +""" +QLPyCon - Quake Live Python Console +Main entry point +""" + +import argparse +import uuid +import logging +import time +import re +import curses +import zmq +import signal +import sys + +from config import VERSION, DEFAULT_HOST, POLL_TIMEOUT +from state import GameState +from network import RconConnection, StatsConnection +from parser import EventParser +from formatter import format_message, format_chat_message, format_powerup_message +from ui import UIManager + +# Configure logging +logger = logging.getLogger('main') +logger.setLevel(logging.DEBUG) + +all_json_logger = logging.getLogger('all_json') +all_json_logger.setLevel(logging.DEBUG) + +unknown_json_logger = logging.getLogger('unknown_json') +unknown_json_logger.setLevel(logging.DEBUG) + + +def signal_handler(sig, frame): + """Handle Ctrl+C for immediate shutdown""" + # Don't call curses.endwin() here - let curses.wrapper handle it + sys.exit(0) + + +def parse_cvar_response(message, game_state, ui): + """ + Parse server cvar responses and update state + Returns True if message should be suppressed from display + """ + # Suppress command echo lines + if message.startswith('zmq RCON command') and 'from' in message: + suppress_cmds = ['status', 'roundlimit', 'qlx_serverBrandName', 'g_factoryTitle', + 'mapname', 'timelimit', 'fraglimit', 'capturelimit', 'sv_maxclients'] + if any(f': {cmd}' in message for cmd in suppress_cmds): + return True + + # Parse cvar responses (format: "cvar_name" is:"value" default:...) + cvar_match = re.search(r'"([^"]+)"\s+is:"([^"]*)"', message) + if cvar_match: + cvar_name = cvar_match.group(1) + value = cvar_match.group(2) + + if game_state.server_info.update_from_cvar(cvar_name, value): + ui.update_server_info(game_state) + return True + + return False + + +def handle_stats_connection(message, rcon, ui, game_state): + """ + Handle stats connection info extraction + Returns (stats_port, stats_password) or (None, None) + """ + stats_port = None + stats_password = None + + # Extract stats port + if 'net_port' in message and ' is:' in message and '"net_port"' in message: + match = re.search(r' is:"([^"]+)"', message) + if match: + port_str = match.group(1).strip() + digit_match = re.search(r'(\d+)', port_str) + if digit_match: + stats_port = digit_match.group(1) + logger.info(f'Got stats port: {stats_port}') + + # Extract stats password + if 'zmq_stats_password' in message and ' is:' in message and '"zmq_stats_password"' in message: + match = re.search(r' is:"([^"]+)"', message) + if match: + password_str = match.group(1) + password_str = re.sub(r'\^\d', '', password_str) # Strip color codes + stats_password = password_str.strip() + logger.info(f'Got stats password: {stats_password}') + + return stats_port, stats_password + +def parse_player_events(message, game_state, ui): + """ + Parse connect, disconnect, kick, and rename messages + Returns True if message should be suppressed + """ + try: + # Strip color codes for matching + from formatter import strip_color_codes + clean_msg = strip_color_codes(message) + + # Match connects: "NAME connected" + connect_match = re.match(r'^(.+?)\s+connected(?:\s+with Steam ID)?', clean_msg) + if connect_match: + player_name = message.split(' connected')[0].strip() # Keep color codes + if game_state.server_info.is_team_mode(): + game_state.player_tracker.update_team(player_name, 'SPECTATOR') + game_state.player_tracker.add_player(player_name) + ui.update_server_info(game_state) + logger.debug(f'Player connected: {player_name}') + return False + + # Match disconnects: "NAME disconnected", "NAME was kicked" + disconnect_patterns = [ + r'^(.+?)\s+disconnected', + r'^(.+?)\s+was kicked', + ] + + for pattern in disconnect_patterns: + match = re.match(pattern, clean_msg) + if match: + player_name_clean = match.group(1).strip() + # Try to find the original name with color codes + original_match = re.match(pattern, message) + player_name = original_match.group(1).strip() if original_match else player_name_clean + + game_state.player_tracker.remove_player(player_name) + game_state.player_tracker.remove_player(player_name_clean) # Try both + ui.update_server_info(game_state) + logger.debug(f'Player disconnected: {player_name}') + return False + + # Match renames: "OldName renamed to NewName" + rename_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', clean_msg) + if rename_match: + old_name_clean = rename_match.group(1).strip() + new_name_clean = rename_match.group(2).strip() + + # Get names with color codes + original_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', message) + old_name = original_match.group(1).strip() if original_match else old_name_clean + new_name = original_match.group(2).strip() if original_match else new_name_clean + + game_state.player_tracker.rename_player(old_name, new_name) + ui.update_server_info(game_state) + logger.debug(f'Player renamed: {old_name} -> {new_name}') + return False + + except Exception as e: + logger.error(f'Error in parse_player_events: {e}') + + return False + +def main_loop(screen): + return False + """Main application loop""" + + # Setup signal handler for Ctrl+C + signal.signal(signal.SIGINT, signal_handler) + + # Parse arguments + parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics') + parser.add_argument('--host', default=DEFAULT_HOST, help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}') + parser.add_argument('--password', required=False, help='RCON password') + parser.add_argument('--identity', default=uuid.uuid1().hex, help='Socket identity (random UUID by default)') + parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity (-v INFO, -vv DEBUG)') + parser.add_argument('--unknown-log', default='unknown_events.log', help='File to log unknown JSON events') + parser.add_argument('-j', '--json', dest='json_log', default=None, help='File to log all JSON events') + args = parser.parse_args() + + # Set logging level + if args.verbose == 0: + logger.setLevel(logging.WARNING) + elif args.verbose == 1: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.DEBUG) + + # Setup file logging for unknown events + unknown_handler = logging.FileHandler(args.unknown_log, mode='a') + unknown_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') + unknown_handler.setFormatter(unknown_formatter) + unknown_json_logger.addHandler(unknown_handler) + unknown_json_logger.propagate = False + + # Initialize components + ui = UIManager(screen, args.host) + game_state = GameState() + + # Setup logging to output window + log_handler = ui.setup_logging() + logger.addHandler(log_handler) + + # Setup input queue + input_queue = ui.setup_input_queue() + + # Display startup messages + ui.print_message(f"*** QL pyCon Version {VERSION} starting ***\n") + ui.print_message(f"zmq python bindings {zmq.__version__}, libzmq version {zmq.zmq_version()}\n") + + # Initialize network connections + rcon = RconConnection(args.host, args.password, args.identity) + rcon.connect() + + stats_conn = None + stats_port = None + stats_password = None + stats_check_counter = 0 + + # Shutdown flag + shutdown = False + + # Setup JSON logging if requested + json_logger = None + if args.json_log: + json_handler = logging.FileHandler(args.json_log, mode='a') + json_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') + json_handler.setFormatter(json_formatter) + all_json_logger.addHandler(json_handler) + all_json_logger.propagate = False + json_logger = all_json_logger + + # Create event parser + event_parser = EventParser(game_state, json_logger, unknown_json_logger) + + # Main event loop + while not shutdown: + # Poll RCON socket + event = rcon.poll(POLL_TIMEOUT) + + # Check monitor for connection events + monitor_event = rcon.check_monitor() + if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED: + ui.print_message("Connected to server\n") + rcon.send_command(b'register') + logger.info('Registration message sent') + + ui.print_message("Requesting connection info...\n") + rcon.send_command(b'zmq_stats_password') + rcon.send_command(b'net_port') + + # Handle user input + while not input_queue.empty(): + command = input_queue.get() + logger.info(f'Sending command: {repr(command.strip())}') + + # Display command with timestamp + timestamp = time.strftime('%H:%M:%S') + ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n") + + rcon.send_command(command) + + # Poll stats stream if connected + if stats_conn and stats_conn.connected: + stats_check_counter += 1 + + if stats_check_counter % 100 == 0: + logger.debug(f'Stats polling active (check #{stats_check_counter // 100})') + + stats_msg = stats_conn.recv_message() + if stats_msg: + logger.info(f'Stats event received ({len(stats_msg)} bytes)') + + # Parse game event + parsed = event_parser.parse_event(stats_msg) + if parsed: + # Format with timestamp before displaying + formatted_msg, attributes = format_message(parsed) + ui.print_message(formatted_msg) + ui.update_server_info(game_state) + + # Process RCON messages + if event > 0: + logger.debug('Socket has data available') + msg_count = 0 + + while True: + message = rcon.recv_message() + if message is None: + if msg_count > 0: + logger.debug(f'Read {msg_count} message(s)') + break + + msg_count += 1 + + if len(message) == 0: + logger.debug('Received empty message (keepalive)') + continue + + logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}') + + # Try to parse as cvar response + if parse_cvar_response(message, game_state, ui): + logger.debug('Suppressed cvar response') + continue + + # Check for player connect/disconnect/rename events + parse_player_events(message, game_state, ui) + + # Check for stats connection info + port, password = handle_stats_connection(message, rcon, ui, game_state) + if port: + stats_port = port + if password: + stats_password = password + + # Connect to stats if we have both credentials + if stats_port and stats_password and stats_conn is None: + try: + ui.print_message("Connecting to stats stream...\n") + host_ip = args.host.split('//')[1].split(':')[0] + + stats_conn = StatsConnection(host_ip, stats_port, stats_password) + stats_conn.connect() + + ui.print_message("Stats stream connected - ready for game events\n") + + # Request initial server info + logger.info('Sending initial server info queries') + rcon.send_command(b'qlx_serverBrandName') + rcon.send_command(b'g_factoryTitle') + rcon.send_command(b'mapname') + rcon.send_command(b'timelimit') + rcon.send_command(b'fraglimit') + rcon.send_command(b'roundlimit') + rcon.send_command(b'capturelimit') + rcon.send_command(b'sv_maxclients') + + if args.json_log: + ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n") + + except Exception as e: + timestamp = time.strftime('%H:%M:%S') + ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n") + logger.error(f'Stats connection failed: {e}') + + # Try to parse as game event + parsed_event = event_parser.parse_event(message) + if parsed_event: + ui.print_message(parsed_event) + continue + + # Check if it looks like JSON but wasn't parsed + stripped = message.strip() + if stripped and stripped[0] in ('{', '['): + logger.debug('Unparsed JSON event') + continue + + # Try powerup message formatting + powerup_msg = format_powerup_message(message, game_state.player_tracker) + if powerup_msg: + ui.print_message(powerup_msg) + continue + + # Filter bot debug messages in default mode + is_bot_debug = (' entered ' in message and + any(x in message for x in [' seek ', ' battle ', ' chase', ' fight'])) + if is_bot_debug and args.verbose == 0: + logger.debug(f'Filtered bot debug: {message[:50]}') + continue + + # Check if it's a chat message + if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')): + message = format_chat_message(message, game_state.player_tracker) + + # Format and display message + formatted_msg, attributes = format_message(message) + ui.print_message(formatted_msg) + +def main_loop(screen): + """Main application loop""" + + # Setup signal handler for Ctrl+C + signal.signal(signal.SIGINT, signal_handler) + + # Parse arguments + parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics') + parser.add_argument('--host', default=DEFAULT_HOST, help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}') + parser.add_argument('--password', required=False, help='RCON password') + parser.add_argument('--identity', default=uuid.uuid1().hex, help='Socket identity (random UUID by default)') + parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity (-v INFO, -vv DEBUG)') + parser.add_argument('--unknown-log', default='unknown_events.log', help='File to log unknown JSON events') + parser.add_argument('-j', '--json', dest='json_log', default=None, help='File to log all JSON events') + args = parser.parse_args() + + # Set logging level + if args.verbose == 0: + logger.setLevel(logging.WARNING) + elif args.verbose == 1: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.DEBUG) + + # Setup file logging for unknown events + unknown_handler = logging.FileHandler(args.unknown_log, mode='a') + unknown_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') + unknown_handler.setFormatter(unknown_formatter) + unknown_json_logger.addHandler(unknown_handler) + unknown_json_logger.propagate = False + + # Initialize components + ui = UIManager(screen, args.host) + game_state = GameState() + + # Setup logging to output window + log_handler = ui.setup_logging() + logger.addHandler(log_handler) + + # Setup input queue + input_queue = ui.setup_input_queue() + + # Display startup messages + ui.print_message(f"*** QL pyCon Version {VERSION} starting ***\n") + ui.print_message(f"zmq python bindings {zmq.__version__}, libzmq version {zmq.zmq_version()}\n") + + # Initialize network connections + rcon = RconConnection(args.host, args.password, args.identity) + rcon.connect() + + stats_conn = None + stats_port = None + stats_password = None + stats_check_counter = 0 + + # Shutdown flag + shutdown = False + + # Setup JSON logging if requested + json_logger = None + if args.json_log: + json_handler = logging.FileHandler(args.json_log, mode='a') + json_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') + json_handler.setFormatter(json_formatter) + all_json_logger.addHandler(json_handler) + all_json_logger.propagate = False + json_logger = all_json_logger + + # Create event parser + event_parser = EventParser(game_state, json_logger, unknown_json_logger) + + # Main event loop + while not shutdown: + # Poll RCON socket + event = rcon.poll(POLL_TIMEOUT) + + # Check monitor for connection events + monitor_event = rcon.check_monitor() + if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED: + ui.print_message("Connected to server\n") + rcon.send_command(b'register') + logger.info('Registration message sent') + + ui.print_message("Requesting connection info...\n") + rcon.send_command(b'zmq_stats_password') + rcon.send_command(b'net_port') + + # Handle user input + while not input_queue.empty(): + command = input_queue.get() + logger.info(f'Sending command: {repr(command.strip())}') + + # Display command with timestamp + timestamp = time.strftime('%H:%M:%S') + ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n") + + rcon.send_command(command) + + # Poll stats stream if connected + if stats_conn and stats_conn.connected: + stats_check_counter += 1 + + if stats_check_counter % 100 == 0: + logger.debug(f'Stats polling active (check #{stats_check_counter // 100})') + + stats_msg = stats_conn.recv_message() + if stats_msg: + logger.info(f'Stats event received ({len(stats_msg)} bytes)') + + # Parse game event + parsed = event_parser.parse_event(stats_msg) + if parsed: + # Format with timestamp before displaying + formatted_msg, attributes = format_message(parsed) + ui.print_message(formatted_msg) + ui.update_server_info(game_state) + + # Process RCON messages + if event > 0: + logger.debug('Socket has data available') + msg_count = 0 + + while True: + message = rcon.recv_message() + if message is None: + if msg_count > 0: + logger.debug(f'Read {msg_count} message(s)') + break + + msg_count += 1 + + if len(message) == 0: + logger.debug('Received empty message (keepalive)') + continue + + logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}') + + # Try to parse as cvar response + if parse_cvar_response(message, game_state, ui): + logger.debug('Suppressed cvar response') + continue + + # Check for player connect/disconnect/rename events + parse_player_events(message, game_state, ui) + + # Check for stats connection info + port, password = handle_stats_connection(message, rcon, ui, game_state) + if port: + stats_port = port + if password: + stats_password = password + + # Connect to stats if we have both credentials + if stats_port and stats_password and stats_conn is None: + try: + ui.print_message("Connecting to stats stream...\n") + host_ip = args.host.split('//')[1].split(':')[0] + + stats_conn = StatsConnection(host_ip, stats_port, stats_password) + stats_conn.connect() + + ui.print_message("Stats stream connected - ready for game events\n") + + # Request initial server info + logger.info('Sending initial server info queries') + rcon.send_command(b'qlx_serverBrandName') + rcon.send_command(b'g_factoryTitle') + rcon.send_command(b'mapname') + rcon.send_command(b'timelimit') + rcon.send_command(b'fraglimit') + rcon.send_command(b'roundlimit') + rcon.send_command(b'capturelimit') + rcon.send_command(b'sv_maxclients') + + if args.json_log: + ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n") + + except Exception as e: + timestamp = time.strftime('%H:%M:%S') + ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n") + logger.error(f'Stats connection failed: {e}') + + # Try to parse as game event + parsed_event = event_parser.parse_event(message) + if parsed_event: + ui.print_message(parsed_event) + continue + + # Check if it looks like JSON but wasn't parsed + stripped = message.strip() + if stripped and stripped[0] in ('{', '['): + logger.debug('Unparsed JSON event') + continue + + # Try powerup message formatting + powerup_msg = format_powerup_message(message, game_state.player_tracker) + if powerup_msg: + ui.print_message(powerup_msg) + continue + + # Filter bot debug messages in default mode + is_bot_debug = (' entered ' in message and + any(x in message for x in [' seek ', ' battle ', ' chase', ' fight'])) + if is_bot_debug and args.verbose == 0: + logger.debug(f'Filtered bot debug: {message[:50]}') + continue + + # Check if it's a chat message + if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')): + message = format_chat_message(message, game_state.player_tracker) + + # Format and display message + formatted_msg, attributes = format_message(message) + ui.print_message(formatted_msg) + +if __name__ == '__main__': + curses.wrapper(main_loop) diff --git a/network.py b/network.py new file mode 100644 index 0000000..15261e4 --- /dev/null +++ b/network.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +""" +ZMQ network layer for QLPyCon +Handles RCON and stats stream connections +""" + +import zmq +import struct +import logging +import time + +logger = logging.getLogger('network') + + +def read_socket_event(msg): + """Parse ZMQ socket monitor event""" + event_id = struct.unpack(' 0 else 0 + accuracies[weapon] = accuracy + return accuracies + + +class EventParser: + """Parses JSON game events into formatted messages""" + + def __init__(self, game_state, json_logger=None, unknown_logger=None): + self.game_state = game_state + self.json_logger = json_logger + self.unknown_logger = unknown_logger + + def parse_event(self, message): + """ + Parse JSON event and return formatted message string + Returns None if event should not be displayed + """ + try: + event = json.loads(message) + + # Log all JSON if logger is configured + if self.json_logger: + self.json_logger.info('JSON Event received:') + self.json_logger.info(json.dumps(event, indent=2)) + self.json_logger.info('---') + + if 'TYPE' not in event or 'DATA' not in event: + logger.debug('JSON missing TYPE or DATA') + return None + + event_type = event['TYPE'] + data = event['DATA'] + + # Route to appropriate handler + handler_map = { + 'PLAYER_SWITCHTEAM': self._handle_switchteam, + 'PLAYER_DEATH': self._handle_death, + 'PLAYER_KILL': self._handle_death, # Same handler + 'PLAYER_MEDAL': self._handle_medal, + 'MATCH_STARTED': self._handle_match_started, + 'MATCH_REPORT': self._handle_match_report, + 'PLAYER_STATS': self._handle_player_stats, + 'PLAYER_CONNECT': lambda d: None, + 'PLAYER_DISCONNECT': lambda d: None, + 'ROUND_OVER': lambda d: None, + } + + handler = handler_map.get(event_type) + if handler: + return handler(data) + else: + # Unknown event + logger.debug(f'Unknown event type: {event_type}') + if self.unknown_logger: + self.unknown_logger.info(f'Unknown event type: {event_type}') + self.unknown_logger.info(f'Full JSON: {json.dumps(event, indent=2)}') + return None + + except json.JSONDecodeError as e: + logger.debug(f'JSON decode error: {e}') + return None + except (KeyError, TypeError) as e: + logger.debug(f'Error parsing event: {e}') + return None + + def _handle_switchteam(self, data): + """Handle PLAYER_SWITCHTEAM event""" + if 'KILLER' not in data: + return None + + killer = data['KILLER'] + name = killer.get('NAME', 'Unknown') + team = killer.get('TEAM', '') + old_team = killer.get('OLD_TEAM', '') + + # Update player team + self.game_state.player_tracker.update_team(name, team) + self.game_state.player_tracker.add_player(name) + + if team == old_team: + return None + + warmup = " ^3(warmup)" if data.get('WARMUP', False) else "" + + team_messages = { + 'FREE': ' ^7joined the ^2Fight^7', + 'SPECTATOR': ' ^7joined the ^3Spectators^7', + 'RED': ' ^7joined the ^1Red ^7team', + 'BLUE': ' ^7joined the ^4Blue ^7team' + } + + old_team_messages = { + 'FREE': 'the ^2Fight^7', + 'SPECTATOR': 'the ^3Spectators^7', + 'RED': '^7the ^1Red ^7team', + 'BLUE': '^7the ^4Blue ^7team' + } + + team_msg = team_messages.get(team, f' ^7joined team {team}^7') + old_team_msg = old_team_messages.get(old_team, f'team {old_team}') + + team_prefix = get_team_prefix(name, self.game_state.player_tracker) + return f"{team_prefix}{name}{team_msg} from {old_team_msg}{warmup}\n" + + def _handle_death(self, data): + """Handle PLAYER_DEATH and PLAYER_KILL events""" + if 'VICTIM' not in data: + return None + + victim = data['VICTIM'] + victim_name = victim.get('NAME', 'Unknown') + + # Check for duplicate + time_val = data.get('TIME', 0) + killer_name = data.get('KILLER', {}).get('NAME', '') if data.get('KILLER') else '' + + if self.game_state.event_deduplicator.is_duplicate('PLAYER_DEATH', time_val, killer_name, victim_name): + return None + + # Update victim team + if 'TEAM' in victim: + self.game_state.player_tracker.update_team(victim_name, victim['TEAM']) + self.game_state.player_tracker.add_player(victim_name) + + victim_prefix = get_team_prefix(victim_name, self.game_state.player_tracker) + warmup = " ^3(warmup)" if data.get('WARMUP', False) else "" + + # Environmental death (no killer) + if 'KILLER' not in data or not data['KILLER']: + mod = data.get('MOD', 'UNKNOWN') + msg_template = DEATH_MESSAGES.get(mod, "%s%s ^1DIED FROM %s^7") + + if mod in DEATH_MESSAGES: + msg = msg_template % (victim_prefix, victim_name) + else: + msg = msg_template % (victim_prefix, victim_name, mod) + + return f"{msg}{warmup}\n" + + # Player killed by another player + killer = data['KILLER'] + killer_name = killer.get('NAME', 'Unknown') + + # Update killer team + if 'TEAM' in killer: + self.game_state.player_tracker.update_team(killer_name, killer['TEAM']) + self.game_state.player_tracker.add_player(killer_name) + + killer_prefix = get_team_prefix(killer_name, self.game_state.player_tracker) + + # Suicide + if killer_name == victim_name: + weapon = killer.get('WEAPON', 'OTHER_WEAPON') + if weapon != 'OTHER_WEAPON': + weapon_name = WEAPON_NAMES.get(weapon, weapon) + return f"{killer_prefix}{killer_name} ^7committed suicide with the ^7{weapon_name}{warmup}\n" + return None + + # Regular kill + weapon = killer.get('WEAPON', 'UNKNOWN') + weapon_name = WEAPON_KILL_NAMES.get(weapon, f'the {weapon}') + + return f"{killer_prefix}{killer_name} ^7fragged^7 {victim_prefix}{victim_name} ^7with {weapon_name}{warmup}\n" + + def _handle_medal(self, data): + """Handle PLAYER_MEDAL event""" + name = data.get('NAME', 'Unknown') + medal = data.get('MEDAL', 'UNKNOWN') + warmup = " ^3(warmup)^7" if data.get('WARMUP', False) else "" + + team_prefix = get_team_prefix(name, self.game_state.player_tracker) + return f"{team_prefix}{name} ^7got a medal: ^6{medal}{warmup}\n" + + def _handle_match_started(self, data): + """Handle MATCH_STARTED event""" + if self.game_state.server_info.is_team_mode(): + return None + + players = [] + for player in data.get('PLAYERS', []): + name = player.get('NAME', 'Unknown') + players.append(name) + + if players: + formatted = " vs. ".join(players) + return f"Match has started - {formatted}\n" + + return None + + def _handle_match_report(self, data): + """Handle MATCH_REPORT event""" + if not self.game_state.server_info.is_team_mode(): + return None + + red_score = int(data.get('TSCORE0', '0')) + blue_score = int(data.get('TSCORE1', '0')) + + if red_score > blue_score: + return f"^1RED TEAM ^7WINS by a score of {red_score} to {blue_score}\n" + elif blue_score > red_score: + return f"^4BLUE TEAM ^7WINS by a score of {blue_score} to {red_score}\n" + else: + return f"^7The match is a TIE with a score of {red_score} to {blue_score}\n" + + def _handle_player_stats(self, data): + """Handle PLAYER_STATS event""" + name = data.get('NAME', 'Unknown') + team_prefix = get_team_prefix(name, self.game_state.player_tracker) + + kills = int(data.get('KILLS', '0')) + deaths = int(data.get('DEATHS', '0')) + + weapon_data = data.get('WEAPONS', {}) + accuracies = calculate_weapon_accuracies(weapon_data) + + if not accuracies: + return None + + best_weapon = max(accuracies, key=accuracies.get) + best_accuracy = accuracies[best_weapon] * 100 + weapon_stats = weapon_data.get(best_weapon, {}) + best_weapon_kills = int(weapon_stats.get('K', 0)) + + weapon_name = WEAPON_NAMES.get(best_weapon, best_weapon) + + return f"^7{team_prefix}{name} K/D: {kills}/{deaths} | Best Weapon: {weapon_name} - Acc: {best_accuracy:.2f}% - Kills: {best_weapon_kills}\n" diff --git a/state.py b/state.py new file mode 100644 index 0000000..d442da3 --- /dev/null +++ b/state.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +""" +Game state management for QLPyCon +Tracks server info, players, and teams +""" + +import re +import logging +from config import TEAM_MODES, TEAM_MAP, MAX_RECENT_EVENTS + +logger = logging.getLogger('state') + + +class ServerInfo: + """Tracks current server information""" + + def __init__(self): + self.hostname = 'Unknown' + self.map = 'Unknown' + self.gametype = 'Unknown' + self.timelimit = '0' + self.fraglimit = '0' + self.roundlimit = '0' + self.capturelimit = '0' + self.maxclients = '0' + self.curclients = '0' + self.red_score = 0 + self.blue_score = 0 + self.players = [] + self.last_update = 0 + + def is_team_mode(self): + """Check if current gametype is a team mode""" + return self.gametype in TEAM_MODES + + def update_from_cvar(self, cvar_name, value): + """Update server info from a cvar response""" + # Normalize cvar name to lowercase for case-insensitive matching + cvar_lower = cvar_name.lower() + + mapping = { + 'qlx_serverbrandname': 'hostname', + 'g_factorytitle': 'gametype', + 'mapname': 'map', + 'timelimit': 'timelimit', + 'fraglimit': 'fraglimit', + 'roundlimit': 'roundlimit', + 'capturelimit': 'capturelimit', + 'sv_maxclients': 'maxclients' + } + + attr = mapping.get(cvar_lower) + if attr: + # Only strip color codes for non-hostname fields + if attr != 'hostname': + value = re.sub(r'\^\d', '', value) + setattr(self, attr, value) + logger.info(f'Updated {attr}: {value}') + return True + return False + + +class PlayerTracker: + """Tracks player teams and information""" + + def __init__(self, server_info): + self.server_info = server_info + self.player_teams = {} + + def update_team(self, name, team): + """Update player team. Team can be int or string""" + if not self.server_info.is_team_mode(): + return + + # Convert numeric team to string + if isinstance(team, int): + team = TEAM_MAP.get(team, 'FREE') + + if team not in ['RED', 'BLUE', 'FREE', 'SPECTATOR']: + team = 'FREE' + + # Store both original name and color-stripped version + self.player_teams[name] = team + clean_name = re.sub(r'\^\d', '', name) + if clean_name != name: + self.player_teams[clean_name] = team + + logger.debug(f'Updated team for {name} (clean: {clean_name}): {team}') + + def get_team(self, name): + """Get player's team""" + return self.player_teams.get(name) + + def add_player(self, name, score='0', ping='0'): + """Add player to server's player list if not exists""" + if not any(p['name'] == name for p in self.server_info.players): + self.server_info.players.append({ + 'name': name, + 'score': score, + 'ping': ping + }) + + def get_players_by_team(self): + """Get players organized by team""" + teams = {'RED': [], 'BLUE': [], 'SPECTATOR': [], 'FREE': []} + for player in self.server_info.players: + name = player['name'] + team = self.player_teams.get(name, 'FREE') + if team not in teams: + team = 'FREE' + teams[team].append(name) + return teams + + def remove_player(self, name): + """Remove player from tracking""" + clean_name = re.sub(r'\^\d', '', name) + + # Remove from player list (check both name and clean name) + self.server_info.players = [ + p for p in self.server_info.players + if p['name'] != name and re.sub(r'\^\d', '', p['name']) != clean_name + ] + + # Remove from team tracking + if name in self.player_teams: + del self.player_teams[name] + if clean_name in self.player_teams and clean_name != name: + del self.player_teams[clean_name] + + logger.debug(f'Removed player: {name} (clean: {clean_name})') + + def rename_player(self, old_name, new_name): + """Rename a player while maintaining their team""" + # Get current team + team = self.player_teams.get(old_name, 'SPECTATOR') + + # Remove old entries + self.remove_player(old_name) + + # Add with new name and team + if self.server_info.is_team_mode(): + self.update_team(new_name, team) + self.add_player(new_name) + + logger.debug(f'Renamed player: {old_name} -> {new_name}') + +class EventDeduplicator: + """Prevents duplicate kill/death events""" + + def __init__(self): + self.recent_events = [] + + def is_duplicate(self, event_type, time_val, killer_name, victim_name): + """Check if this kill/death event is a duplicate""" + if event_type not in ('PLAYER_DEATH', 'PLAYER_KILL'): + return False + + signature = f"KILL:{time_val}:{killer_name}:{victim_name}" + + if signature in self.recent_events: + logger.debug(f'Duplicate event: {signature}') + return True + + # Add to recent events + self.recent_events.append(signature) + if len(self.recent_events) > MAX_RECENT_EVENTS: + self.recent_events.pop(0) + + return False + + +class GameState: + """Main game state container""" + + def __init__(self): + self.server_info = ServerInfo() + self.player_tracker = PlayerTracker(self.server_info) + self.event_deduplicator = EventDeduplicator() + self.pending_background_commands = set() + self.status_buffer = [] diff --git a/ui.py b/ui.py new file mode 100644 index 0000000..bfed193 --- /dev/null +++ b/ui.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +""" +Curses-based UI for QLPyCon +Handles terminal display, windows, and color rendering +""" + +import curses +import curses.textpad +import threading +import queue +import logging +from config import COLOR_PAIRS, INFO_WINDOW_HEIGHT, INFO_WINDOW_Y, OUTPUT_WINDOW_Y, INPUT_WINDOW_HEIGHT, TEAM_MODES + +logger = logging.getLogger('ui') + + +class CursesHandler(logging.Handler): + """Logging handler that outputs to curses window""" + + def __init__(self, window): + logging.Handler.__init__(self) + self.window = window + + def emit(self, record): + try: + msg = self.format(record) + fs = "%s\n" + try: + print_colored(self.window, fs % msg, 0) + self.window.refresh() + except UnicodeError: + print_colored(self.window, fs % msg.encode("UTF-8"), 0) + self.window.refresh() + except (KeyboardInterrupt, SystemExit): + raise + except: + self.handleError(record) + + +def print_colored(window, message, attributes=0): + """ + Print message with Quake color codes (^N) + ^0 = black, ^1 = red, ^2 = green, ^3 = yellow, ^4 = blue, ^5 = cyan, ^6 = magenta, ^7 = white + """ + if not curses.has_colors: + window.addstr(message) + return + + color = 0 + parse_color = False + + for ch in message: + val = ord(ch) + if parse_color: + if ord('0') <= val <= ord('7'): + color = val - ord('0') + if color == 7: + color = 0 + else: + window.addch('^', curses.color_pair(color) | attributes) + window.addch(ch, curses.color_pair(color) | attributes) + parse_color = False + elif ch == '^': + parse_color = True + else: + window.addch(ch, curses.color_pair(color) | attributes) + + window.refresh() + + +class UIManager: + """Manages curses windows and display""" + + def __init__(self, screen, host): + self.screen = screen + self.host = host + self.info_window = None + self.output_window = None + self.input_window = None + self.divider_window = None + self.input_queue = None + + self._init_curses() + self._create_windows() + + def _init_curses(self): + """Initialize curses settings""" + curses.endwin() + curses.initscr() + self.screen.nodelay(1) + curses.start_color() + curses.use_default_colors() + curses.cbreak() + curses.setsyx(-1, -1) + + self.screen.addstr(f"Quake Live PyCon: {self.host}") + self.screen.refresh() + + # Initialize color pairs + for i in range(1, 7): + curses.init_pair(i, i, 0) + + # Swap cyan and magenta (5 and 6) + curses.init_pair(5, 6, 0) + curses.init_pair(6, 5, 0) + + def _create_windows(self): + """Create all UI windows""" + maxy, maxx = self.screen.getmaxyx() + + # Server info window (top) + self.info_window = curses.newwin( + INFO_WINDOW_HEIGHT, + maxx - 4, + INFO_WINDOW_Y, + 2 + ) + self.info_window.scrollok(False) + self.info_window.idlok(False) + self.info_window.leaveok(True) + self.info_window.refresh() + + # Output window (middle - main display) + self.output_window = curses.newwin( + maxy - 15, + maxx - 4, + OUTPUT_WINDOW_Y, + 2 + ) + self.output_window.scrollok(True) + self.output_window.idlok(True) + self.output_window.leaveok(True) + self.output_window.refresh() + + # Input window (bottom) + self.input_window = curses.newwin( + INPUT_WINDOW_HEIGHT, + maxx - 6, + maxy - 2, + 4 + ) + self.screen.addstr(maxy - 2, 2, '$ ') + self.input_window.idlok(True) + self.input_window.leaveok(False) + self.input_window.refresh() + + # Divider line + self.divider_window = curses.newwin( + 1, + maxx - 4, + maxy - 3, + 2 + ) + self.divider_window.hline(curses.ACS_HLINE, maxx - 4) + self.divider_window.refresh() + + self.screen.refresh() + + def setup_input_queue(self): + """Setup threaded input queue""" + def wait_stdin(q, window): + while True: + line = curses.textpad.Textbox(window).edit() + if len(line) > 0: + q.put(line) + window.clear() + window.refresh() + + self.input_queue = queue.Queue() + t = threading.Thread(target=wait_stdin, args=(self.input_queue, self.input_window)) + t.daemon = True + t.start() + + return self.input_queue + + def setup_logging(self): + """Setup logging handler for output window""" + handler = CursesHandler(self.output_window) + formatter = logging.Formatter('%(asctime)-8s|%(name)-12s|%(levelname)-6s|%(message)-s', '%H:%M:%S') + handler.setFormatter(formatter) + return handler + + def print_message(self, message, attributes=0): + """Print formatted message to output window""" + y, x = curses.getsyx() + print_colored(self.output_window, message, attributes) + curses.setsyx(y, x) + curses.doupdate() + + def update_server_info(self, game_state): + """Update server info window""" + self.info_window.clear() + + max_y, max_x = self.info_window.getmaxyx() + server_info = game_state.server_info + + # Line 1: Hostname + hostname = server_info.hostname + print_colored(self.info_window, f"^6═══ {hostname} ^6═══^7\n", 0) + + # Line 2: Game info + gametype = server_info.gametype + mapname = server_info.map + timelimit = server_info.timelimit + fraglimit = server_info.fraglimit + roundlimit = server_info.roundlimit + caplimit = server_info.capturelimit + curclients = server_info.curclients + maxclients = server_info.maxclients + + print_colored(self.info_window, + f"^3Type:^7 {gametype} ^3Map:^7 {mapname} ^3Players:^7 {curclients}/{maxclients} " + f"^3Limits (T/F/R/C):^7 {timelimit}/{fraglimit}/{roundlimit}/{caplimit}\n", 0) + + # Line 3: Team headers and player lists + teams = game_state.player_tracker.get_players_by_team() + + if server_info.gametype in TEAM_MODES: + print_colored(self.info_window, f"^1(RED) ^4(BLUE) ^3(SPEC)\n", 0) + + red_players = teams['RED'][:4] + blue_players = teams['BLUE'][:4] + spec_players = teams['SPECTATOR'][:4] + + for i in range(4): + red = red_players[i] if i < len(red_players) else '' + blue = blue_players[i] if i < len(blue_players) else '' + spec = spec_players[i] if i < len(spec_players) else '' + + # Strip color codes for padding calculation + from formatter import strip_color_codes + red_clean = strip_color_codes(red) + blue_clean = strip_color_codes(blue) + spec_clean = strip_color_codes(spec) + + # Calculate padding (24 chars per column) + red_pad = 24 - len(red_clean) + blue_pad = 24 - len(blue_clean) + + line = f"{red}{' ' * red_pad}{blue}{' ' * blue_pad}{spec}\n" + print_colored(self.info_window, line, 0) + else: + print_colored(self.info_window, f"^3(FREE)\n", 0) + free_players = teams['FREE'][:4] + for player in free_players: + print_colored(self.info_window, f"{player}\n", 0) + # Fill remaining lines + for i in range(4 - len(free_players)): + self.info_window.addstr("\n") + + # Blank lines to fill + self.info_window.addstr("\n\n") + + # Separator + separator = "^6" + "═" * (max_x - 1) + "^7" + print_colored(self.info_window, separator, 0) + + self.info_window.refresh()