diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cf8e9aa --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +venv3/ +*.log +__pycache__/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..caae500 --- /dev/null +++ b/README.md @@ -0,0 +1,144 @@ +# QLPyCon - Quake Live Python Console + +A modular, refactored terminal-based client for monitoring Quake Live servers via ZMQ. + +## Version 0.8.0 - Modular Architecture + +### Features + +- **Real-time game monitoring** - Watch kills, deaths, team switches, medals, and more +- **Server info display** - Shows hostname, map, gametype, limits, and player count +- **Team-aware chat** - Color-coded messages with team prefixes +- **Powerup tracking** - Formatted pickup and carrier kill messages +- **JSON event logging** - Capture all game events to file for analysis +- **Colorized output** - Quake color code support (^0-^7) + +### Module Structure + +``` +qlpycon/ +├── __init__.py # Package initialization +├── main.py # Entry point and main loop +├── config.py # Constants and configuration +├── state.py # Game state management (ServerInfo, PlayerTracker, etc) +├── network.py # ZMQ connections (RCON and stats stream) +├── parser.py # JSON event parsing +├── formatter.py # Message formatting and colorization +└── ui.py # Curses interface (windows, display, input) +``` + +### Installation + +```bash +# Requirements +pip install pyzmq + +# Run directly +python -m qlpycon.main --host tcp://127.0.0.1:27961 --password YOUR_PASSWORD +``` + +### Usage + +```bash +# Basic connection +python main.py --host tcp://SERVER_IP:PORT --password RCON_PASSWORD + +# Verbose mode (show all communications) +python main.py --host tcp://SERVER_IP:PORT --password PASS -v + +# Debug mode (detailed logging) +python main.py --host tcp://SERVER_IP:PORT --password PASS -vv + +# Capture all JSON events to file +python main.py --host tcp://SERVER_IP:PORT --password PASS --json events.log + +# Custom unknown events log +python main.py --unknown-log my_unknown.log +``` + +### Command Line Options + +- `--host` - ZMQ URI (default: tcp://127.0.0.1:27961) +- `--password` - RCON password +- `--identity` - Socket identity (random UUID by default) +- `-v, --verbose` - Increase verbosity (use -v for INFO, -vv for DEBUG) +- `--json FILE` - Log all JSON events to FILE +- `--unknown-log FILE` - Log unknown JSON events (default: unknown_events.log) + +### Architecture Overview + +#### State Management (`state.py`) +- **ServerInfo** - Tracks server configuration and metadata +- **PlayerTracker** - Manages player teams and information +- **EventDeduplicator** - Prevents duplicate kill/death events +- **GameState** - Main container for all state + +#### Network Layer (`network.py`) +- **RconConnection** - Handles DEALER socket for RCON commands +- **StatsConnection** - Handles SUB socket for game event stream + +#### Event Parsing (`parser.py`) +- **EventParser** - Parses JSON game events into formatted messages +- Modular handlers for each event type (deaths, medals, team switches, etc) + +#### Message Formatting (`formatter.py`) +- Color code handling (Quake's ^N system) +- Team prefix generation +- Timestamp logic +- Chat message formatting + +#### UI Layer (`ui.py`) +- **UIManager** - Manages all curses windows +- Three-panel layout: server info, output, input +- Threaded input queue for non-blocking commands +- Color rendering with curses + +### Key Improvements Over v0.7.0 + +1. **Modular Design** - Separated concerns into focused modules +2. **Clean Classes** - OOP design for state and connections +3. **Better Maintainability** - Each module has a single responsibility +4. **Easier Testing** - Components can be tested independently +5. **Clear Interfaces** - Well-defined APIs between modules +6. **Improved Comments** - Every module, class, and function documented + +### Event Types Supported + +- `PLAYER_SWITCHTEAM` - Team changes +- `PLAYER_DEATH` / `PLAYER_KILL` - Frag events (deduplicated) +- `PLAYER_MEDAL` - Medal awards +- `PLAYER_STATS` - End-game statistics with weapon accuracy +- `MATCH_STARTED` - Match initialization +- `MATCH_REPORT` - Final scores +- `PLAYER_CONNECT` / `PLAYER_DISCONNECT` - Connection events +- `ROUND_OVER` - Round completion + +### Color Codes + +Quake Live uses `^N` color codes where N is 0-7: +- `^0` - Black +- `^1` - Red +- `^2` - Green +- `^3` - Yellow +- `^4` - Blue +- `^5` - Cyan +- `^6` - Magenta +- `^7` - White (default) + +### Development + +To extend QLPyCon: + +1. **Add new event types** - Edit `parser.py`, add handler method +2. **Change UI layout** - Edit `ui.py`, modify window creation +3. **Add new commands** - Edit `main.py`, handle in main loop +4. **Modify formatting** - Edit `formatter.py`, adjust color/timestamp logic +5. **Add configuration** - Edit `config.py`, add constants + +### License + +MIT License - See original qlpycon.py for details + +### Credits + +Refactored from qlpycon.py v0.7.0 diff --git a/config.py b/config.py new file mode 100644 index 0000000..2a2ab4b --- /dev/null +++ b/config.py @@ -0,0 +1,101 @@ +#!/usr/bin/env python3 +""" +Configuration and constants for QLPyCon +""" + +VERSION = "0.8.0" + +# Network defaults +DEFAULT_HOST = 'tcp://127.0.0.1:27961' +POLL_TIMEOUT = 100 + +# UI dimensions +INFO_WINDOW_HEIGHT = 10 +INFO_WINDOW_Y = 2 +OUTPUT_WINDOW_Y = 12 +INPUT_WINDOW_HEIGHT = 1 + +# Event deduplication +MAX_RECENT_EVENTS = 10 + +# Team game modes +TEAM_MODES = [ + "Team Deathmatch", + "Clan Arena", + "Capture The Flag", + "One Flag CTF", + "Overload", + "Harvester", + "Freeze Tag" +] + +# Team mappings +TEAM_MAP = { + 0: 'FREE', + 1: 'RED', + 2: 'BLUE', + 3: 'SPECTATOR' +} + +TEAM_COLORS = { + 'RED': '^1(RED)^7', + 'BLUE': '^4(BLUE)^7', + 'FREE': '', + 'SPECTATOR': '^3(SPEC)^7' +} + +# Weapon names +WEAPON_NAMES = { + 'ROCKET': 'Rocket Launcher', + 'LIGHTNING': 'Lightning Gun', + 'RAILGUN': 'Railgun', + 'SHOTGUN': 'Shotgun', + 'GAUNTLET': 'Gauntlet', + 'GRENADE': 'Grenade Launcher', + 'PLASMA': 'Plasma Gun', + 'MACHINEGUN': 'Machine Gun' +} + +# Weapon names for kill messages +WEAPON_KILL_NAMES = { + 'ROCKET': 'the Rocket Launcher', + 'LIGHTNING': 'the Lightning Gun', + 'RAILGUN': 'the Railgun', + 'SHOTGUN': 'the Shotgun', + 'GAUNTLET': 'the Gauntlet', + 'GRENADE': 'the Grenade Launcher', + 'PLASMA': 'the Plasma Gun', + 'MACHINEGUN': 'the Machine Gun' +} + +# Death messages +DEATH_MESSAGES = { + 'FALLING': "%s%s ^7cratered.", + 'HURT': "%s%s ^7was in the wrong place.", + 'LAVA': "%s%s ^7does a backflip into the lava.", + 'WATER': "%s%s ^7sank like a rock.", + 'SLIME': "%s%s ^7melted.", + 'CRUSH': "%s%s ^7was crushed." +} + +# Powerup names and colors +POWERUP_COLORS = { + 'Quad Damage': '^5Quad Damage^7', + 'Battle Suit': '^3Battle Suit^7', + 'Regeneration': '^1Regeneration^7', + 'Haste': '^3Haste^7', + 'Invisibility': '^5Invisibility^7', + 'Flight': '^5Flight^7', + 'Medkit': '^1Medkit^7', + 'MegaHealth': '^4MegaHealth^7' +} + +# Curses color pairs +COLOR_PAIRS = { + 1: 1, # Red + 2: 2, # Green + 3: 3, # Yellow + 4: 4, # Blue + 5: 6, # Cyan (swapped) + 6: 5 # Magenta (swapped) +} diff --git a/formatter.py b/formatter.py new file mode 100644 index 0000000..37ed5ec --- /dev/null +++ b/formatter.py @@ -0,0 +1,202 @@ +#!/usr/bin/env python3 +""" +Message formatting and colorization for QLPyCon +Handles Quake color codes and team prefixes +""" + +import re +import time +from config import TEAM_COLORS + + +def strip_color_codes(text): + """Remove Quake color codes (^N) from text""" + return re.sub(r'\^\d', '', text) + + +def get_team_prefix(player_name, player_tracker): + """Get color-coded team prefix for a player""" + if not player_tracker.server_info.is_team_mode(): + return '' + + team = player_tracker.get_team(player_name) + if not team: + return '' + + return TEAM_COLORS.get(team, '') + + +def should_add_timestamp(message): + """Determine if a message should get a timestamp""" + # Skip status command output + skip_keywords = ['map:', 'num score', '---', 'bot', 'status'] + if any(kw in message for kw in skip_keywords): + # But allow "zmq RCON" lines (command echoes) + if 'zmq RCON' not in message: + return False + + # Skip very short messages or fragments + stripped = message.strip() + if len(stripped) <= 2: + return False + + # Skip messages with leading spaces (status fragments) + if message.startswith(' ') and len(stripped) < 50: + return False + + # Skip pure numbers + if stripped.isdigit(): + return False + + # Skip short single words + if len(stripped) < 20 and ' ' not in stripped: + return False + + # Skip IP addresses (xxx.xxx.xxx.xxx or xxx.xxx.xxx.xxx:port) + allowed_chars = set('0123456789.:') + if stripped.count('.') == 3 and all(c in allowed_chars for c in stripped): + return False + + # Skip messages starting with *** + if message.startswith('***'): + return False + + return True + + +def format_message(message, add_timestamp=True): + """ + Format a message for display + - Strips special characters + - Adds timestamp if appropriate + - Handles broadcast formatting + """ + # Clean up message + message = message.replace("\\n", "") + message = message.replace(chr(25), "") + + # Handle broadcast messages + attributes = 0 + if message[:10] == "broadcast:": + message = message[11:] + attributes = 1 # Bold + + # Handle print messages + if message[:7] == "print \"": + message = message[7:-2] + "\n" + + # Add timestamp if requested and appropriate + if add_timestamp and should_add_timestamp(message): + timestamp = time.strftime('%H:%M:%S') + message = f"^3[^7{timestamp}^3]^7 {message}" + + return message, attributes + + +def format_chat_message(message, player_tracker): + """ + Format chat messages with team prefixes and colors + Handles both regular chat (Name: msg) and team chat ((Name): msg or (Name) (Location): msg) + """ + # Strip special character + clean_msg = message.replace(chr(25), '') + + # Team chat with location: (PlayerName) (Location): message + # Location can have nested parens like (Lower Floor (Near Yellow Armour)) + if clean_msg.strip().startswith('(') and ')' in clean_msg: + # Extract player name (first parenthetical) + player_match = re.match(r'^(\([^)]+\))', clean_msg) + if not player_match: + return message + + player_part = player_match.group(1) + rest = clean_msg[len(player_part):].lstrip() + + # Check for location (another parenthetical) + if rest.startswith('('): + # Count parens to handle nesting + paren_count = 0 + location_end = -1 + for i, char in enumerate(rest): + if char == '(': + paren_count += 1 + elif char == ')': + paren_count -= 1 + if paren_count == 0: + location_end = i + 1 + break + + # Check if location ends with colon + if location_end > 0 and location_end < len(rest) and rest[location_end] == ':': + location_part = rest[:location_end] + message_part = rest[location_end + 1:] + + # Get team prefix + name_match = re.match(r'\(([^)]+)\)', player_part) + if name_match: + player_name = strip_color_codes(name_match.group(1).strip()) + team_prefix = get_team_prefix(player_name, player_tracker) + location_clean = strip_color_codes(location_part) + return f"{team_prefix}{player_part} ^3{location_clean}^7:^5{message_part}" + + # Team chat without location: (PlayerName): message + colon_match = re.match(r'^(\([^)]+\)):(\s*.*)', clean_msg) + if colon_match: + player_part = colon_match.group(1) + ':' + message_part = colon_match.group(2) + + name_match = re.match(r'\(([^)]+)\)', player_part) + if name_match: + player_name = strip_color_codes(name_match.group(1).strip()) + team_prefix = get_team_prefix(player_name, player_tracker) + return f"{team_prefix}{player_part}^5{message_part}\n" + + # Regular chat: PlayerName: message + parts = clean_msg.split(':', 1) + if len(parts) == 2: + player_name = strip_color_codes(parts[0].strip()) + team_prefix = get_team_prefix(player_name, player_tracker) + + # Preserve original color-coded name + original_parts = message.replace(chr(25), '').split(':', 1) + if len(original_parts) == 2: + return f"{team_prefix}{original_parts[0]}:^2{original_parts[1]}" + + return message + + +def format_powerup_message(message, player_tracker): + """ + Format powerup pickup and carrier kill messages + Returns formatted message or None if not a powerup message + """ + from config import POWERUP_COLORS + + if message.startswith("broadcast:"): + message = message[11:].strip() + + # Powerup pickup: "PlayerName got the PowerupName!" + pickup_match = re.match(r'^(.+?)\s+got the\s+(.+?)!', message) + if pickup_match: + player_name = pickup_match.group(1).strip() + powerup_name = pickup_match.group(2).strip() + + player_clean = strip_color_codes(player_name) + team_prefix = get_team_prefix(player_clean, player_tracker) + + colored_powerup = POWERUP_COLORS.get(powerup_name, f'^6{powerup_name}^7') + return f"{team_prefix}{player_name} ^7got the {colored_powerup}!\n" + + # Powerup carrier kill: "PlayerName killed the PowerupName carrier!" + carrier_match = re.match(r'^(.+?)\s+killed the\s+(.+?)\s+carrier!', message) + if carrier_match: + player_name = carrier_match.group(1).strip() + powerup_name = carrier_match.group(2).strip() + + player_clean = strip_color_codes(player_name) + team_prefix = get_team_prefix(player_clean, player_tracker) + + colored_powerup = POWERUP_COLORS.get(powerup_name, f'^6{powerup_name}^7') + return f"{team_prefix}{player_name} ^7killed the {colored_powerup} ^7carrier!\n" + + return None diff --git a/main.py b/main.py new file mode 100644 index 0000000..62b2025 --- /dev/null +++ b/main.py @@ -0,0 +1,589 @@ +#!/usr/bin/env python3 +""" +QLPyCon - Quake Live Python Console +Main entry point +""" + +import argparse +import uuid +import logging +import time +import re +import curses +import zmq +import signal +import sys + +from config import VERSION, DEFAULT_HOST, POLL_TIMEOUT +from state import GameState +from network import RconConnection, StatsConnection +from parser import EventParser +from formatter import format_message, format_chat_message, format_powerup_message +from ui import UIManager + +# Configure logging +logger = logging.getLogger('main') +logger.setLevel(logging.DEBUG) + +all_json_logger = logging.getLogger('all_json') +all_json_logger.setLevel(logging.DEBUG) + +unknown_json_logger = logging.getLogger('unknown_json') +unknown_json_logger.setLevel(logging.DEBUG) + + +def signal_handler(sig, frame): + """Handle Ctrl+C for immediate shutdown""" + # Don't call curses.endwin() here - let curses.wrapper handle it + sys.exit(0) + + +def parse_cvar_response(message, game_state, ui): + """ + Parse server cvar responses and update state + Returns True if message should be suppressed from display + """ + # Suppress command echo lines + if message.startswith('zmq RCON command') and 'from' in message: + suppress_cmds = ['status', 'roundlimit', 'qlx_serverBrandName', 'g_factoryTitle', + 'mapname', 'timelimit', 'fraglimit', 'capturelimit', 'sv_maxclients'] + if any(f': {cmd}' in message for cmd in suppress_cmds): + return True + + # Parse cvar responses (format: "cvar_name" is:"value" default:...) + cvar_match = re.search(r'"([^"]+)"\s+is:"([^"]*)"', message) + if cvar_match: + cvar_name = cvar_match.group(1) + value = cvar_match.group(2) + + if game_state.server_info.update_from_cvar(cvar_name, value): + ui.update_server_info(game_state) + return True + + return False + + +def handle_stats_connection(message, rcon, ui, game_state): + """ + Handle stats connection info extraction + Returns (stats_port, stats_password) or (None, None) + """ + stats_port = None + stats_password = None + + # Extract stats port + if 'net_port' in message and ' is:' in message and '"net_port"' in message: + match = re.search(r' is:"([^"]+)"', message) + if match: + port_str = match.group(1).strip() + digit_match = re.search(r'(\d+)', port_str) + if digit_match: + stats_port = digit_match.group(1) + logger.info(f'Got stats port: {stats_port}') + + # Extract stats password + if 'zmq_stats_password' in message and ' is:' in message and '"zmq_stats_password"' in message: + match = re.search(r' is:"([^"]+)"', message) + if match: + password_str = match.group(1) + password_str = re.sub(r'\^\d', '', password_str) # Strip color codes + stats_password = password_str.strip() + logger.info(f'Got stats password: {stats_password}') + + return stats_port, stats_password + +def parse_player_events(message, game_state, ui): + """ + Parse connect, disconnect, kick, and rename messages + Returns True if message should be suppressed + """ + try: + # Strip color codes for matching + from formatter import strip_color_codes + clean_msg = strip_color_codes(message) + + # Match connects: "NAME connected" + connect_match = re.match(r'^(.+?)\s+connected(?:\s+with Steam ID)?', clean_msg) + if connect_match: + player_name = message.split(' connected')[0].strip() # Keep color codes + if game_state.server_info.is_team_mode(): + game_state.player_tracker.update_team(player_name, 'SPECTATOR') + game_state.player_tracker.add_player(player_name) + ui.update_server_info(game_state) + logger.debug(f'Player connected: {player_name}') + return False + + # Match disconnects: "NAME disconnected", "NAME was kicked" + disconnect_patterns = [ + r'^(.+?)\s+disconnected', + r'^(.+?)\s+was kicked', + ] + + for pattern in disconnect_patterns: + match = re.match(pattern, clean_msg) + if match: + player_name_clean = match.group(1).strip() + # Try to find the original name with color codes + original_match = re.match(pattern, message) + player_name = original_match.group(1).strip() if original_match else player_name_clean + + game_state.player_tracker.remove_player(player_name) + game_state.player_tracker.remove_player(player_name_clean) # Try both + ui.update_server_info(game_state) + logger.debug(f'Player disconnected: {player_name}') + return False + + # Match renames: "OldName renamed to NewName" + rename_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', clean_msg) + if rename_match: + old_name_clean = rename_match.group(1).strip() + new_name_clean = rename_match.group(2).strip() + + # Get names with color codes + original_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', message) + old_name = original_match.group(1).strip() if original_match else old_name_clean + new_name = original_match.group(2).strip() if original_match else new_name_clean + + game_state.player_tracker.rename_player(old_name, new_name) + ui.update_server_info(game_state) + logger.debug(f'Player renamed: {old_name} -> {new_name}') + return False + + except Exception as e: + logger.error(f'Error in parse_player_events: {e}') + + return False + +def main_loop(screen): + return False + """Main application loop""" + + # Setup signal handler for Ctrl+C + signal.signal(signal.SIGINT, signal_handler) + + # Parse arguments + parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics') + parser.add_argument('--host', default=DEFAULT_HOST, help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}') + parser.add_argument('--password', required=False, help='RCON password') + parser.add_argument('--identity', default=uuid.uuid1().hex, help='Socket identity (random UUID by default)') + parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity (-v INFO, -vv DEBUG)') + parser.add_argument('--unknown-log', default='unknown_events.log', help='File to log unknown JSON events') + parser.add_argument('-j', '--json', dest='json_log', default=None, help='File to log all JSON events') + args = parser.parse_args() + + # Set logging level + if args.verbose == 0: + logger.setLevel(logging.WARNING) + elif args.verbose == 1: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.DEBUG) + + # Setup file logging for unknown events + unknown_handler = logging.FileHandler(args.unknown_log, mode='a') + unknown_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') + unknown_handler.setFormatter(unknown_formatter) + unknown_json_logger.addHandler(unknown_handler) + unknown_json_logger.propagate = False + + # Initialize components + ui = UIManager(screen, args.host) + game_state = GameState() + + # Setup logging to output window + log_handler = ui.setup_logging() + logger.addHandler(log_handler) + + # Setup input queue + input_queue = ui.setup_input_queue() + + # Display startup messages + ui.print_message(f"*** QL pyCon Version {VERSION} starting ***\n") + ui.print_message(f"zmq python bindings {zmq.__version__}, libzmq version {zmq.zmq_version()}\n") + + # Initialize network connections + rcon = RconConnection(args.host, args.password, args.identity) + rcon.connect() + + stats_conn = None + stats_port = None + stats_password = None + stats_check_counter = 0 + + # Shutdown flag + shutdown = False + + # Setup JSON logging if requested + json_logger = None + if args.json_log: + json_handler = logging.FileHandler(args.json_log, mode='a') + json_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') + json_handler.setFormatter(json_formatter) + all_json_logger.addHandler(json_handler) + all_json_logger.propagate = False + json_logger = all_json_logger + + # Create event parser + event_parser = EventParser(game_state, json_logger, unknown_json_logger) + + # Main event loop + while not shutdown: + # Poll RCON socket + event = rcon.poll(POLL_TIMEOUT) + + # Check monitor for connection events + monitor_event = rcon.check_monitor() + if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED: + ui.print_message("Connected to server\n") + rcon.send_command(b'register') + logger.info('Registration message sent') + + ui.print_message("Requesting connection info...\n") + rcon.send_command(b'zmq_stats_password') + rcon.send_command(b'net_port') + + # Handle user input + while not input_queue.empty(): + command = input_queue.get() + logger.info(f'Sending command: {repr(command.strip())}') + + # Display command with timestamp + timestamp = time.strftime('%H:%M:%S') + ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n") + + rcon.send_command(command) + + # Poll stats stream if connected + if stats_conn and stats_conn.connected: + stats_check_counter += 1 + + if stats_check_counter % 100 == 0: + logger.debug(f'Stats polling active (check #{stats_check_counter // 100})') + + stats_msg = stats_conn.recv_message() + if stats_msg: + logger.info(f'Stats event received ({len(stats_msg)} bytes)') + + # Parse game event + parsed = event_parser.parse_event(stats_msg) + if parsed: + # Format with timestamp before displaying + formatted_msg, attributes = format_message(parsed) + ui.print_message(formatted_msg) + ui.update_server_info(game_state) + + # Process RCON messages + if event > 0: + logger.debug('Socket has data available') + msg_count = 0 + + while True: + message = rcon.recv_message() + if message is None: + if msg_count > 0: + logger.debug(f'Read {msg_count} message(s)') + break + + msg_count += 1 + + if len(message) == 0: + logger.debug('Received empty message (keepalive)') + continue + + logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}') + + # Try to parse as cvar response + if parse_cvar_response(message, game_state, ui): + logger.debug('Suppressed cvar response') + continue + + # Check for player connect/disconnect/rename events + parse_player_events(message, game_state, ui) + + # Check for stats connection info + port, password = handle_stats_connection(message, rcon, ui, game_state) + if port: + stats_port = port + if password: + stats_password = password + + # Connect to stats if we have both credentials + if stats_port and stats_password and stats_conn is None: + try: + ui.print_message("Connecting to stats stream...\n") + host_ip = args.host.split('//')[1].split(':')[0] + + stats_conn = StatsConnection(host_ip, stats_port, stats_password) + stats_conn.connect() + + ui.print_message("Stats stream connected - ready for game events\n") + + # Request initial server info + logger.info('Sending initial server info queries') + rcon.send_command(b'qlx_serverBrandName') + rcon.send_command(b'g_factoryTitle') + rcon.send_command(b'mapname') + rcon.send_command(b'timelimit') + rcon.send_command(b'fraglimit') + rcon.send_command(b'roundlimit') + rcon.send_command(b'capturelimit') + rcon.send_command(b'sv_maxclients') + + if args.json_log: + ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n") + + except Exception as e: + timestamp = time.strftime('%H:%M:%S') + ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n") + logger.error(f'Stats connection failed: {e}') + + # Try to parse as game event + parsed_event = event_parser.parse_event(message) + if parsed_event: + ui.print_message(parsed_event) + continue + + # Check if it looks like JSON but wasn't parsed + stripped = message.strip() + if stripped and stripped[0] in ('{', '['): + logger.debug('Unparsed JSON event') + continue + + # Try powerup message formatting + powerup_msg = format_powerup_message(message, game_state.player_tracker) + if powerup_msg: + ui.print_message(powerup_msg) + continue + + # Filter bot debug messages in default mode + is_bot_debug = (' entered ' in message and + any(x in message for x in [' seek ', ' battle ', ' chase', ' fight'])) + if is_bot_debug and args.verbose == 0: + logger.debug(f'Filtered bot debug: {message[:50]}') + continue + + # Check if it's a chat message + if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')): + message = format_chat_message(message, game_state.player_tracker) + + # Format and display message + formatted_msg, attributes = format_message(message) + ui.print_message(formatted_msg) + +def main_loop(screen): + """Main application loop""" + + # Setup signal handler for Ctrl+C + signal.signal(signal.SIGINT, signal_handler) + + # Parse arguments + parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics') + parser.add_argument('--host', default=DEFAULT_HOST, help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}') + parser.add_argument('--password', required=False, help='RCON password') + parser.add_argument('--identity', default=uuid.uuid1().hex, help='Socket identity (random UUID by default)') + parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity (-v INFO, -vv DEBUG)') + parser.add_argument('--unknown-log', default='unknown_events.log', help='File to log unknown JSON events') + parser.add_argument('-j', '--json', dest='json_log', default=None, help='File to log all JSON events') + args = parser.parse_args() + + # Set logging level + if args.verbose == 0: + logger.setLevel(logging.WARNING) + elif args.verbose == 1: + logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.DEBUG) + + # Setup file logging for unknown events + unknown_handler = logging.FileHandler(args.unknown_log, mode='a') + unknown_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') + unknown_handler.setFormatter(unknown_formatter) + unknown_json_logger.addHandler(unknown_handler) + unknown_json_logger.propagate = False + + # Initialize components + ui = UIManager(screen, args.host) + game_state = GameState() + + # Setup logging to output window + log_handler = ui.setup_logging() + logger.addHandler(log_handler) + + # Setup input queue + input_queue = ui.setup_input_queue() + + # Display startup messages + ui.print_message(f"*** QL pyCon Version {VERSION} starting ***\n") + ui.print_message(f"zmq python bindings {zmq.__version__}, libzmq version {zmq.zmq_version()}\n") + + # Initialize network connections + rcon = RconConnection(args.host, args.password, args.identity) + rcon.connect() + + stats_conn = None + stats_port = None + stats_password = None + stats_check_counter = 0 + + # Shutdown flag + shutdown = False + + # Setup JSON logging if requested + json_logger = None + if args.json_log: + json_handler = logging.FileHandler(args.json_log, mode='a') + json_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') + json_handler.setFormatter(json_formatter) + all_json_logger.addHandler(json_handler) + all_json_logger.propagate = False + json_logger = all_json_logger + + # Create event parser + event_parser = EventParser(game_state, json_logger, unknown_json_logger) + + # Main event loop + while not shutdown: + # Poll RCON socket + event = rcon.poll(POLL_TIMEOUT) + + # Check monitor for connection events + monitor_event = rcon.check_monitor() + if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED: + ui.print_message("Connected to server\n") + rcon.send_command(b'register') + logger.info('Registration message sent') + + ui.print_message("Requesting connection info...\n") + rcon.send_command(b'zmq_stats_password') + rcon.send_command(b'net_port') + + # Handle user input + while not input_queue.empty(): + command = input_queue.get() + logger.info(f'Sending command: {repr(command.strip())}') + + # Display command with timestamp + timestamp = time.strftime('%H:%M:%S') + ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n") + + rcon.send_command(command) + + # Poll stats stream if connected + if stats_conn and stats_conn.connected: + stats_check_counter += 1 + + if stats_check_counter % 100 == 0: + logger.debug(f'Stats polling active (check #{stats_check_counter // 100})') + + stats_msg = stats_conn.recv_message() + if stats_msg: + logger.info(f'Stats event received ({len(stats_msg)} bytes)') + + # Parse game event + parsed = event_parser.parse_event(stats_msg) + if parsed: + # Format with timestamp before displaying + formatted_msg, attributes = format_message(parsed) + ui.print_message(formatted_msg) + ui.update_server_info(game_state) + + # Process RCON messages + if event > 0: + logger.debug('Socket has data available') + msg_count = 0 + + while True: + message = rcon.recv_message() + if message is None: + if msg_count > 0: + logger.debug(f'Read {msg_count} message(s)') + break + + msg_count += 1 + + if len(message) == 0: + logger.debug('Received empty message (keepalive)') + continue + + logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}') + + # Try to parse as cvar response + if parse_cvar_response(message, game_state, ui): + logger.debug('Suppressed cvar response') + continue + + # Check for player connect/disconnect/rename events + parse_player_events(message, game_state, ui) + + # Check for stats connection info + port, password = handle_stats_connection(message, rcon, ui, game_state) + if port: + stats_port = port + if password: + stats_password = password + + # Connect to stats if we have both credentials + if stats_port and stats_password and stats_conn is None: + try: + ui.print_message("Connecting to stats stream...\n") + host_ip = args.host.split('//')[1].split(':')[0] + + stats_conn = StatsConnection(host_ip, stats_port, stats_password) + stats_conn.connect() + + ui.print_message("Stats stream connected - ready for game events\n") + + # Request initial server info + logger.info('Sending initial server info queries') + rcon.send_command(b'qlx_serverBrandName') + rcon.send_command(b'g_factoryTitle') + rcon.send_command(b'mapname') + rcon.send_command(b'timelimit') + rcon.send_command(b'fraglimit') + rcon.send_command(b'roundlimit') + rcon.send_command(b'capturelimit') + rcon.send_command(b'sv_maxclients') + + if args.json_log: + ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n") + + except Exception as e: + timestamp = time.strftime('%H:%M:%S') + ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n") + logger.error(f'Stats connection failed: {e}') + + # Try to parse as game event + parsed_event = event_parser.parse_event(message) + if parsed_event: + ui.print_message(parsed_event) + continue + + # Check if it looks like JSON but wasn't parsed + stripped = message.strip() + if stripped and stripped[0] in ('{', '['): + logger.debug('Unparsed JSON event') + continue + + # Try powerup message formatting + powerup_msg = format_powerup_message(message, game_state.player_tracker) + if powerup_msg: + ui.print_message(powerup_msg) + continue + + # Filter bot debug messages in default mode + is_bot_debug = (' entered ' in message and + any(x in message for x in [' seek ', ' battle ', ' chase', ' fight'])) + if is_bot_debug and args.verbose == 0: + logger.debug(f'Filtered bot debug: {message[:50]}') + continue + + # Check if it's a chat message + if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')): + message = format_chat_message(message, game_state.player_tracker) + + # Format and display message + formatted_msg, attributes = format_message(message) + ui.print_message(formatted_msg) + +if __name__ == '__main__': + curses.wrapper(main_loop) diff --git a/network.py b/network.py new file mode 100644 index 0000000..15261e4 --- /dev/null +++ b/network.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +""" +ZMQ network layer for QLPyCon +Handles RCON and stats stream connections +""" + +import zmq +import struct +import logging +import time + +logger = logging.getLogger('network') + + +def read_socket_event(msg): + """Parse ZMQ socket monitor event""" + event_id = struct.unpack(' 0 else 0 + accuracies[weapon] = accuracy + return accuracies + + +class EventParser: + """Parses JSON game events into formatted messages""" + + def __init__(self, game_state, json_logger=None, unknown_logger=None): + self.game_state = game_state + self.json_logger = json_logger + self.unknown_logger = unknown_logger + + def parse_event(self, message): + """ + Parse JSON event and return formatted message string + Returns None if event should not be displayed + """ + try: + event = json.loads(message) + + # Log all JSON if logger is configured + if self.json_logger: + self.json_logger.info('JSON Event received:') + self.json_logger.info(json.dumps(event, indent=2)) + self.json_logger.info('---') + + if 'TYPE' not in event or 'DATA' not in event: + logger.debug('JSON missing TYPE or DATA') + return None + + event_type = event['TYPE'] + data = event['DATA'] + + # Route to appropriate handler + handler_map = { + 'PLAYER_SWITCHTEAM': self._handle_switchteam, + 'PLAYER_DEATH': self._handle_death, + 'PLAYER_KILL': self._handle_death, # Same handler + 'PLAYER_MEDAL': self._handle_medal, + 'MATCH_STARTED': self._handle_match_started, + 'MATCH_REPORT': self._handle_match_report, + 'PLAYER_STATS': self._handle_player_stats, + 'PLAYER_CONNECT': lambda d: None, + 'PLAYER_DISCONNECT': lambda d: None, + 'ROUND_OVER': lambda d: None, + } + + handler = handler_map.get(event_type) + if handler: + return handler(data) + else: + # Unknown event + logger.debug(f'Unknown event type: {event_type}') + if self.unknown_logger: + self.unknown_logger.info(f'Unknown event type: {event_type}') + self.unknown_logger.info(f'Full JSON: {json.dumps(event, indent=2)}') + return None + + except json.JSONDecodeError as e: + logger.debug(f'JSON decode error: {e}') + return None + except (KeyError, TypeError) as e: + logger.debug(f'Error parsing event: {e}') + return None + + def _handle_switchteam(self, data): + """Handle PLAYER_SWITCHTEAM event""" + if 'KILLER' not in data: + return None + + killer = data['KILLER'] + name = killer.get('NAME', 'Unknown') + team = killer.get('TEAM', '') + old_team = killer.get('OLD_TEAM', '') + + # Update player team + self.game_state.player_tracker.update_team(name, team) + self.game_state.player_tracker.add_player(name) + + if team == old_team: + return None + + warmup = " ^3(warmup)" if data.get('WARMUP', False) else "" + + team_messages = { + 'FREE': ' ^7joined the ^2Fight^7', + 'SPECTATOR': ' ^7joined the ^3Spectators^7', + 'RED': ' ^7joined the ^1Red ^7team', + 'BLUE': ' ^7joined the ^4Blue ^7team' + } + + old_team_messages = { + 'FREE': 'the ^2Fight^7', + 'SPECTATOR': 'the ^3Spectators^7', + 'RED': '^7the ^1Red ^7team', + 'BLUE': '^7the ^4Blue ^7team' + } + + team_msg = team_messages.get(team, f' ^7joined team {team}^7') + old_team_msg = old_team_messages.get(old_team, f'team {old_team}') + + team_prefix = get_team_prefix(name, self.game_state.player_tracker) + return f"{team_prefix}{name}{team_msg} from {old_team_msg}{warmup}\n" + + def _handle_death(self, data): + """Handle PLAYER_DEATH and PLAYER_KILL events""" + if 'VICTIM' not in data: + return None + + victim = data['VICTIM'] + victim_name = victim.get('NAME', 'Unknown') + + # Check for duplicate + time_val = data.get('TIME', 0) + killer_name = data.get('KILLER', {}).get('NAME', '') if data.get('KILLER') else '' + + if self.game_state.event_deduplicator.is_duplicate('PLAYER_DEATH', time_val, killer_name, victim_name): + return None + + # Update victim team + if 'TEAM' in victim: + self.game_state.player_tracker.update_team(victim_name, victim['TEAM']) + self.game_state.player_tracker.add_player(victim_name) + + victim_prefix = get_team_prefix(victim_name, self.game_state.player_tracker) + warmup = " ^3(warmup)" if data.get('WARMUP', False) else "" + + # Environmental death (no killer) + if 'KILLER' not in data or not data['KILLER']: + mod = data.get('MOD', 'UNKNOWN') + msg_template = DEATH_MESSAGES.get(mod, "%s%s ^1DIED FROM %s^7") + + if mod in DEATH_MESSAGES: + msg = msg_template % (victim_prefix, victim_name) + else: + msg = msg_template % (victim_prefix, victim_name, mod) + + return f"{msg}{warmup}\n" + + # Player killed by another player + killer = data['KILLER'] + killer_name = killer.get('NAME', 'Unknown') + + # Update killer team + if 'TEAM' in killer: + self.game_state.player_tracker.update_team(killer_name, killer['TEAM']) + self.game_state.player_tracker.add_player(killer_name) + + killer_prefix = get_team_prefix(killer_name, self.game_state.player_tracker) + + # Suicide + if killer_name == victim_name: + weapon = killer.get('WEAPON', 'OTHER_WEAPON') + if weapon != 'OTHER_WEAPON': + weapon_name = WEAPON_NAMES.get(weapon, weapon) + return f"{killer_prefix}{killer_name} ^7committed suicide with the ^7{weapon_name}{warmup}\n" + return None + + # Regular kill + weapon = killer.get('WEAPON', 'UNKNOWN') + weapon_name = WEAPON_KILL_NAMES.get(weapon, f'the {weapon}') + + return f"{killer_prefix}{killer_name} ^7fragged^7 {victim_prefix}{victim_name} ^7with {weapon_name}{warmup}\n" + + def _handle_medal(self, data): + """Handle PLAYER_MEDAL event""" + name = data.get('NAME', 'Unknown') + medal = data.get('MEDAL', 'UNKNOWN') + warmup = " ^3(warmup)^7" if data.get('WARMUP', False) else "" + + team_prefix = get_team_prefix(name, self.game_state.player_tracker) + return f"{team_prefix}{name} ^7got a medal: ^6{medal}{warmup}\n" + + def _handle_match_started(self, data): + """Handle MATCH_STARTED event""" + if self.game_state.server_info.is_team_mode(): + return None + + players = [] + for player in data.get('PLAYERS', []): + name = player.get('NAME', 'Unknown') + players.append(name) + + if players: + formatted = " vs. ".join(players) + return f"Match has started - {formatted}\n" + + return None + + def _handle_match_report(self, data): + """Handle MATCH_REPORT event""" + if not self.game_state.server_info.is_team_mode(): + return None + + red_score = int(data.get('TSCORE0', '0')) + blue_score = int(data.get('TSCORE1', '0')) + + if red_score > blue_score: + return f"^1RED TEAM ^7WINS by a score of {red_score} to {blue_score}\n" + elif blue_score > red_score: + return f"^4BLUE TEAM ^7WINS by a score of {blue_score} to {red_score}\n" + else: + return f"^7The match is a TIE with a score of {red_score} to {blue_score}\n" + + def _handle_player_stats(self, data): + """Handle PLAYER_STATS event""" + name = data.get('NAME', 'Unknown') + team_prefix = get_team_prefix(name, self.game_state.player_tracker) + + kills = int(data.get('KILLS', '0')) + deaths = int(data.get('DEATHS', '0')) + + weapon_data = data.get('WEAPONS', {}) + accuracies = calculate_weapon_accuracies(weapon_data) + + if not accuracies: + return None + + best_weapon = max(accuracies, key=accuracies.get) + best_accuracy = accuracies[best_weapon] * 100 + weapon_stats = weapon_data.get(best_weapon, {}) + best_weapon_kills = int(weapon_stats.get('K', 0)) + + weapon_name = WEAPON_NAMES.get(best_weapon, best_weapon) + + return f"^7{team_prefix}{name} K/D: {kills}/{deaths} | Best Weapon: {weapon_name} - Acc: {best_accuracy:.2f}% - Kills: {best_weapon_kills}\n" diff --git a/state.py b/state.py new file mode 100644 index 0000000..d442da3 --- /dev/null +++ b/state.py @@ -0,0 +1,180 @@ +#!/usr/bin/env python3 +""" +Game state management for QLPyCon +Tracks server info, players, and teams +""" + +import re +import logging +from config import TEAM_MODES, TEAM_MAP, MAX_RECENT_EVENTS + +logger = logging.getLogger('state') + + +class ServerInfo: + """Tracks current server information""" + + def __init__(self): + self.hostname = 'Unknown' + self.map = 'Unknown' + self.gametype = 'Unknown' + self.timelimit = '0' + self.fraglimit = '0' + self.roundlimit = '0' + self.capturelimit = '0' + self.maxclients = '0' + self.curclients = '0' + self.red_score = 0 + self.blue_score = 0 + self.players = [] + self.last_update = 0 + + def is_team_mode(self): + """Check if current gametype is a team mode""" + return self.gametype in TEAM_MODES + + def update_from_cvar(self, cvar_name, value): + """Update server info from a cvar response""" + # Normalize cvar name to lowercase for case-insensitive matching + cvar_lower = cvar_name.lower() + + mapping = { + 'qlx_serverbrandname': 'hostname', + 'g_factorytitle': 'gametype', + 'mapname': 'map', + 'timelimit': 'timelimit', + 'fraglimit': 'fraglimit', + 'roundlimit': 'roundlimit', + 'capturelimit': 'capturelimit', + 'sv_maxclients': 'maxclients' + } + + attr = mapping.get(cvar_lower) + if attr: + # Only strip color codes for non-hostname fields + if attr != 'hostname': + value = re.sub(r'\^\d', '', value) + setattr(self, attr, value) + logger.info(f'Updated {attr}: {value}') + return True + return False + + +class PlayerTracker: + """Tracks player teams and information""" + + def __init__(self, server_info): + self.server_info = server_info + self.player_teams = {} + + def update_team(self, name, team): + """Update player team. Team can be int or string""" + if not self.server_info.is_team_mode(): + return + + # Convert numeric team to string + if isinstance(team, int): + team = TEAM_MAP.get(team, 'FREE') + + if team not in ['RED', 'BLUE', 'FREE', 'SPECTATOR']: + team = 'FREE' + + # Store both original name and color-stripped version + self.player_teams[name] = team + clean_name = re.sub(r'\^\d', '', name) + if clean_name != name: + self.player_teams[clean_name] = team + + logger.debug(f'Updated team for {name} (clean: {clean_name}): {team}') + + def get_team(self, name): + """Get player's team""" + return self.player_teams.get(name) + + def add_player(self, name, score='0', ping='0'): + """Add player to server's player list if not exists""" + if not any(p['name'] == name for p in self.server_info.players): + self.server_info.players.append({ + 'name': name, + 'score': score, + 'ping': ping + }) + + def get_players_by_team(self): + """Get players organized by team""" + teams = {'RED': [], 'BLUE': [], 'SPECTATOR': [], 'FREE': []} + for player in self.server_info.players: + name = player['name'] + team = self.player_teams.get(name, 'FREE') + if team not in teams: + team = 'FREE' + teams[team].append(name) + return teams + + def remove_player(self, name): + """Remove player from tracking""" + clean_name = re.sub(r'\^\d', '', name) + + # Remove from player list (check both name and clean name) + self.server_info.players = [ + p for p in self.server_info.players + if p['name'] != name and re.sub(r'\^\d', '', p['name']) != clean_name + ] + + # Remove from team tracking + if name in self.player_teams: + del self.player_teams[name] + if clean_name in self.player_teams and clean_name != name: + del self.player_teams[clean_name] + + logger.debug(f'Removed player: {name} (clean: {clean_name})') + + def rename_player(self, old_name, new_name): + """Rename a player while maintaining their team""" + # Get current team + team = self.player_teams.get(old_name, 'SPECTATOR') + + # Remove old entries + self.remove_player(old_name) + + # Add with new name and team + if self.server_info.is_team_mode(): + self.update_team(new_name, team) + self.add_player(new_name) + + logger.debug(f'Renamed player: {old_name} -> {new_name}') + +class EventDeduplicator: + """Prevents duplicate kill/death events""" + + def __init__(self): + self.recent_events = [] + + def is_duplicate(self, event_type, time_val, killer_name, victim_name): + """Check if this kill/death event is a duplicate""" + if event_type not in ('PLAYER_DEATH', 'PLAYER_KILL'): + return False + + signature = f"KILL:{time_val}:{killer_name}:{victim_name}" + + if signature in self.recent_events: + logger.debug(f'Duplicate event: {signature}') + return True + + # Add to recent events + self.recent_events.append(signature) + if len(self.recent_events) > MAX_RECENT_EVENTS: + self.recent_events.pop(0) + + return False + + +class GameState: + """Main game state container""" + + def __init__(self): + self.server_info = ServerInfo() + self.player_tracker = PlayerTracker(self.server_info) + self.event_deduplicator = EventDeduplicator() + self.pending_background_commands = set() + self.status_buffer = [] diff --git a/ui.py b/ui.py new file mode 100644 index 0000000..bfed193 --- /dev/null +++ b/ui.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python3 +""" +Curses-based UI for QLPyCon +Handles terminal display, windows, and color rendering +""" + +import curses +import curses.textpad +import threading +import queue +import logging +from config import COLOR_PAIRS, INFO_WINDOW_HEIGHT, INFO_WINDOW_Y, OUTPUT_WINDOW_Y, INPUT_WINDOW_HEIGHT, TEAM_MODES + +logger = logging.getLogger('ui') + + +class CursesHandler(logging.Handler): + """Logging handler that outputs to curses window""" + + def __init__(self, window): + logging.Handler.__init__(self) + self.window = window + + def emit(self, record): + try: + msg = self.format(record) + fs = "%s\n" + try: + print_colored(self.window, fs % msg, 0) + self.window.refresh() + except UnicodeError: + print_colored(self.window, fs % msg.encode("UTF-8"), 0) + self.window.refresh() + except (KeyboardInterrupt, SystemExit): + raise + except: + self.handleError(record) + + +def print_colored(window, message, attributes=0): + """ + Print message with Quake color codes (^N) + ^0 = black, ^1 = red, ^2 = green, ^3 = yellow, ^4 = blue, ^5 = cyan, ^6 = magenta, ^7 = white + """ + if not curses.has_colors: + window.addstr(message) + return + + color = 0 + parse_color = False + + for ch in message: + val = ord(ch) + if parse_color: + if ord('0') <= val <= ord('7'): + color = val - ord('0') + if color == 7: + color = 0 + else: + window.addch('^', curses.color_pair(color) | attributes) + window.addch(ch, curses.color_pair(color) | attributes) + parse_color = False + elif ch == '^': + parse_color = True + else: + window.addch(ch, curses.color_pair(color) | attributes) + + window.refresh() + + +class UIManager: + """Manages curses windows and display""" + + def __init__(self, screen, host): + self.screen = screen + self.host = host + self.info_window = None + self.output_window = None + self.input_window = None + self.divider_window = None + self.input_queue = None + + self._init_curses() + self._create_windows() + + def _init_curses(self): + """Initialize curses settings""" + curses.endwin() + curses.initscr() + self.screen.nodelay(1) + curses.start_color() + curses.use_default_colors() + curses.cbreak() + curses.setsyx(-1, -1) + + self.screen.addstr(f"Quake Live PyCon: {self.host}") + self.screen.refresh() + + # Initialize color pairs + for i in range(1, 7): + curses.init_pair(i, i, 0) + + # Swap cyan and magenta (5 and 6) + curses.init_pair(5, 6, 0) + curses.init_pair(6, 5, 0) + + def _create_windows(self): + """Create all UI windows""" + maxy, maxx = self.screen.getmaxyx() + + # Server info window (top) + self.info_window = curses.newwin( + INFO_WINDOW_HEIGHT, + maxx - 4, + INFO_WINDOW_Y, + 2 + ) + self.info_window.scrollok(False) + self.info_window.idlok(False) + self.info_window.leaveok(True) + self.info_window.refresh() + + # Output window (middle - main display) + self.output_window = curses.newwin( + maxy - 15, + maxx - 4, + OUTPUT_WINDOW_Y, + 2 + ) + self.output_window.scrollok(True) + self.output_window.idlok(True) + self.output_window.leaveok(True) + self.output_window.refresh() + + # Input window (bottom) + self.input_window = curses.newwin( + INPUT_WINDOW_HEIGHT, + maxx - 6, + maxy - 2, + 4 + ) + self.screen.addstr(maxy - 2, 2, '$ ') + self.input_window.idlok(True) + self.input_window.leaveok(False) + self.input_window.refresh() + + # Divider line + self.divider_window = curses.newwin( + 1, + maxx - 4, + maxy - 3, + 2 + ) + self.divider_window.hline(curses.ACS_HLINE, maxx - 4) + self.divider_window.refresh() + + self.screen.refresh() + + def setup_input_queue(self): + """Setup threaded input queue""" + def wait_stdin(q, window): + while True: + line = curses.textpad.Textbox(window).edit() + if len(line) > 0: + q.put(line) + window.clear() + window.refresh() + + self.input_queue = queue.Queue() + t = threading.Thread(target=wait_stdin, args=(self.input_queue, self.input_window)) + t.daemon = True + t.start() + + return self.input_queue + + def setup_logging(self): + """Setup logging handler for output window""" + handler = CursesHandler(self.output_window) + formatter = logging.Formatter('%(asctime)-8s|%(name)-12s|%(levelname)-6s|%(message)-s', '%H:%M:%S') + handler.setFormatter(formatter) + return handler + + def print_message(self, message, attributes=0): + """Print formatted message to output window""" + y, x = curses.getsyx() + print_colored(self.output_window, message, attributes) + curses.setsyx(y, x) + curses.doupdate() + + def update_server_info(self, game_state): + """Update server info window""" + self.info_window.clear() + + max_y, max_x = self.info_window.getmaxyx() + server_info = game_state.server_info + + # Line 1: Hostname + hostname = server_info.hostname + print_colored(self.info_window, f"^6═══ {hostname} ^6═══^7\n", 0) + + # Line 2: Game info + gametype = server_info.gametype + mapname = server_info.map + timelimit = server_info.timelimit + fraglimit = server_info.fraglimit + roundlimit = server_info.roundlimit + caplimit = server_info.capturelimit + curclients = server_info.curclients + maxclients = server_info.maxclients + + print_colored(self.info_window, + f"^3Type:^7 {gametype} ^3Map:^7 {mapname} ^3Players:^7 {curclients}/{maxclients} " + f"^3Limits (T/F/R/C):^7 {timelimit}/{fraglimit}/{roundlimit}/{caplimit}\n", 0) + + # Line 3: Team headers and player lists + teams = game_state.player_tracker.get_players_by_team() + + if server_info.gametype in TEAM_MODES: + print_colored(self.info_window, f"^1(RED) ^4(BLUE) ^3(SPEC)\n", 0) + + red_players = teams['RED'][:4] + blue_players = teams['BLUE'][:4] + spec_players = teams['SPECTATOR'][:4] + + for i in range(4): + red = red_players[i] if i < len(red_players) else '' + blue = blue_players[i] if i < len(blue_players) else '' + spec = spec_players[i] if i < len(spec_players) else '' + + # Strip color codes for padding calculation + from formatter import strip_color_codes + red_clean = strip_color_codes(red) + blue_clean = strip_color_codes(blue) + spec_clean = strip_color_codes(spec) + + # Calculate padding (24 chars per column) + red_pad = 24 - len(red_clean) + blue_pad = 24 - len(blue_clean) + + line = f"{red}{' ' * red_pad}{blue}{' ' * blue_pad}{spec}\n" + print_colored(self.info_window, line, 0) + else: + print_colored(self.info_window, f"^3(FREE)\n", 0) + free_players = teams['FREE'][:4] + for player in free_players: + print_colored(self.info_window, f"{player}\n", 0) + # Fill remaining lines + for i in range(4 - len(free_players)): + self.info_window.addstr("\n") + + # Blank lines to fill + self.info_window.addstr("\n\n") + + # Separator + separator = "^6" + "═" * (max_x - 1) + "^7" + print_colored(self.info_window, separator, 0) + + self.info_window.refresh()