#!/usr/bin/env python3 """ QLPyCon - Quake Live Python Console Main entry point """ import argparse import uuid import logging import time import re import curses import zmq import signal import sys from config import VERSION, DEFAULT_HOST, POLL_TIMEOUT from state import GameState from network import RconConnection, StatsConnection from parser import EventParser from formatter import format_message, format_chat_message, format_powerup_message from ui import UIManager # Configure logging logger = logging.getLogger('main') logger.setLevel(logging.DEBUG) all_json_logger = logging.getLogger('all_json') all_json_logger.setLevel(logging.DEBUG) unknown_json_logger = logging.getLogger('unknown_json') unknown_json_logger.setLevel(logging.DEBUG) def signal_handler(sig, frame): """Handle Ctrl+C for immediate shutdown""" # Don't call curses.endwin() here - let curses.wrapper handle it sys.exit(0) def parse_cvar_response(message, game_state, ui): """ Parse server cvar responses and update state Returns True if message should be suppressed from display """ # Suppress command echo lines if message.startswith('zmq RCON command') and 'from' in message: suppress_cmds = ['status', 'roundlimit', 'qlx_serverBrandName', 'g_factoryTitle', 'mapname', 'timelimit', 'fraglimit', 'capturelimit', 'sv_maxclients'] if any(f': {cmd}' in message for cmd in suppress_cmds): return True # Parse cvar responses (format: "cvar_name" is:"value" default:...) cvar_match = re.search(r'"([^"]+)"\s+is:"([^"]*)"', message) if cvar_match: cvar_name = cvar_match.group(1) value = cvar_match.group(2) if game_state.server_info.update_from_cvar(cvar_name, value): ui.update_server_info(game_state) return True return False def handle_stats_connection(message, rcon, ui, game_state): """ Handle stats connection info extraction Returns (stats_port, stats_password) or (None, None) """ stats_port = None stats_password = None # Extract stats port if 'net_port' in message and ' is:' in message and '"net_port"' in message: match = re.search(r' is:"([^"]+)"', message) if match: port_str = match.group(1).strip() digit_match = re.search(r'(\d+)', port_str) if digit_match: stats_port = digit_match.group(1) logger.info(f'Got stats port: {stats_port}') # Extract stats password if 'zmq_stats_password' in message and ' is:' in message and '"zmq_stats_password"' in message: match = re.search(r' is:"([^"]+)"', message) if match: password_str = match.group(1) password_str = re.sub(r'\^\d', '', password_str) # Strip color codes stats_password = password_str.strip() logger.info(f'Got stats password: {stats_password}') return stats_port, stats_password def parse_player_events(message, game_state, ui): """ Parse connect, disconnect, kick, and rename messages Returns True if message should be suppressed """ try: msg = message # Strip broadcast: print "..." wrapper with regex broadcast_match = re.match(r'^broadcast:\s*print\s*"(.+?)(?:\\n)?"\s*$', msg) if broadcast_match: msg = broadcast_match.group(1) # Strip timestamp: [HH:MM:SS] or ^3[^7HH:MM:SS^3]^7 msg = re.sub(r'\^\d\[\^\d[0-9:]+\^\d\]\^\d\s*', '', msg) msg = re.sub(r'\[[0-9:]+\]\s*', '', msg) msg = msg.strip() if not msg: return False logger.debug(f'parse_player_events: {repr(msg)}') # Strip color codes for matching from formatter import strip_color_codes clean_msg = strip_color_codes(msg) # Match connects: "NAME connected" or "NAME connected with Steam ID" connect_match = re.match(r'^(.+?)\s+connected', clean_msg) if connect_match: player_name_match = re.match(r'^(.+?)\s+connected', msg) player_name = player_name_match.group(1).strip() if player_name_match else connect_match.group(1).strip() player_name = re.sub(r'\^\d+$', '', player_name) logger.info(f'CONNECT: {repr(player_name)}') if game_state.server_info.is_team_mode(): game_state.player_tracker.update_team(player_name, 'SPECTATOR') game_state.player_tracker.add_player(player_name) ui.update_server_info(game_state) # Only print if this is NOT the Steam ID line if 'Steam ID' not in message: timestamp = time.strftime('%H:%M:%S') ui.print_message(f"^3[^7{timestamp}^3]^7 ^0{player_name}^9 ^2connected^7\n") return True # Regular disconnect disconnect_match = re.match(r'^(.+?)\s+disconnected', clean_msg) if disconnect_match: original_match = re.match(r'^(.+?)\s+disconnected', msg) player_name = original_match.group(1).strip() if original_match else disconnect_match.group(1).strip() player_name = re.sub(r'\^\d+$', '', player_name) player_name = re.sub(r'^\^\d+', '', player_name) logger.info(f'DISCONNECT: {repr(player_name)}') game_state.player_tracker.remove_player(player_name) ui.update_server_info(game_state) timestamp = time.strftime('%H:%M:%S') ui.print_message(f"^3[^7{timestamp}^3]^7 ^0{player_name}^9 ^1disconnected^7\n") return True # Kick kick_match = re.match(r'^(.+?)\s+was kicked', clean_msg) if kick_match: original_match = re.match(r'^(.+?)\s+was kicked', msg) player_name = original_match.group(1).strip() if original_match else kick_match.group(1).strip() player_name = re.sub(r'\^\d+$', '', player_name) player_name = re.sub(r'^\^\d+', '', player_name) logger.info(f'KICK: {repr(player_name)}') game_state.player_tracker.remove_player(player_name) ui.update_server_info(game_state) timestamp = time.strftime('%H:%M:%S') ui.print_message(f"^3[^7{timestamp}^3]^7 ^0{player_name}^9 ^1was kicked^7\n") return True # Inactivity inactivity_match = re.match(r'^(.+?)\s+Dropped due to inactivity', clean_msg) if inactivity_match: original_match = re.match(r'^(.+?)\s+Dropped due to inactivity', msg) player_name = original_match.group(1).strip() if original_match else inactivity_match.group(1).strip() player_name = re.sub(r'\^\d+$', '', player_name) player_name = re.sub(r'^\^\d+', '', player_name) logger.info(f'INACTIVITY DROP: {repr(player_name)}') game_state.player_tracker.remove_player(player_name) ui.update_server_info(game_state) timestamp = time.strftime('%H:%M:%S') ui.print_message(f"^3[^7{timestamp}^3]^7 ^0{player_name}^9 ^3dropped due to inactivity^7\n") return True # Match renames: "OldName renamed to NewName" rename_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', clean_msg) if rename_match: # Extract from original message original_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', msg) if original_match: old_name = original_match.group(1).strip() new_name = original_match.group(2).strip() else: old_name = rename_match.group(1).strip() new_name = rename_match.group(2).strip() # Remove trailing color codes from both names old_name = re.sub(r'\^\d+$', '', old_name) new_name = re.sub(r'^\^\d+', '', new_name) # Remove leading ^7 from new name new_name = re.sub(r'\^\d+$', '', new_name) # Remove trailing too old_name = old_name.rstrip('\n\r') # Remove trailing newline new_name = new_name.rstrip('\n\r') # Remove trailing newline logger.info(f'RENAME: {repr(old_name)} -> {repr(new_name)}') game_state.player_tracker.rename_player(old_name, new_name) ui.update_server_info(game_state) timestamp = time.strftime('%H:%M:%S') ui.print_message(f"^3[^7{timestamp}^3]^7 ^0{old_name}^9 ^6renamed to^7 ^0{new_name}^9\n") return True except Exception as e: logger.error(f'Error in parse_player_events: {e}') return False def main_loop(screen): """Main application loop""" # Setup signal handler for Ctrl+C signal.signal(signal.SIGINT, signal_handler) # Parse arguments parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics') parser.add_argument('--host', default=DEFAULT_HOST, help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}') parser.add_argument('--password', required=False, help='RCON password') parser.add_argument('--identity', default=uuid.uuid1().hex, help='Socket identity (random UUID by default)') parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity (-v INFO, -vv DEBUG)') parser.add_argument('--unknown-log', default='unknown_events.log', help='File to log unknown JSON events') parser.add_argument('-j', '--json', dest='json_log', default=None, help='File to log all JSON events') args = parser.parse_args() # Set logging level if args.verbose == 0: logger.setLevel(logging.WARNING) elif args.verbose == 1: logger.setLevel(logging.INFO) else: logger.setLevel(logging.DEBUG) # Setup file logging for unknown events unknown_handler = logging.FileHandler(args.unknown_log, mode='a') unknown_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') unknown_handler.setFormatter(unknown_formatter) unknown_json_logger.addHandler(unknown_handler) unknown_json_logger.propagate = False # Initialize components ui = UIManager(screen, args.host) game_state = GameState() # Setup logging to output window log_handler = ui.setup_logging() logger.addHandler(log_handler) # Setup input queue input_queue = ui.setup_input_queue() # Display startup messages ui.print_message(f"*** QL pyCon Version {VERSION} starting ***\n") ui.print_message(f"zmq python bindings {zmq.__version__}, libzmq version {zmq.zmq_version()}\n") # Initialize network connections rcon = RconConnection(args.host, args.password, args.identity) rcon.connect() stats_conn = None stats_port = None stats_password = None stats_check_counter = 0 # Shutdown flag shutdown = False # Setup JSON logging if requested json_logger = None if args.json_log: json_handler = logging.FileHandler(args.json_log, mode='a') json_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') json_handler.setFormatter(json_formatter) all_json_logger.addHandler(json_handler) all_json_logger.propagate = False json_logger = all_json_logger # Create event parser event_parser = EventParser(game_state, json_logger, unknown_json_logger) # Main event loop while not shutdown: # Poll RCON socket event = rcon.poll(POLL_TIMEOUT) # Check monitor for connection events monitor_event = rcon.check_monitor() if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED: ui.print_message("Connected to server\n") rcon.send_command(b'register') logger.info('Registration message sent') ui.print_message("Requesting connection info...\n") rcon.send_command(b'zmq_stats_password') rcon.send_command(b'net_port') # Handle user input while not input_queue.empty(): command = input_queue.get() logger.info(f'Sending command: {repr(command.strip())}') # Display command with timestamp timestamp = time.strftime('%H:%M:%S') ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n") rcon.send_command(command) # Poll stats stream if connected if stats_conn and stats_conn.connected: stats_check_counter += 1 if stats_check_counter % 100 == 0: logger.debug(f'Stats polling active (check #{stats_check_counter // 100})') stats_msg = stats_conn.recv_message() if stats_msg: logger.info(f'Stats event received ({len(stats_msg)} bytes)') # Parse game event parsed = event_parser.parse_event(stats_msg) if parsed: # Format with timestamp before displaying formatted_msg, attributes = format_message(parsed) ui.print_message(formatted_msg) ui.update_server_info(game_state) # Process RCON messages if event > 0: logger.debug('Socket has data available') msg_count = 0 while True: message = rcon.recv_message() if message is None: if msg_count > 0: logger.debug(f'Read {msg_count} message(s)') break msg_count += 1 if len(message) == 0: logger.debug('Received empty message (keepalive)') continue logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}') # Check for player connect/disconnect/rename events if parse_player_events(message, game_state, ui): continue if '------- Game Initialization -------' in message or 'Game Initialization' in message: logger.info('Game initialization detected - refreshing server info') timestamp = time.strftime('%H:%M:%S') ui.print_message(f"^3[^7{timestamp}^3] ^0^3Game initialized - Refreshing server info^9^7\n") rcon.send_command(b'qlx_serverBrandName') rcon.send_command(b'g_factoryTitle') rcon.send_command(b'mapname') rcon.send_command(b'timelimit') rcon.send_command(b'fraglimit') rcon.send_command(b'roundlimit') rcon.send_command(b'capturelimit') rcon.send_command(b'sv_maxclients') # Clear player list since map changed game_state.server_info.players = [] game_state.player_tracker.player_teams = {} ui.update_server_info(game_state) # Try to parse as cvar response if parse_cvar_response(message, game_state, ui): logger.debug('Suppressed cvar response') continue # Check for stats connection info port, password = handle_stats_connection(message, rcon, ui, game_state) if port: stats_port = port if password: stats_password = password # Connect to stats if we have both credentials if stats_port and stats_password and stats_conn is None: try: ui.print_message("Connecting to stats stream...\n") host_ip = args.host.split('//')[1].split(':')[0] stats_conn = StatsConnection(host_ip, stats_port, stats_password) stats_conn.connect() ui.print_message("Stats stream connected - ready for game events\n") # Request initial server info logger.info('Sending initial server info queries') rcon.send_command(b'qlx_serverBrandName') rcon.send_command(b'g_factoryTitle') rcon.send_command(b'mapname') rcon.send_command(b'timelimit') rcon.send_command(b'fraglimit') rcon.send_command(b'roundlimit') rcon.send_command(b'capturelimit') rcon.send_command(b'sv_maxclients') if args.json_log: ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n") except Exception as e: timestamp = time.strftime('%H:%M:%S') ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n") logger.error(f'Stats connection failed: {e}') # Try to parse as game event parsed_event = event_parser.parse_event(message) if parsed_event: ui.print_message(parsed_event) continue # Check if it looks like JSON but wasn't parsed stripped = message.strip() if stripped and stripped[0] in ('{', '['): logger.debug('Unparsed JSON event') continue # Try powerup message formatting powerup_msg = format_powerup_message(message, game_state.player_tracker) if powerup_msg: ui.print_message(powerup_msg) continue # Filter bot debug messages in default mode is_bot_debug = (' entered ' in message and any(x in message for x in [' seek ', ' battle ', ' chase', ' fight'])) if is_bot_debug and args.verbose == 0: logger.debug(f'Filtered bot debug: {message[:50]}') continue # Check if it's a chat message if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')): message = format_chat_message(message, game_state.player_tracker) # Format and display message formatted_msg, attributes = format_message(message) ui.print_message(formatted_msg) if __name__ == '__main__': curses.wrapper(main_loop)