diff --git a/qlpycon.py b/qlpycon.py deleted file mode 100644 index e87f92f..0000000 --- a/qlpycon.py +++ /dev/null @@ -1,1170 +0,0 @@ -#!/usr/bin/env python3 -# Version: 0.7.0 - -import sys -import re -import time -import struct -import argparse -import uuid -import threading -import queue -import json - -import logging -logger = logging.getLogger('logger') -logger.setLevel(logging.DEBUG) - -# Separate logger for all JSON events when --json flag is used -all_json_logger = logging.getLogger('all_json') -all_json_logger.setLevel(logging.DEBUG) - -# Separate logger for unknown JSON events -unknown_json_logger = logging.getLogger('unknown_json') -unknown_json_logger.setLevel(logging.DEBUG) - -import zmq -import curses -import curses.textpad -import signal - -import unittest - -import locale -locale.setlocale(locale.LC_ALL, '') - -# Server info state -server_info = { - 'hostname': 'Unknown', - 'map': 'Unknown', - 'gametype': 'Unknown', - 'timelimit': '0', - 'fraglimit': '0', - 'roundlimit': '0', - 'capturelimit': '0', - 'players': [], - 'red_score': 0, - 'blue_score': 0, - 'last_update': 0 -} - -# Message buffering for status output -status_buffer = [] -in_status_output = False - -# Pending background commands -pending_background_commands = set() - -current_gametype = None - -# Track recent events to avoid duplicates -recent_events = [] -MAX_RECENT_EVENTS = 10 - -team_modes = ["Team Deathmatch", "Clan Arena", "Capture The Flag", "One Flag CTF", "Overload", "Harvester", "Freeze Tag" ] - -player_teams = {} - -def update_player_team(name, team): - """Update player team tracking. Team can be int (1=RED, 2=BLUE, 3=SPECTATOR) or string""" - if current_gametype in team_modes: - # Convert numeric team to string - if isinstance(team, int): - team_map = {1: 'RED', 2: 'BLUE', 3: 'SPECTATOR', 0: 'FREE'} - team = team_map.get(team, 'FREE') - - if team not in ['RED', 'BLUE', 'FREE', 'SPECTATOR']: - team = 'FREE' - - # Store both the original name and the color-stripped version - player_teams[name] = team - clean_name = re.sub(r'\^\d', '', name) - if clean_name != name: - player_teams[clean_name] = team - - logger.debug(f'Updated team for {name} (clean: {clean_name}): {team}') - -def _readSocketEvent( msg ): - event_id = struct.unpack( ' 0 else 0 - accuracies[weapon] = accuracy - return accuracies - -def _checkMonitor( monitor ): - try: - event_monitor = monitor.recv( zmq.NOBLOCK ) - except zmq.Again: - return - - ( event_id, event_name, event_value ) = _readSocketEvent( event_monitor ) - event_monitor_endpoint = monitor.recv( zmq.NOBLOCK ) - logger.debug( 'monitor: %s %d endpoint %s' % ( event_name, event_value, event_monitor_endpoint ) ) - return ( event_id, event_value ) - -class CursesHandler(logging.Handler): - def __init__(self, screen): - logging.Handler.__init__(self) - self.screen = screen - def emit(self, record): - try: - msg = self.format(record) - screen = self.screen - fs = "%s\n" - try: - PrintMessageFormatted(screen, fs % msg) - screen.refresh() - except UnicodeError: - PrintMessageFormatted(screen, fs % msg.encode("UTF-8")) - screen.refresh() - except (KeyboardInterrupt, SystemExit): - raise - except: - self.handleError(record) - -def setupInputQueue(window): - def waitStdin( q ): - while ( True ): - l = curses.textpad.Textbox(window).edit() - if len( l ) > 0 : - q.put( l ) - window.clear() - window.refresh() - q = queue.Queue() - t = threading.Thread( target = waitStdin, args = ( q, ) ) - t.daemon = True - t.start() - return q - -HOST = 'tcp://127.0.0.1:27961' -POLL_TIMEOUT = 100 - -def PrintMessageColored(window, message, attributes): - if not curses.has_colors: - window.addstr(message) - return - - color = 0 - parse_color = False - for ch in message: - val = ord( ch ) - if parse_color: - if val >= ord('0') and val <= ord('7'): - color = val - ord('0') - if color == 7: - color = 0 - else: - window.addch('^', curses.color_pair(color) | attributes) - window.addch(ch, curses.color_pair(color) | attributes) - parse_color = False - elif ch == '^': - parse_color = True - else: - window.addch(ch, curses.color_pair(color) | attributes) - - window.refresh() - -def PrintMessageFormatted(window, message, add_timestamp=True): - attributes = 0 - - message = message.replace("\\n", "") - message = message.replace(chr(25), "") - - if message[:10] == "broadcast:": - message = message[11:] - attributes = curses.A_BOLD - - if message[:7] == "print \"": - message = message[7:-2] + "\n" - - # Don't add timestamp to server responses, especially status command - # But DO add timestamps to "zmq RCON command" lines (server acknowledgments) - skip_timestamp_keywords = [ - 'map:', 'num score', '---', 'bot' - ] - - # Special handling: don't skip "zmq RCON" lines - they should get timestamps - # But skip everything else from status output - is_status_keyword = any(keyword in message for keyword in skip_timestamp_keywords) - - # Also don't timestamp "status" when it appears in isolation (status table context) - # But DO timestamp it when it's part of "zmq RCON command...status" (the echo line) - if 'status' in message and 'zmq RCON' not in message: - is_status_keyword = True - - # Skip very short messages or messages with leading spaces (status fragments) - is_likely_fragment = ( - len(message.strip()) <= 2 or # Very short messages - (message.startswith(' ') and len(message.strip()) < 50) or # Messages with leading space - message.strip().isdigit() # Pure numbers (scores, pings, ports, etc) - ) - - # Skip single words that are less than 20 chars (likely player names or status fragments) - is_short_word = len(message.strip()) < 20 and ' ' not in message.strip() - - # Skip IP addresses (pattern: xxx.xxx.xxx.xxx or xxx.xxx.xxx.xxx:port) - is_ip_address = False - stripped = message.strip() - allowed_chars = set('0123456789.:') - if stripped.count('.') == 3 and all(c in allowed_chars for c in stripped): - is_ip_address = True - - should_skip_timestamp = ( - message.startswith('***') or - is_likely_fragment or - is_short_word or - is_ip_address or - is_status_keyword - ) - - # ADD THIS DEBUG BLOCK HERE: - if 'K/D:' in message or 'WINS' in message or 'TEAM' in message: - logger.debug(f'=== TIMESTAMP DEBUG ===') - logger.debug(f'Message: {message[:100]}') - logger.debug(f'is_status_keyword: {is_status_keyword}') - logger.debug(f'is_likely_fragment: {is_likely_fragment}') - logger.debug(f'is_short_word: {is_short_word}') - logger.debug(f'is_ip_address: {is_ip_address}') - logger.debug(f'should_skip_timestamp: {should_skip_timestamp}') - logger.debug(f'add_timestamp: {add_timestamp}') - # Add timestamp if requested and not a special message type - - if add_timestamp and not should_skip_timestamp: - timestamp = time.strftime('%H:%M:%S') - message = f"^3[^7{timestamp}^3]^7 {message}" - - PrintMessageColored(window, message, attributes) - -def get_team_color(player_name): - """Get the color-coded team prefix for a player""" - if current_gametype not in team_modes: - return '' - - team = player_teams.get(player_name, None) - if not team: - return '' - - return { - 'RED': '^1(RED)^7', - 'BLUE': '^4(BLUE)^7', - 'FREE': '', - 'SPECTATOR': '^3(SPEC)^7' - }.get(team, '') - -def format_powerup_message(msg_str): - """Format powerup pickup and kill messages with colors""" - - if msg_str.startswith("broadcast:"): - msg_str = msg_str[11:].strip() - - # Check for powerup pickup: "PlayerName got the PowerupName!" - pickup_match = re.match(r'^(.+?)\s+got the\s+(.+?)!', msg_str) - if pickup_match: - player_name = pickup_match.group(1).strip() - powerup_name = pickup_match.group(2).strip() - - # Get team color for player - player_clean = re.sub(r'\^\d', '', player_name) - team_prefix = get_team_color(player_clean) - - # Color code powerups - powerup_colors = { - 'Quad Damage': '^5Quad Damage^7', - 'Battle Suit': '^3Battle Suit^7', - 'Regeneration': '^1Regeneration^7', - 'Haste': '^3Haste^7', - 'Invisibility': '^5Invisibility^7', - 'Flight': '^5Flight^7', - 'Medkit': '^1Medkit^7', - 'MegaHealth': '^4MegaHealth^7' - } - - colored_powerup = powerup_colors.get(powerup_name, f'^6{powerup_name}^7') - - return f"{team_prefix}{player_name} ^7got the {colored_powerup}!\n" - - # Check for powerup carrier kill: "PlayerName killed the PowerupName carrier!" - carrier_match = re.match(r'^(.+?)\s+killed the\s+(.+?)\s+carrier!', msg_str) - if carrier_match: - player_name = carrier_match.group(1).strip() - powerup_name = carrier_match.group(2).strip() - - # Get team color for player - player_clean = re.sub(r'\^\d', '', player_name) - team_prefix = get_team_color(player_clean) - - # Color code powerups (same as above) - powerup_colors = { - 'Quad Damage': '^5Quad Damage^7', - 'Battle Suit': '^3Battle Suit^7', - 'Regeneration': '^1Regeneration^7', - 'Haste': '^3Haste^7', - 'Invisibility': '^5Invisibility^7', - 'Flight': '^5Flight^7', - 'Medkit': '^1Medkit^7', - 'MegaHealth': '^4MegaHealth^7' - } - - colored_powerup = powerup_colors.get(powerup_name, f'^6{powerup_name}^7') - - return f"{team_prefix}{player_name} ^7killed the {colored_powerup} ^7carrier!\n" - - # Not a powerup message - return None - -def parse_server_response(msg_str, info_window): - """Parse server responses to update server_info, returns True if message should be suppressed""" - global server_info, pending_background_commands, status_buffer, in_status_output, current_gametype - - # Suppress only the command echo lines - if msg_str.startswith('zmq RCON command') and 'from' in msg_str: - if any(f': {cmd}' in msg_str for cmd in ['status', 'roundlimit', 'qlx_serverBrandName', 'g_factoryTitle', 'mapname', 'timelimit', 'fraglimit', 'roundlimit', 'capturelimit', 'sv_maxclients']): - return True - - # Parse qlx_serverBrandName - if '"qlx_serverbrandname"' in msg_str and ' is:' in msg_str: - match = re.search(r' is:"(.+?)" default:', msg_str) - if match: - brandname = match.group(1).strip() - server_info['hostname'] = brandname - logger.info(f'Got hostname: {server_info["hostname"]}') - UpdateServerInfoWindow(info_window) - - # Parse g_factoryTitle - if '"g_factoryTitle"' in msg_str and ' is:' in msg_str: - match = re.search(r' is:"([^"]*)"', msg_str) # Changed [^"]+ to [^"]* - if match: - gametype = match.group(1).strip() - current_gametype = re.sub(r'\^\d', '', gametype) # Strip color codes - server_info['gametype'] = current_gametype - logger.info(f'Got gametype: {current_gametype}') - UpdateServerInfoWindow(info_window) - - # Parse Map - if '"mapname"' in msg_str and ' is:' in msg_str: - match = re.search(r' is:"([^"]*)"', msg_str) - if match: - mapname = match.group(1).strip() - server_info['map'] = re.sub(r'\^\d', '', mapname) - logger.info(f'Got mapname: {server_info["map"]}') - UpdateServerInfoWindow(info_window) - - # Parse timelimit - if '"timelimit"' in msg_str and ' is:' in msg_str: - match = re.search(r' is:"([^"]*)"', msg_str) - if match: - server_info['timelimit'] = match.group(1).strip() - logger.info(f'Got Timelimit: {server_info["timelimit"]}') - UpdateServerInfoWindow(info_window) - - # Parse fraglimit - if '"fraglimit"' in msg_str and ' is:' in msg_str: - match = re.search(r' is:"([^"]*)"', msg_str) - if match: - server_info['fraglimit'] = match.group(1).strip() - logger.info(f'Got Fraglimit: {server_info["fraglimit"]}') - UpdateServerInfoWindow(info_window) - - # Parse roundlimit - if '"roundlimit"' in msg_str and ' is:' in msg_str: - match = re.search(r' is:"([^"]*)"', msg_str) - if match: - server_info['roundlimit'] = match.group(1).strip() - logger.info(f'Got Roundlimit: {server_info["roundlimit"]}') - UpdateServerInfoWindow(info_window) - - # Parse caplimit - if '"capturelimit"' in msg_str and ' is:' in msg_str: - match = re.search(r' is:"([^"]*)"', msg_str) - if match: - server_info['capturelimit'] = match.group(1).strip() - logger.info(f'Got Caplimit: {server_info["capturelimit"]}') - UpdateServerInfoWindow(info_window) - - # Parse Max Clients - if '"sv_maxclients"' in msg_str and ' is:' in msg_str: - match = re.search(r' is:"([^"]*)"', msg_str) - if match: - server_info['maxclients'] = match.group(1).strip() - logger.info(f'Got Max Clients: {server_info["maxclients"]}') - UpdateServerInfoWindow(info_window) - - # Handle status command output buffering - if 'status' in pending_background_commands: - # Buffer ALL short messages (likely parts of status output) - if len(msg_str) < 100: # Status fields are short - status_buffer.append(msg_str) - in_status_output = True - return True # Suppress - - # When we get an empty message or long message, process the buffer - if msg_str.strip() == '' or len(msg_str) > 100: - if status_buffer: - # Reconstruct the line - full_line = ' '.join(status_buffer) - logger.info(f'Reconstructed status line: {full_line[:100]}') - - # Parse if it's a player line - parts = full_line.split() - if len(parts) >= 8 and parts[0].isdigit(): - server_info['players'].append({ - 'name': parts[3], - 'score': parts[1], - 'ping': parts[2] - }) - - status_buffer = [] - - if msg_str.strip() == '': - pending_background_commands.discard('status') - in_status_output = False - - return True - - return False - -def UpdateServerInfoWindow(info_window): - """Update the server info window with current data""" - info_window.clear() - - max_y, max_x = info_window.getmaxyx() - - # Line 1: Hostname with decorative border - hostname_display = server_info.get('hostname', 'Unknown Server') - PrintMessageColored(info_window, f"^6═══ {hostname_display} ^6═══^7\n", 0) - - # Line 2: Type / Map / Limits / Players - gametype_display = server_info.get('gametype', 'Unknown Type') - mapname_display = server_info.get('map', 'Unknown Map') - timelimit_display = server_info.get('timelimit', '0') - fraglimit_display = server_info.get('fraglimit', '0') - roundlimit_display = server_info.get('roundlimit', '0') - caplimit_display = server_info.get('capturelimit', '0') - curclients_display = server_info.get('curclients', '0') - maxclients_display = server_info.get('maxclients', '0') - PrintMessageColored(info_window, f"^3Type:^7 {gametype_display} ^3Map:^7 {mapname_display} ^3Players:^7 {curclients_display}/{maxclients_display} ^3Limits (T/F/R/C):^7 {timelimit_display}/{fraglimit_display}/{roundlimit_display}/{caplimit_display}\n", 0) - - # Line 4: Teams, if any - if current_gametype in team_modes: - PrintMessageColored(info_window, f"^1(RED) ^4(BLUE)\n", 0) - else: - PrintMessageColored(info_window, f"^3(FREE)\n", 0) - - # Lines 5-10: Reserved for future use (empty for now) - info_window.addstr("\n\n\n\n\n\n") - - # Line 11: Separator - separator = "^6" + "═" * (max_x - 1) + "^7" - PrintMessageColored(info_window, separator, 0) - - info_window.refresh() - -def ParseGameEvent(message): - """Parse JSON game events and return formatted message""" - global current_gametype, recent_events - - try: - jObject = json.loads(message) - - # Log ALL JSON events to file if --json flag was provided - if all_json_logger.handlers: - all_json_logger.info('JSON Event received:') - all_json_logger.info(json.dumps(jObject, indent=2)) - all_json_logger.info('---') - - if 'TYPE' not in jObject or 'DATA' not in jObject: - logger.debug('JSON missing TYPE or DATA fields') - return None - - event_type = jObject['TYPE'] - data = jObject['DATA'] - - # Create event signature for deduplication (for PLAYER_KILL/PLAYER_DEATH) - event_signature = None - if event_type in ('PLAYER_DEATH', 'PLAYER_KILL'): - # Create signature based on time, killer, victim (same signature for both types) - time_val = data.get('TIME', 0) - killer_name = data.get('KILLER', {}).get('NAME', '') if data.get('KILLER') else '' - victim_name = data.get('VICTIM', {}).get('NAME', '') - event_signature = f"KILL:{time_val}:{killer_name}:{victim_name}" # Same signature for both - - # Check if we've seen this event recently - if event_signature in recent_events: - logger.debug(f'Duplicate event detected: {event_signature}') - return None - - # Add to recent events and trim list - recent_events.append(event_signature) - if len(recent_events) > MAX_RECENT_EVENTS: - recent_events.pop(0) - - warmup = data.get('WARMUP', False) - warmup_suffix = " ^3(warmup)" if warmup else "" - - if event_type == 'PLAYER_SWITCHTEAM': - if 'KILLER' not in data: - logger.debug('PLAYER_SWITCHTEAM missing KILLER field') - return None - killer = data['KILLER'] - name = killer.get('NAME', 'Unknown') - team = killer.get('TEAM', '') - old_team = killer.get('OLD_TEAM', '') - - # Update player team tracking - update_player_team(name, team) - - if team == old_team: - return None - - team_msg = { - 'FREE': ' ^7joined the ^2Fight^7', - 'SPECTATOR': ' ^7joined the ^3Spectators^7', - 'RED': ' ^7joined the ^1Red ^7team', - 'BLUE': ' ^7joined the ^4Blue ^7team' - }.get(team, ' ^7joined team %s^7' % team) - - old_team_msg = { - 'FREE': 'the ^2Fight^7', - 'SPECTATOR': 'the ^3Spectators^7', - 'RED': '^7the ^1Red ^7team', - 'BLUE': '^7the ^4Blue ^7team' - }.get(old_team, 'team %s' % old_team) - - team_prefix = get_team_color(name) - return "%s%s%s from %s%s\n" % (team_prefix, name, team_msg, old_team_msg, warmup_suffix) - - elif event_type == 'PLAYER_DEATH' or event_type == 'PLAYER_KILL': - if 'VICTIM' not in data: - logger.debug('PLAYER_DEATH/PLAYER_KILL missing VICTIM field') - return None - victim = data['VICTIM'] - victim_name = victim.get('NAME', 'Unknown') - - # Update victim team from event data - if 'TEAM' in victim: - update_player_team(victim_name, victim['TEAM']) - # ADD THIS: Update server_info players list - global server_info - if not any(p['name'] == victim_name for p in server_info['players']): - server_info['players'].append({ - 'name': victim_name, - 'score': '0', - 'ping': '0' - }) - - victim_team_prefix = get_team_color(victim_name) - - if 'KILLER' not in data or not data['KILLER']: - mod = data.get('MOD', 'UNKNOWN') - death_msgs = { - 'FALLING': "%s%s ^7cratered.", - 'HURT': "%s%s ^7was in the wrong place.", - 'LAVA': "%s%s ^7does a backflip into the lava.", - 'WATER': "%s%s ^7sank like a rock.", - 'SLIME': "%s%s ^7melted.", - 'CRUSH': "%s%s ^7was crushed." - } - msg_template = death_msgs.get(mod, "%s%s ^1DIED FROM %s^7") - if mod in death_msgs: - msg = msg_template % (victim_team_prefix, victim_name) - else: - msg = msg_template % (victim_team_prefix, victim_name, mod) - return "%s%s\n" % (msg, warmup_suffix) - - killer = data['KILLER'] - killer_name = killer.get('NAME', 'Unknown') - - # Update killer team from event data - if 'TEAM' in killer: - update_player_team(killer_name, killer['TEAM']) - if not any(p['name'] == killer_name for p in server_info['players']): - server_info['players'].append({ - 'name': killer_name, - 'score': '0', - 'ping': '0' - }) - - killer_team_prefix = get_team_color(killer_name) - - if killer_name != victim_name: - weapon = killer.get('WEAPON', 'UNKNOWN') - weapon_names = { - 'ROCKET': 'the Rocket Launcher', - 'LIGHTNING': 'the Lightning Gun', - 'RAILGUN': 'the Railgun', - 'SHOTGUN': 'the Shotgun', - 'GAUNTLET': 'the Gauntlet', - 'GRENADE': 'the Grenade Launcher', - 'PLASMA': 'the Plasma Gun', - 'MACHINEGUN': 'the Machine Gun' - } - weapon_name = weapon_names.get(weapon, 'the %s' % weapon) - - return "%s%s ^7fragged^7 %s%s ^7with %s%s\n" % ( - killer_team_prefix, killer_name, - victim_team_prefix, victim_name, - weapon_name, warmup_suffix - ) - - else: - # Suicide - weapon = killer.get('WEAPON', 'OTHER_WEAPON') - if weapon != 'OTHER_WEAPON': - weapon_names = { - 'ROCKET': 'Rocket Launcher', - 'PLASMA': 'Plasma Gun', - 'GRENADE': 'Grenade Launcher' - } - weapon_name = weapon_names.get(weapon, weapon) - return "%s%s ^7committed suicide with the ^7%s%s\n" % ( - killer_team_prefix, killer_name, weapon_name, warmup_suffix - ) - - elif event_type == 'PLAYER_MEDAL': - name = data.get('NAME', 'Unknown') - medal = data.get('MEDAL', 'UNKNOWN') - warmup = data.get('WARMUP', False) - warmup_suffix = " ^3(warmup)^7" if warmup else "" - team_prefix = get_team_color(name) - return "%s%s ^7got a medal: ^6%s%s\n" % (team_prefix, name, medal, warmup_suffix) - - elif event_type == 'MATCH_STARTED': - if current_gametype in team_modes: - #redteam = - #bluteam = - return None - elif current_gametype not in team_modes: - players = [] - players_data = data.get('PLAYERS', []) - for player in players_data: - name = player.get('NAME', 'Unknown') - players.append(name) - formatted_players = " vs. ".join(players) - return f"Match has started - {formatted_players}\n" - else: - return None - - elif event_type == 'MATCH_REPORT': - if current_gametype in team_modes: - redscore = int(data.get('TSCORE0', '0')) - bluscore = int(data.get('TSCORE1', '0')) - if redscore > bluscore: - return "^1RED TEAM ^7WINS by a score of %d to %d\n" % (redscore, bluscore) - elif bluscore > redscore: - return "^4BLUE TEAM ^7WINS by a score of %d to %d\n" % (bluscore, redscore) - else: - return "^7The match is a TIE with a score of %d to %d\n" % (redscore, bluscore) - else: - return None - - elif event_type == 'PLAYER_STATS': - name = data.get('NAME', 'Unknown') - team_prefix = get_team_color(name) - kills = int(data.get('KILLS', '0')) - deaths = int(data.get('DEATHS', '0')) - weapon_data = data.get('WEAPONS', {}) - accuracies = calculate_weapon_accuracies(weapon_data) - if accuracies: - best_weapon = max(accuracies, key=accuracies.get) - best_accuracy = accuracies[best_weapon] * 100 - weapon_stats = weapon_data.get(best_weapon, {}) - best_weapon_kills = int(weapon_stats.get('K', 0)) - best_weapon_stats = f"{best_weapon}: {best_accuracy:.2f}% (Kills: {best_weapon_kills})" - else: - best_weapon_stats = "No weapon stats available." - weapon_names = { - 'ROCKET': 'Rocket Launcher', - 'LIGHTNING': 'Lightning Gun', - 'RAILGUN': 'Railgun', - 'SHOTGUN': 'Shotgun', - 'GAUNTLET': 'Gauntlet', - 'GRENADE': 'Grenade Launcher', - 'PLASMA': 'Plasma Gun', - 'MACHINEGUN': 'Machine Gun' - } - weapon_name = weapon_names.get(best_weapon) - return "^7%s%s K/D: %d/%d | Best Weapon: %s - Acc: %.2f%% - Kills: %d\n" % (team_prefix, name, kills, deaths, weapon_name, best_accuracy, best_weapon_kills) - - # Placeholders - elif event_type == 'PLAYER_CONNECT': - return None - elif event_type == 'PLAYER_DISCONNECT': - return None - elif event_type == 'ROUND_OVER': - return None - - else: - # Unknown event type - log at debug level and to file - logger.debug('Unknown event type: %s' % event_type) - unknown_json_logger.info('Unknown event type: %s' % event_type) - unknown_json_logger.info('Full JSON: %s' % json.dumps(jObject, indent=2)) - return None - - except json.JSONDecodeError as e: - logger.debug('JSON decode error: %s' % e) - return None - except (KeyError, TypeError) as e: - logger.debug('Error parsing game event: %s' % e) - return None - - return None - -def InitWindows(screen, args): - logger.handlers = [] - curses.endwin() - - curses.initscr() - screen.nodelay(1) - curses.start_color() - curses.use_default_colors() - curses.cbreak() - curses.setsyx(-1, -1) - screen.addstr("Quake Live PyCon: %s" % args.host) - screen.refresh() - maxy, maxx = screen.getmaxyx() - - for i in range(1,7): - curses.init_pair(i, i, 0) - - curses.init_pair(5, 6, 0) - curses.init_pair(6, 5, 0) - - # Server info window at top - begin_x = 2; width = maxx - 4 - begin_y = 2; height = 10 - info_window = curses.newwin(height, width, begin_y, begin_x) - screen.refresh() - info_window.scrollok(False) - info_window.idlok(False) - info_window.leaveok(True) - info_window.refresh() - - # Output window (main chat/events) - begin_x = 2; width = maxx - 4 - begin_y = 12; height = maxy - 15 - output_window = curses.newwin(height, width, begin_y, begin_x) - screen.refresh() - output_window.scrollok(True) - output_window.idlok(True) - output_window.leaveok(True) - output_window.refresh() - - # Input window - begin_x = 4; width = maxx - 6 - begin_y = maxy - 2; height = 1 - input_window = curses.newwin(height, width, begin_y, begin_x) - screen.addstr(begin_y, begin_x - 2, '$ ') - screen.refresh() - input_window.idlok(True) - input_window.leaveok(False) - input_window.refresh() - - # Divider window - begin_x = 2; width = maxx - 4 - begin_y = maxy - 3; height = 1 - divider_window = curses.newwin(height, width, begin_y, begin_x) - screen.refresh() - divider_window.hline(curses.ACS_HLINE, width) - divider_window.refresh() - - mh = CursesHandler(output_window) - formatterDisplay = logging.Formatter('%(asctime)-8s|%(name)-12s|%(levelname)-6s|%(message)-s', '%H:%M:%S') - mh.setFormatter(formatterDisplay) - logger.addHandler(mh) - - screen.refresh() - - return input_window, output_window, info_window - -def main(screen): - global current_gametype - - parser = argparse.ArgumentParser( description = 'Verbose QuakeLive server statistics' ) - parser.add_argument( '--host', default = HOST, help = 'ZMQ URI to connect to. Defaults to %s' % HOST ) - parser.add_argument( '--password', required = False ) - parser.add_argument( '--identity', default = uuid.uuid1().hex, help = 'Specify the socket identity. Random UUID used by default' ) - parser.add_argument( '-v', '--verbose', action='count', default=0, help = 'Increase verbosity (use -v for INFO, -vv for DEBUG)' ) - parser.add_argument( '--unknown-log', default='unknown_events.log', help = 'File to log unknown JSON events. Defaults to unknown_events.log' ) - parser.add_argument('--json', '-j', dest='json_log', default=None, help='File to log all JSON events. If specified, all JSON messages from server will be captured') - args = parser.parse_args() - - # Set logging level based on verbosity - # Default (0): WARNING - only startup, connections, and game data - # -v (1): INFO - all communications and acknowledgements - # -vv (2): DEBUG - detailed debug information including unparsed JSON - if args.verbose == 0: - logger.setLevel(logging.WARNING) - elif args.verbose == 1: - logger.setLevel(logging.INFO) - else: - logger.setLevel(logging.DEBUG) - - # Set up file handler for unknown JSON events - file_handler = logging.FileHandler(args.unknown_log, mode='a') - file_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') - file_handler.setFormatter(file_formatter) - unknown_json_logger.addHandler(file_handler) - unknown_json_logger.propagate = False # Don't send to parent logger - - input_window, output_window, info_window = InitWindows(screen, args) - - UpdateServerInfoWindow(info_window) - - PrintMessageFormatted(output_window, "*** QL pyCon Version 0.7.0 starting ***\n") - PrintMessageFormatted(output_window, "zmq python bindings {}, libzmq version {}\n".format(repr(zmq.__version__), zmq.zmq_version())) - - stats_port = None - stats_password = None - stats_context = None - stats_socket = None - stats_connected = False - stats_check_counter = 0 - - q = setupInputQueue(input_window) - try: - logger.info('Initializing ZMQ context...') - ctx = zmq.Context() - logger.info('Creating DEALER socket...') - socket = ctx.socket( zmq.DEALER ) - logger.info('Setting up socket monitor...') - monitor = socket.get_monitor_socket( zmq.EVENT_ALL ) - if ( args.password is not None ): - logger.info( 'Setting password for access' ) - socket.plain_username = b'rcon' - socket.plain_password = args.password.encode('utf-8') - socket.zap_domain = b'rcon' - logger.info( 'Setting socket identity: %s' % args.identity ) - socket.setsockopt( zmq.IDENTITY, args.identity.encode('utf-8') ) - socket.connect( args.host ) - logger.info( 'Connection initiated, waiting for events...' ) - while ( True ): - event = socket.poll( POLL_TIMEOUT ) - event_monitor = _checkMonitor( monitor ) - if ( event_monitor is not None and event_monitor[0] == zmq.EVENT_CONNECTED ): - PrintMessageFormatted(output_window, "Connected to server\n") - socket.send( b'register' ) - logger.info( 'Registration message sent.' ) - - PrintMessageFormatted(output_window, "Requesting connection info...\n") - socket.send( b'zmq_stats_password' ) - socket.send( b'net_port' ) - - while ( not q.empty() ): - l = q.get() - logger.info( 'Sending command: %s' % repr( l.strip() ) ) - - # Display the command being sent with timestamp (in cyan to differentiate) - timestamp = time.strftime('%H:%M:%S') - PrintMessageFormatted(output_window, f"^5[^7{timestamp}^5] >>> {l.strip()}^7\n", add_timestamp=False) - - socket.send( l.encode('utf-8') ) - - if stats_connected and stats_socket: - stats_check_counter += 1 - - if stats_check_counter % 100 == 0: - logger.debug('Stats polling active (check #%d)' % (stats_check_counter // 100)) - - try: - stats_msg = stats_socket.recv(zmq.NOBLOCK) - except zmq.error.Again: - stats_msg = None - if stats_msg: - logger.info('Stats event received (%d bytes)' % len(stats_msg)) - y,x = curses.getsyx() - stats_str = stats_msg.decode('utf-8', errors='replace') - logger.debug('Stats JSON: %s' % repr(stats_str[:200])) - - parsed_event = ParseGameEvent(stats_str) - if parsed_event: - logger.debug('Event parsed successfully') - PrintMessageFormatted(output_window, parsed_event) - else: - logger.debug('Event parsing returned None') - - curses.setsyx(y,x) - curses.doupdate() - - if ( event == 0 ): - continue - - logger.debug( 'Socket has data available, reading messages...' ) - msg_count = 0 - while ( True ): - try: - msg = socket.recv( zmq.NOBLOCK ) - msg_count += 1 - except zmq.error.Again: - if msg_count > 0: - logger.debug( 'Read %d message(s) from socket.' % msg_count ) - break - except Exception as e: - timestamp = time.strftime('%H:%M:%S') - PrintMessageFormatted(output_window, f"^1[^7{timestamp}^1] Error: {e}^7\n", add_timestamp=False) - logger.error( 'Error receiving message: %s' % e ) - break - else: - if len( msg ) > 0: - logger.debug( 'Received message (%d bytes): %s' % (len(msg), repr(msg[:100])) ) - y,x = curses.getsyx() - msg_str = msg.decode('utf-8', errors='replace') - - is_background_response = parse_server_response(msg_str, info_window) - if is_background_response: - logger.debug('Suppressing background command response') - continue # Skip printing this message - if 'net_port' in msg_str and ' is:' in msg_str and '"net_port"' in msg_str: - match = re.search(r' is:"([^"]+)"', msg_str) - if match: - port_str = match.group(1) - port_str = port_str.strip() - digit_match = re.search(r'(\d+)', port_str) - if digit_match and stats_port is None: - stats_port = digit_match.group(1) - logger.info('Got stats port: %s' % stats_port) - - if 'zmq_stats_password' in msg_str and ' is:' in msg_str and '"zmq_stats_password"' in msg_str: - match = re.search(r' is:"([^"]+)"', msg_str) - if match and stats_password is None: - password_str = match.group(1) - password_str = re.sub(r'\^\d', '', password_str) - stats_password = password_str.strip() - logger.info('Got stats password: %s' % stats_password) - - if stats_port and stats_password and not stats_connected: - try: - PrintMessageFormatted(output_window, "Connecting to stats stream...\n") - logger.info(' Host: %s:%s' % (args.host.split('//')[1].split(':')[0], stats_port)) - logger.info(' Password: %s' % stats_password) - - host_ip = args.host.split('//')[1].split(':')[0] - stats_host = 'tcp://%s:%s' % (host_ip, stats_port) - - time.sleep(0.5) - - stats_context = zmq.Context() - stats_socket = stats_context.socket(zmq.SUB) - logger.debug('Stats socket created (SUB type)') - - if stats_password and stats_password.strip(): - logger.debug('Setting PLAIN authentication') - stats_socket.setsockopt(zmq.PLAIN_USERNAME, b'stats') - stats_socket.setsockopt(zmq.PLAIN_PASSWORD, stats_password.encode('utf-8')) - stats_socket.setsockopt_string(zmq.ZAP_DOMAIN, 'stats') - logger.debug('Auth configured') - - logger.debug('Connecting to %s' % stats_host) - stats_socket.connect(stats_host) - logger.debug('Connected') - - logger.debug('Setting ZMQ_SUBSCRIBE to empty (all messages)') - stats_socket.setsockopt(zmq.SUBSCRIBE, b'') - logger.debug('Subscribed') - - time.sleep(0.5) - - stats_connected = True - PrintMessageFormatted(output_window, "Stats stream connected - ready for game events\n") - - # Send initial server info queries (once only) - logger.info('Sending initial server info queries') - socket.send(b'qlx_serverBrandName') - socket.send(b'g_factoryTitle') - socket.send(b'mapname') - socket.send(b'timelimit') - socket.send(b'fraglimit') - socket.send(b'roundlimit') - socket.send(b'capturelimit') - socket.send(b'sv_maxclients') - - # Set up file handler for all JSON events if --json flag is provided - if args.json_log: - json_file_handler = logging.FileHandler(args.json_log, mode='a') - json_file_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') - json_file_handler.setFormatter(json_file_formatter) - all_json_logger.addHandler(json_file_handler) - all_json_logger.propagate = False - PrintMessageFormatted(output_window, f"*** JSON capture enabled: {args.json_log} ***\n") - except Exception as e: - timestamp = time.strftime('%H:%M:%S') - PrintMessageFormatted(output_window, f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n", add_timestamp=False) - logger.error('Stats connection failed: %s' % e) - import traceback - logger.debug(traceback.format_exc()) - - # Try to parse as JSON game event - parsed_event = ParseGameEvent(msg_str) - if parsed_event: - # Successfully parsed game event - PrintMessageFormatted(output_window, parsed_event) - else: - # Check if it looks like JSON (starts with { or [) - stripped = msg_str.strip() - if stripped and stripped[0] in ('{', '['): - # It's JSON but we didn't parse it - already logged to file - logger.debug('Unparsed JSON event') - else: - # Check for powerup messages - formatted_powerup = format_powerup_message(msg_str) - if formatted_powerup: - PrintMessageFormatted(output_window, formatted_powerup) - curses.setsyx(y,x) - curses.doupdate() - continue - # Not JSON - check if it's a bot debug message (filter unless verbose) - is_bot_debug = ' entered ' in msg_str and (' seek ' in msg_str or ' battle ' in msg_str or ' chase' in msg_str or ' fight' in msg_str) - - if is_bot_debug and args.verbose == 0: - # Skip bot debug messages in default mode - logger.debug('Filtered bot debug message: %s' % msg_str[:50]) - continue - - # Check if it's a chat message - if ':' in msg_str and not msg_str.startswith(('print', 'broadcast', 'zmq')): - # Strip the special character first to get clean player name - clean_msg = msg_str.replace(chr(25), '') - - # Check if it's team chat (PlayerName): or regular chat PlayerName: - if clean_msg.strip().startswith('(') and ')' in clean_msg: - # Team chat: (PlayerName) (Location): message or (PlayerName): message - # Location can contain nested parens like (Lower Floor (Near Yellow Armour)) - # We need to match the first (PlayerName), then optionally a second parenthetical with potential nesting - - # First, extract player name (first parenthetical) - player_match = re.match(r'^(\([^)]+\))', clean_msg) - if player_match: - player_part = player_match.group(1) - rest_of_msg = clean_msg[len(player_part):].lstrip() - - # Check if there's a location (starts with another opening paren) - if rest_of_msg.startswith('('): - # Find the matching closing paren for the location - # We need to count parens to handle nesting like (Floor (Near Armor)) - paren_count = 0 - location_end = -1 - for i, char in enumerate(rest_of_msg): - if char == '(': - paren_count += 1 - elif char == ')': - paren_count -= 1 - if paren_count == 0: - location_end = i + 1 - break - - if location_end > 0 and location_end < len(rest_of_msg) and rest_of_msg[location_end] == ':': - # We found a valid location with closing paren followed by colon - location_part = rest_of_msg[:location_end] - message_part = rest_of_msg[location_end + 1:] # After the colon - - # Extract player name for team lookup - name_match = re.match(r'\(([^)]+)\)', player_part) - if name_match: - player_name = name_match.group(1).strip() - player_name_clean = re.sub(r'\^\d', '', player_name) - - team_prefix = '' - if player_name_clean in player_teams: - team_prefix = get_team_color(player_name_clean) - - # Strip color codes from location for consistent yellow coloring - location_clean = re.sub(r'\^\d', '', location_part) - msg_str = f"{team_prefix}{player_part} ^3{location_clean}^7:^5{message_part}" - else: - # No valid location, treat as regular team chat - if rest_of_msg.startswith(':'): - message_part = rest_of_msg[1:] - name_match = re.match(r'\(([^)]+)\)', player_part) - if name_match: - player_name = name_match.group(1).strip() - player_name_clean = re.sub(r'\^\d', '', player_name) - - team_prefix = '' - if player_name_clean in player_teams: - team_prefix = get_team_color(player_name_clean) - - msg_str = f"{team_prefix}{player_part}^5:{message_part}" - else: - # No location, check if it's just (PlayerName): message - colon_match = re.match(r'^(\([^)]+\)):(\s*.*)', clean_msg) - if colon_match: - player_part = colon_match.group(1) + ':' - message_part = colon_match.group(2) - - name_match = re.match(r'\(([^)]+)\)', player_part) - if name_match: - player_name = name_match.group(1).strip() - player_name_clean = re.sub(r'\^\d', '', player_name) - - team_prefix = '' - if player_name_clean in player_teams: - team_prefix = get_team_color(player_name_clean) - - msg_str = f"{team_prefix}{player_part}^5{message_part}\n" - else: - # Regular chat: PlayerName: message - parts = clean_msg.split(':', 1) - if len(parts) == 2: - player_name = parts[0].strip() - message_text = parts[1] # Keep the leading space - - # Strip color codes from chat player name - player_name_clean = re.sub(r'\^\d', '', player_name) - - # Look up player by clean name and add team prefix - team_prefix = '' - if player_name_clean in player_teams: - team_prefix = get_team_color(player_name_clean) - - # Reconstruct with team prefix and colored message (^2 for regular chat) - # Need to preserve the original player_name with color codes from msg_str - original_parts = msg_str.replace(chr(25), '').split(':', 1) - if len(original_parts) == 2: - msg_str = f"{team_prefix}{original_parts[0]}:^2{original_parts[1]}" - - # Print the message (chat or other) - PrintMessageFormatted(output_window, msg_str) - - curses.setsyx(y,x) - curses.doupdate() - else: - logger.debug( 'Received empty message (possible keepalive or protocol frame)' ) - - except KeyboardInterrupt: - PrintMessageFormatted(output_window, "\nShutting down...\n") - except Exception as e: - timestamp = time.strftime('%H:%M:%S') - PrintMessageFormatted(output_window, f"^1[^7{timestamp}^1] Fatal Error: {e}^7\n", add_timestamp=False) - logger.error( 'Fatal error: %s' % e ) - import traceback - logger.error( traceback.format_exc() ) - finally: - if stats_socket: - stats_socket.close() - if stats_context: - stats_context.term() - -if ( __name__ == '__main__' ): - curses.wrapper(main) - -# Version: 0.7.0