#!/usr/bin/env python3
# Version: 1.14.0
"""Curses-based remote console (rcon) client for a Quake Live server.

The script connects a ZMQ DEALER socket to the server's rcon endpoint, shows
everything the server sends in a scrolling curses window and forwards lines
typed in the input window back to the server.  Once the server has reported
its ``net_port`` and ``zmq_stats_password`` cvars, a second SUB socket is
connected to the stats stream and the JSON game events received there are
rendered as one-line, colour-coded messages.
"""

import argparse
import curses
import curses.textpad
import json
import locale
import logging
import queue
import re
import signal
import struct
import sys
import threading
import time
import traceback
import unittest
import uuid

try:
    import zmq
except ImportError:
    # Reported with a readable message when the script is started (see the
    # __main__ guard); keeps the pure formatting helpers importable.
    zmq = None

VERSION = '1.14.0'

logger = logging.getLogger('logger')
logger.setLevel(logging.DEBUG)

# Separate logger for unknown JSON events
unknown_json_logger = logging.getLogger('unknown_json')
unknown_json_logger.setLevel(logging.DEBUG)

try:
    locale.setlocale(locale.LC_ALL, '')
except locale.Error:
    # The environment names a locale that is not installed; keep the default
    # locale instead of failing at import time.
    pass

HOST = 'tcp://127.0.0.1:27961'
POLL_TIMEOUT = 100

current_gametype = None

# Track recent events to avoid duplicates
recent_events = []
MAX_RECENT_EVENTS = 10

team_modes = ["TDM", "CA", "CTF", "ONEFLAG", "OVERLOAD", "HARVESTER", "FT"]
player_teams = {}

# Quake colour escape: a caret followed by one digit.
_COLOR_CODE_RE = re.compile(r'\^\d')
# Value part of a cvar reply such as: "net_port" is:"27960^7"
_CVAR_VALUE_RE = re.compile(r' is:"([^"]+)"')
_GAMETYPE_RE = re.compile(r'\bg_gametype\s+"(\d+)"')

_GAMETYPES = {
    "0": "FFA",
    "1": "DUEL",
    "2": "RACE",
    "3": "TDM",
    "4": "CA",
    "5": "CTF",
    "6": "ONEFLAG",
    "7": "OVERLOAD",
    "8": "HARVESTER",
    "9": "FT",
}

_TEAM_BY_NUMBER = {1: 'RED', 2: 'BLUE', 3: 'SPECTATOR', 0: 'FREE'}
_KNOWN_TEAMS = ('RED', 'BLUE', 'FREE', 'SPECTATOR')

_TEAM_PREFIXES = {
    'RED': '^1(RED)^7',
    'BLUE': '^4(BLUE)^7',
    'FREE': '',
    'SPECTATOR': '^3(SPEC)^7',
}

_JOIN_MESSAGES = {
    'FREE': ' ^7joined the ^2Fight^7',
    'SPECTATOR': ' ^7joined the ^3Spectators^7',
    'RED': ' ^7joined the ^1Red ^7team',
    'BLUE': ' ^7joined the ^4Blue ^7team',
}

_LEAVE_MESSAGES = {
    'FREE': 'the ^2Fight^7',
    'SPECTATOR': 'the ^3Spectators^7',
    'RED': '^7the ^1Red ^7team',
    'BLUE': '^7the ^4Blue ^7team',
}

_ENVIRONMENT_DEATHS = {
    'FALLING': "%s%s ^7cratered.",
    'HURT': "%s%s ^7was in the wrong place.",
    'LAVA': "%s%s ^7does a backflip into the lava.",
    'WATER': "%s%s ^7sank like a rock.",
    'SLIME': "%s%s ^7melted.",
    'CRUSH': "%s%s ^7was crushed.",
}

_WEAPON_NAMES = {
    'ROCKET': 'Rocket Launcher',
    'LIGHTNING': 'Lightning Gun',
    'RAILGUN': 'Railgun',
    'SHOTGUN': 'Shotgun',
    'GAUNTLET': 'Gauntlet',
    'GRENADE': 'Grenade Launcher',
    'PLASMA': 'Plasma Gun',
    'MACHINEGUN': 'Machine Gun',
}

# Substrings that mark a line as part of the "status" table output.
_STATUS_KEYWORDS = ('map:', 'num score', '---', 'bot')
_IP_ADDRESS_CHARS = frozenset('0123456789.:')

_MONITOR_EVENT_NAMES = (
    'EVENT_ACCEPTED',
    'EVENT_ACCEPT_FAILED',
    'EVENT_BIND_FAILED',
    'EVENT_CLOSED',
    'EVENT_CLOSE_FAILED',
    'EVENT_CONNECTED',
    'EVENT_CONNECT_DELAYED',
    'EVENT_CONNECT_RETRIED',
    'EVENT_DISCONNECTED',
    'EVENT_LISTENING',
    'EVENT_MONITOR_STOPPED',
)


def update_player_team(name, team):
    """Update player team tracking.

    Team can be int (1=RED, 2=BLUE, 3=SPECTATOR, 0=FREE) or string.  Nothing
    is recorded unless the current gametype is one of ``team_modes``.  The
    team is stored under the name as given and, when that name carries
    colour codes, under the colour-stripped name as well.
    """
    if current_gametype not in team_modes:
        return

    # Convert numeric team to string
    if isinstance(team, int):
        team = _TEAM_BY_NUMBER.get(team, 'FREE')
    if team not in _KNOWN_TEAMS:
        team = 'FREE'

    player_teams[name] = team
    clean_name = _COLOR_CODE_RE.sub('', name)
    if clean_name != name:
        player_teams[clean_name] = team
    logger.debug('Updated team for %s (clean: %s): %s', name, clean_name, team)


def _readSocketEvent(msg):
    """Decode the first frame of a ZMQ socket-monitor message.

    NOTE(review): the body of this function was lost in the stored copy of
    the file (everything after ``struct.unpack( '``).  It is reconstructed
    from the call site in ``_checkMonitor`` (which unpacks a 3-tuple and
    formats the name with %s and the value with %d) and from the libzmq
    monitor frame layout: a 16-bit event id followed by a 32-bit value,
    both little endian.  Confirm against the upstream script.

    Returns ``(event_id, event_name, event_value)``; the name falls back to
    the decimal id when the event is not a known ``zmq.EVENT_*`` constant.
    """
    event_id, event_value = struct.unpack('<HI', msg[:6])
    event_names = {
        getattr(zmq, name): name
        for name in _MONITOR_EVENT_NAMES
        if hasattr(zmq, name)
    }
    event_name = event_names.get(event_id, '%d' % event_id)
    return (event_id, event_name, event_value)


def calculate_weapon_accuracies(weapon_data):
    """Return ``{weapon: hits / shots}`` for every weapon in *weapon_data*.

    NOTE(review): only the tail of this function survived in the stored copy
    (``... > 0 else 0``, ``accuracies[weapon] = accuracy``,
    ``return accuracies``).  The per-weapon keys 'S' (shots) and 'H' (hits)
    are an assumption based on the 'K' (kills) key that ``ParseGameEvent``
    reads from the same dictionaries - confirm against a real PLAYER_STATS
    event.  A weapon without shots gets an accuracy of 0.
    """
    accuracies = {}
    for weapon, stats in weapon_data.items():
        shots = int(stats.get('S', 0))
        hits = int(stats.get('H', 0))
        accuracies[weapon] = hits / shots if shots > 0 else 0
    return accuracies


def _checkMonitor(monitor):
    """Read one pending event from the monitor socket without blocking.

    Returns ``(event_id, event_value)`` or ``None`` when nothing is queued.
    """
    try:
        event_monitor = monitor.recv(zmq.NOBLOCK)
    except zmq.Again:
        return None

    (event_id, event_name, event_value) = _readSocketEvent(event_monitor)
    # The second frame of a monitor message carries the endpoint.
    event_monitor_endpoint = monitor.recv(zmq.NOBLOCK)
    logger.debug('monitor: %s %d endpoint %s', event_name, event_value, event_monitor_endpoint)
    return (event_id, event_value)


class CursesHandler(logging.Handler):
    """Logging handler that writes formatted records into a curses window."""

    def __init__(self, screen):
        logging.Handler.__init__(self)
        self.screen = screen

    def emit(self, record):
        """Format *record* and print it through ``PrintMessageFormatted``."""
        try:
            msg = self.format(record)
            screen = self.screen
            try:
                PrintMessageFormatted(screen, "%s\n" % msg)
            except UnicodeError:
                # Retry with an ASCII-only rendering.  Formatting the encoded
                # bytes object directly would print "b'...'" on Python 3.
                safe = msg.encode('ascii', 'replace').decode('ascii')
                PrintMessageFormatted(screen, "%s\n" % safe)
            screen.refresh()
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            self.handleError(record)


def setupInputQueue(window):
    """Start a daemon thread that edits *window* and queues each entered line.

    Returns the ``queue.Queue`` the lines are put on.
    """
    def waitStdin(q):
        while True:
            line = curses.textpad.Textbox(window).edit()
            if len(line) > 0:
                q.put(line)
            window.clear()
            window.refresh()

    q = queue.Queue()
    t = threading.Thread(target=waitStdin, args=(q,))
    t.daemon = True
    t.start()
    return q


def PrintMessageColored(window, message, attributes):
    """Write *message* to *window*, interpreting ``^0``..``^7`` colour codes.

    A caret followed by a digit 0-7 selects the colour pair for the text that
    follows (7 is mapped to pair 0); the code itself is not printed.  A caret
    followed by anything else is printed literally together with that
    character.  Without colour support the message is written unchanged.
    """
    # has_colors is a function: testing the bare name is always true and the
    # monochrome fallback would never run.
    if not curses.has_colors():
        window.addstr(message)
        return

    color = 0
    parse_color = False
    for ch in message:
        if parse_color:
            if '0' <= ch <= '7':
                color = ord(ch) - ord('0')
                if color == 7:
                    color = 0
            else:
                # NOTE(review): the frag message uses "^8", which lands here
                # and is therefore shown literally.
                window.addch('^', curses.color_pair(color) | attributes)
                window.addch(ch, curses.color_pair(color) | attributes)
            parse_color = False
        elif ch == '^':
            parse_color = True
        else:
            window.addch(ch, curses.color_pair(color) | attributes)
    window.refresh()


def _should_skip_timestamp(message):
    """Return True when *message* should be printed without a timestamp.

    That covers banner lines starting with '***' and lines that look like
    pieces of the server's status output: very short lines, pure numbers,
    short lines with a leading space, single short words, IP addresses and
    lines containing a status keyword.  Lines containing 'zmq RCON' (the
    server's echo of a command) keep their timestamp even when they mention
    'status'.
    """
    stripped = message.strip()

    if message.startswith('***'):
        return True
    if len(stripped) <= 2 or stripped.isdigit():
        return True
    if message.startswith(' ') and len(stripped) < 50:
        return True
    # Single words shorter than 20 characters (player names, fragments).
    if len(stripped) < 20 and ' ' not in stripped:
        return True
    # xxx.xxx.xxx.xxx or xxx.xxx.xxx.xxx:port
    if stripped.count('.') == 3 and all(c in _IP_ADDRESS_CHARS for c in stripped):
        return True
    if any(keyword in message for keyword in _STATUS_KEYWORDS):
        return True
    return 'status' in message and 'zmq RCON' not in message


def PrintMessageFormatted(window, message, add_timestamp=True):
    """Clean up a server message, optionally timestamp it, and print it.

    Literal backslash-n sequences and chr(25) are removed, a leading
    ``broadcast:`` is dropped (the text is then printed bold) and a
    ``print "..."`` wrapper is unwrapped.  A ``[HH:MM:SS]`` prefix is added
    when *add_timestamp* is true and the message is not a banner or a
    status-output fragment.

    This function must not log through ``logger``: ``CursesHandler`` prints
    log records by calling it, so logging from here re-enters it.
    """
    attributes = 0
    message = message.replace("\\n", "")
    message = message.replace(chr(25), "")
    if message[:10] == "broadcast:":
        message = message[11:]
        attributes = curses.A_BOLD
    if message[:7] == "print \"":
        message = message[7:-2] + "\n"

    if add_timestamp and not _should_skip_timestamp(message):
        timestamp = time.strftime('%H:%M:%S')
        message = f"^3[^7{timestamp}^3]^7 {message}"

    PrintMessageColored(window, message, attributes)


def get_team_color(player_name):
    """Get the color-coded team prefix for a player.

    Returns '' outside team gametypes, for unknown players and for players
    on the FREE team.
    """
    if current_gametype not in team_modes:
        return ''
    team = player_teams.get(player_name, None)
    if not team:
        return ''
    return _TEAM_PREFIXES.get(team, '')


def _is_duplicate_kill(data):
    """Record a kill/death event and report whether it was seen recently.

    PLAYER_KILL and PLAYER_DEATH share one signature (time, killer, victim),
    so whichever arrives second for the same frag is reported as a duplicate.
    Only the last MAX_RECENT_EVENTS signatures are remembered.
    """
    killer = data.get('KILLER')
    killer_name = killer.get('NAME', '') if killer else ''
    victim_name = data.get('VICTIM', {}).get('NAME', '')
    signature = f"KILL:{data.get('TIME', 0)}:{killer_name}:{victim_name}"

    if signature in recent_events:
        logger.debug('Duplicate event detected: %s', signature)
        return True

    recent_events.append(signature)
    if len(recent_events) > MAX_RECENT_EVENTS:
        recent_events.pop(0)
    return False


def _format_switchteam(data, warmup_suffix):
    """Format a PLAYER_SWITCHTEAM event; None when the team did not change."""
    if 'KILLER' not in data:
        logger.debug('PLAYER_SWITCHTEAM missing KILLER field')
        return None

    # The event carries the switching player under the KILLER key.
    player = data['KILLER']
    name = player.get('NAME', 'Unknown')
    team = player.get('TEAM', '')
    old_team = player.get('OLD_TEAM', '')

    update_player_team(name, team)
    if team == old_team:
        return None

    team_msg = _JOIN_MESSAGES.get(team, ' ^7joined team %s^7' % team)
    old_team_msg = _LEAVE_MESSAGES.get(old_team, 'team %s' % old_team)
    return "%s%s%s from %s%s\n" % (
        get_team_color(name), name, team_msg, old_team_msg, warmup_suffix
    )


def _format_death(data, warmup_suffix):
    """Format a PLAYER_DEATH / PLAYER_KILL event (frag, suicide or world)."""
    if 'VICTIM' not in data:
        logger.debug('PLAYER_DEATH/PLAYER_KILL missing VICTIM field')
        return None

    victim = data['VICTIM']
    victim_name = victim.get('NAME', 'Unknown')
    if 'TEAM' in victim:
        update_player_team(victim_name, victim['TEAM'])
    victim_prefix = get_team_color(victim_name)

    killer = data.get('KILLER')
    if not killer:
        # No killer: death caused by the environment.
        mod = data.get('MOD', 'UNKNOWN')
        template = _ENVIRONMENT_DEATHS.get(mod)
        if template is not None:
            msg = template % (victim_prefix, victim_name)
        else:
            msg = "%s%s ^1DIED FROM %s^7" % (victim_prefix, victim_name, mod)
        return "%s%s\n" % (msg, warmup_suffix)

    killer_name = killer.get('NAME', 'Unknown')
    if 'TEAM' in killer:
        update_player_team(killer_name, killer['TEAM'])
    killer_prefix = get_team_color(killer_name)

    if killer_name != victim_name:
        weapon = killer.get('WEAPON', 'UNKNOWN')
        weapon_name = 'the %s' % _WEAPON_NAMES.get(weapon, weapon)
        return "%s%s ^8fragged^7 %s%s ^7with %s%s\n" % (
            killer_prefix, killer_name,
            victim_prefix, victim_name,
            weapon_name, warmup_suffix
        )

    # Suicide
    weapon = killer.get('WEAPON', 'OTHER_WEAPON')
    if weapon == 'OTHER_WEAPON':
        # NOTE(review): the original code only builds a message when a
        # concrete weapon is reported, so this case stays silent.
        return None
    return "%s%s ^7committed suicide with the ^7%s%s\n" % (
        killer_prefix, killer_name,
        _WEAPON_NAMES.get(weapon, weapon), warmup_suffix
    )


def _format_medal(data):
    """Format a PLAYER_MEDAL event."""
    name = data.get('NAME', 'Unknown')
    medal = data.get('MEDAL', 'UNKNOWN')
    warmup_suffix = " ^3(warmup)^7" if data.get('WARMUP', False) else ""
    return "%s%s ^7got a medal: ^6%s%s\n" % (
        get_team_color(name), name, medal, warmup_suffix
    )


def _format_match_report(data):
    """Format a MATCH_REPORT event; only team gametypes produce a message."""
    if current_gametype not in team_modes:
        return None

    redscore = int(data.get('TSCORE0', '0'))
    bluscore = int(data.get('TSCORE1', '0'))
    if redscore > bluscore:
        return "^1RED TEAM ^7WINS by a score of %d to %d\n" % (redscore, bluscore)
    if bluscore > redscore:
        return "^4BLUE TEAM ^7WINS by a score of %d to %d\n" % (bluscore, redscore)
    return "^7The match is a TIE with a score of %d to %d\n" % (redscore, bluscore)


def _format_player_stats(data):
    """Format a PLAYER_STATS event: K/D plus the most accurate weapon."""
    name = data.get('NAME', 'Unknown')
    team_prefix = get_team_color(name)
    kills = int(data.get('KILLS', '0'))
    deaths = int(data.get('DEATHS', '0'))

    weapon_data = data.get('WEAPONS', {})
    accuracies = calculate_weapon_accuracies(weapon_data)
    if not accuracies:
        # Without weapon entries there is no "best weapon" to report.
        return "^7%s%s K/D: %d/%d | No weapon stats available.\n" % (
            team_prefix, name, kills, deaths
        )

    best_weapon = max(accuracies, key=accuracies.get)
    best_accuracy = accuracies[best_weapon] * 100
    best_weapon_kills = int(weapon_data.get(best_weapon, {}).get('K', 0))
    # Fall back to the raw key so an unlisted weapon is not shown as "None".
    weapon_name = _WEAPON_NAMES.get(best_weapon, best_weapon)
    return "^7%s%s K/D: %d/%d | Best Weapon: %s - Acc: %.2f%% - Kills: %d\n" % (
        team_prefix, name, kills, deaths, weapon_name, best_accuracy, best_weapon_kills
    )


def ParseGameEvent(message):
    """Parse JSON game events and return formatted message.

    *message* is the text of one event; it must decode to an object with
    'TYPE' and 'DATA' entries.  Returns the display line (ending in a
    newline) for PLAYER_SWITCHTEAM, PLAYER_DEATH / PLAYER_KILL, PLAYER_MEDAL,
    MATCH_REPORT and PLAYER_STATS events.

    Returns None when the text is not JSON, the event is malformed, it
    duplicates a recently seen kill, it produces no output (for example an
    unchanged team), or its type is unknown.  Unknown events are also written
    to ``unknown_json_logger``.
    """
    try:
        jObject = json.loads(message)
        if 'TYPE' not in jObject or 'DATA' not in jObject:
            logger.debug('JSON missing TYPE or DATA fields')
            return None

        event_type = jObject['TYPE']
        data = jObject['DATA']

        is_kill_event = event_type in ('PLAYER_DEATH', 'PLAYER_KILL')
        if is_kill_event and _is_duplicate_kill(data):
            return None

        warmup_suffix = " ^7(^3warmup^7)" if data.get('WARMUP', False) else ""

        if event_type == 'PLAYER_SWITCHTEAM':
            return _format_switchteam(data, warmup_suffix)
        if is_kill_event:
            return _format_death(data, warmup_suffix)
        if event_type == 'PLAYER_MEDAL':
            return _format_medal(data)
        if event_type == 'MATCH_REPORT':
            return _format_match_report(data)
        if event_type == 'PLAYER_STATS':
            return _format_player_stats(data)

        # Unknown event type - log at debug level and to file
        logger.debug('Unknown event type: %s', event_type)
        unknown_json_logger.info('Unknown event type: %s', event_type)
        unknown_json_logger.info('Full JSON: %s', json.dumps(jObject, indent=2))
        return None
    except json.JSONDecodeError as e:
        logger.debug('JSON decode error: %s', e)
        return None
    except (KeyError, TypeError, ValueError, AttributeError) as e:
        # ValueError: non-numeric score/kill fields; AttributeError: a field
        # that should be an object is something else.
        logger.debug('Error parsing game event: %s', e)
        return None


def InitWindows(screen, args):
    """Lay out the curses screen and route ``logger`` to the output window.

    Draws the title line, a scrolling output window, a divider line and a
    one-line input window prefixed with '>'.  Returns
    ``(input_window, output_window)``.
    """
    logger.handlers = []
    curses.endwin()
    curses.initscr()
    screen.nodelay(1)
    curses.start_color()
    curses.cbreak()
    curses.setsyx(-1, -1)
    screen.addstr("Quake Live rcon: %s" % args.host)
    screen.refresh()
    maxy, maxx = screen.getmaxyx()

    for i in range(1, 7):
        curses.init_pair(i, i, 0)
    # Pairs 5 and 6 are swapped relative to the curses colour numbers.
    curses.init_pair(5, 6, 0)
    curses.init_pair(6, 5, 0)

    output_window = curses.newwin(maxy - 5, maxx - 4, 2, 2)
    screen.refresh()
    output_window.scrollok(True)
    output_window.idlok(True)
    output_window.leaveok(True)
    output_window.refresh()

    input_y, input_x = maxy - 2, 4
    input_window = curses.newwin(1, maxx - 6, input_y, input_x)
    screen.addstr(input_y, input_x - 2, '>')
    screen.refresh()
    input_window.idlok(True)
    input_window.leaveok(False)
    input_window.refresh()

    divider_width = maxx - 4
    divider_window = curses.newwin(1, divider_width, maxy - 3, 2)
    screen.refresh()
    divider_window.hline(curses.ACS_HLINE, divider_width)
    divider_window.refresh()

    mh = CursesHandler(output_window)
    formatterDisplay = logging.Formatter(
        '%(asctime)-8s|%(name)-12s|%(levelname)-6s|%(message)-s', '%H:%M:%S'
    )
    mh.setFormatter(formatterDisplay)
    logger.addHandler(mh)
    screen.refresh()
    return input_window, output_window


def _parse_args(argv=None):
    """Parse the command line (``sys.argv`` when *argv* is None)."""
    parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics')
    parser.add_argument('--host', default=HOST,
                        help='ZMQ URI to connect to. Defaults to %s' % HOST)
    parser.add_argument('--password', required=False)
    parser.add_argument('--identity', default=uuid.uuid1().hex,
                        help='Specify the socket identity. Random UUID used by default')
    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help='Increase verbosity (use -v for INFO, -vv for DEBUG)')
    parser.add_argument('--unknown-log', default='unknown_events.log',
                        help='File to log unknown JSON events. Defaults to unknown_events.log')
    return parser.parse_args(argv)


def _configure_logging(args):
    """Set the display log level and attach the unknown-event log file."""
    # Default (0): WARNING - only startup, connections, and game data
    # -v (1): INFO - all communications and acknowledgements
    # -vv (2): DEBUG - detailed debug information including unparsed JSON
    if args.verbose == 0:
        logger.setLevel(logging.WARNING)
    elif args.verbose == 1:
        logger.setLevel(logging.INFO)
    else:
        logger.setLevel(logging.DEBUG)

    file_handler = logging.FileHandler(args.unknown_log, mode='a')
    file_handler.setFormatter(
        logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S')
    )
    unknown_json_logger.addHandler(file_handler)
    unknown_json_logger.propagate = False  # Don't send to parent logger


def _parse_cvar_reply(msg_str, cvar):
    """Return the value from a ``"<cvar>" is:"<value>"`` reply, else None.

    Colour codes are removed from the value before it is returned, so a
    digit that belongs to a colour code is never mistaken for data.
    """
    if '"%s"' % cvar not in msg_str:
        return None
    match = _CVAR_VALUE_RE.search(msg_str)
    if not match:
        return None
    return _COLOR_CODE_RE.sub('', match.group(1)).strip()


def _matching_paren_end(text):
    """Return the index just past the ')' closing the first '(' in *text*.

    Nested parentheses are counted; -1 means no balanced group was found.
    """
    depth = 0
    for i, char in enumerate(text):
        if char == '(':
            depth += 1
        elif char == ')':
            depth -= 1
            if depth == 0:
                return i + 1
    return -1


def _format_chat_message(msg_str):
    """Add team prefix and chat colours to a ``name: text`` style line.

    Handles three shapes and returns *msg_str* unchanged for anything else:

    * team chat with location ``(Name) (Location): text`` - the location may
      contain nested parentheses; it is shown in yellow, the text in cyan;
    * team chat ``(Name): text`` - text in cyan;
    * regular chat ``Name: text`` - text in green.
    """
    clean_msg = msg_str.replace(chr(25), '')

    if not (clean_msg.strip().startswith('(') and ')' in clean_msg):
        # Regular chat: PlayerName: message
        name_part, colon, text = clean_msg.partition(':')
        if not colon:
            return msg_str
        team_prefix = get_team_color(_COLOR_CODE_RE.sub('', name_part.strip()))
        return f"{team_prefix}{name_part}:^2{text}"

    player_match = re.match(r'^(\([^)]+\))', clean_msg)
    if not player_match:
        return msg_str
    player_part = player_match.group(1)
    player_name = player_part[1:-1].strip()
    team_prefix = get_team_color(_COLOR_CODE_RE.sub('', player_name))

    rest_of_msg = clean_msg[len(player_part):].lstrip()
    if rest_of_msg.startswith('('):
        location_end = _matching_paren_end(rest_of_msg)
        has_location = (
            0 < location_end < len(rest_of_msg)
            and rest_of_msg[location_end] == ':'
        )
        if not has_location:
            return msg_str
        # Strip color codes from location for consistent yellow coloring
        location_clean = _COLOR_CODE_RE.sub('', rest_of_msg[:location_end])
        message_part = rest_of_msg[location_end + 1:]
        return f"{team_prefix}{player_part} ^3{location_clean}^7:^5{message_part}"

    colon_match = re.match(r'^(\([^)]+\)):(\s*.*)', clean_msg)
    if colon_match:
        return f"{team_prefix}{colon_match.group(1)}:^5{colon_match.group(2)}\n"
    return msg_str


def _is_bot_debug_message(msg_str):
    """Return True for bot AI state chatter ("... entered ... seek ...")."""
    return ' entered ' in msg_str and any(
        marker in msg_str for marker in (' seek ', ' battle ', ' chase', ' fight')
    )


def _connect_stats_socket(stats_host, stats_password):
    """Create a SUB socket subscribed to everything on *stats_host*.

    Returns ``(context, socket)``.  On any failure the partially created
    socket and context are released before the exception is re-raised.
    """
    context = zmq.Context()
    sock = context.socket(zmq.SUB)
    logger.debug('Stats socket created (SUB type)')
    try:
        if stats_password and stats_password.strip():
            logger.debug('Setting PLAIN authentication')
            sock.setsockopt(zmq.PLAIN_USERNAME, b'stats')
            sock.setsockopt(zmq.PLAIN_PASSWORD, stats_password.encode('utf-8'))
            sock.setsockopt_string(zmq.ZAP_DOMAIN, 'stats')
            logger.debug('Auth configured')
        logger.debug('Connecting to %s', stats_host)
        sock.connect(stats_host)
        logger.debug('Connected')
        logger.debug('Setting ZMQ_SUBSCRIBE to empty (all messages)')
        sock.setsockopt(zmq.SUBSCRIBE, b'')
        logger.debug('Subscribed')
    except Exception:
        sock.close(linger=0)
        context.term()
        raise
    return context, sock


def _print_error(window, text):
    """Print *text* as a red, timestamped error line."""
    timestamp = time.strftime('%H:%M:%S')
    PrintMessageFormatted(window, f"^1[^7{timestamp}^1] {text}^7\n", add_timestamp=False)


def main(screen, args=None):
    """Run the console until interrupted.

    *screen* is the curses standard screen (as passed by ``curses.wrapper``).
    *args* is the parsed command line; when omitted it is parsed here.
    """
    global current_gametype

    if args is None:
        args = _parse_args()
    _configure_logging(args)

    input_window, output_window = InitWindows(screen, args)
    PrintMessageFormatted(output_window, "*** QL pyCon Version %s starting ***\n" % VERSION)
    PrintMessageFormatted(
        output_window,
        "zmq python bindings {}, libzmq version {}\n".format(
            repr(zmq.__version__), zmq.zmq_version()
        ),
    )

    stats_port = None
    stats_password = None
    stats_context = None
    stats_socket = None
    stats_connected = False
    stats_failed = False
    stats_check_counter = 0

    q = setupInputQueue(input_window)

    try:
        logger.info('Initializing ZMQ context...')
        ctx = zmq.Context()
        logger.info('Creating DEALER socket...')
        socket = ctx.socket(zmq.DEALER)
        logger.info('Setting up socket monitor...')
        monitor = socket.get_monitor_socket(zmq.EVENT_ALL)
        if args.password is not None:
            logger.info('Setting password for access')
            socket.plain_username = b'rcon'
            socket.plain_password = args.password.encode('utf-8')
            socket.zap_domain = b'rcon'
        logger.info('Setting socket identity: %s', args.identity)
        socket.setsockopt(zmq.IDENTITY, args.identity.encode('utf-8'))
        socket.connect(args.host)
        logger.info('Connection initiated, waiting for events...')

        while True:
            event = socket.poll(POLL_TIMEOUT)

            event_monitor = _checkMonitor(monitor)
            if event_monitor is not None and event_monitor[0] == zmq.EVENT_CONNECTED:
                PrintMessageFormatted(output_window, "Connected to server\n")
                socket.send(b'register')
                logger.info('Registration message sent.')
                PrintMessageFormatted(output_window, "Requesting game info...\n")
                socket.send(b'zmq_stats_password')
                socket.send(b'net_port')
                logger.info('Requesting Gametype...')
                socket.send(b'cvarlist g_gametype')

            # Forward everything the user typed since the last iteration.
            while not q.empty():
                line = q.get()
                logger.info('Sending command: %s', repr(line.strip()))
                # Display the command being sent with timestamp (in cyan to differentiate)
                timestamp = time.strftime('%H:%M:%S')
                PrintMessageFormatted(
                    output_window,
                    f"^5[^7{timestamp}^5] >>> {line.strip()}^7\n",
                    add_timestamp=False,
                )
                socket.send(line.encode('utf-8'))

            if stats_connected and stats_socket:
                stats_check_counter += 1
                if stats_check_counter % 100 == 0:
                    logger.debug('Stats polling active (check #%d)', stats_check_counter // 100)
                # Drain every pending stats event so bursts (e.g. end-of-match
                # statistics) are not delayed by one poll cycle per message.
                while True:
                    try:
                        stats_msg = stats_socket.recv(zmq.NOBLOCK)
                    except zmq.Again:
                        break
                    if not stats_msg:
                        continue
                    logger.info('Stats event received (%d bytes)', len(stats_msg))
                    y, x = curses.getsyx()
                    stats_str = stats_msg.decode('utf-8', errors='replace')
                    logger.debug('Stats JSON: %s', repr(stats_str[:200]))
                    parsed_event = ParseGameEvent(stats_str)
                    if parsed_event:
                        logger.debug('Event parsed successfully')
                        PrintMessageFormatted(output_window, parsed_event)
                    else:
                        logger.debug('Event parsing returned None')
                    curses.setsyx(y, x)
                    curses.doupdate()

            if event == 0:
                continue

            logger.debug('Socket has data available, reading messages...')
            msg_count = 0
            while True:
                try:
                    msg = socket.recv(zmq.NOBLOCK)
                except zmq.Again:
                    if msg_count > 0:
                        logger.debug('Read %d message(s) from socket.', msg_count)
                    break
                except Exception as e:
                    _print_error(output_window, f"Error: {e}")
                    logger.error('Error receiving message: %s', e)
                    break

                msg_count += 1
                if len(msg) == 0:
                    logger.debug('Received empty message (possible keepalive or protocol frame)')
                    continue

                logger.debug('Received message (%d bytes): %s', len(msg), repr(msg[:100]))
                y, x = curses.getsyx()
                msg_str = msg.decode('utf-8', errors='replace')

                if stats_port is None:
                    port_value = _parse_cvar_reply(msg_str, 'net_port')
                    digit_match = re.search(r'(\d+)', port_value) if port_value else None
                    if digit_match:
                        stats_port = digit_match.group(1)
                        logger.info('Got stats port: %s', stats_port)

                if stats_password is None:
                    password_value = _parse_cvar_reply(msg_str, 'zmq_stats_password')
                    if password_value is not None:
                        stats_password = password_value
                        # Never write the secret itself to the log.
                        logger.info('Got stats password (%d characters)', len(stats_password))

                if stats_port and stats_password and not stats_connected and not stats_failed:
                    try:
                        PrintMessageFormatted(output_window, "Connecting to stats stream...\n")
                        host_ip = args.host.split('//')[1].split(':')[0]
                        logger.info(' Host: %s:%s', host_ip, stats_port)
                        stats_host = 'tcp://%s:%s' % (host_ip, stats_port)
                        time.sleep(0.5)
                        stats_context, stats_socket = _connect_stats_socket(stats_host, stats_password)
                        time.sleep(0.5)
                        stats_connected = True
                        PrintMessageFormatted(
                            output_window,
                            "Stats stream connected - ready for game events\n",
                        )
                    except Exception as e:
                        # Do not retry on every later message: each attempt
                        # blocks the console for the duration of the sleeps.
                        stats_failed = True
                        _print_error(output_window, f"Error: Stats connection failed: {e}")
                        logger.error('Stats connection failed: %s', e)
                        logger.debug(traceback.format_exc())

                if 'g_gametype' in msg_str:
                    match = _GAMETYPE_RE.search(msg_str)
                    if match:
                        current_gametype = _GAMETYPES.get(match.group(1).strip(), "UNKNOWN")
                        logger.info('Got gametype: %s', current_gametype)
                        PrintMessageFormatted(
                            output_window, f'Detected gametype: {current_gametype}\n'
                        )

                # Try to parse as JSON game event
                parsed_event = ParseGameEvent(msg_str)
                stripped = msg_str.strip()
                if parsed_event:
                    PrintMessageFormatted(output_window, parsed_event)
                elif stripped and stripped[0] in ('{', '['):
                    # It's JSON but we didn't parse it - already logged to file
                    logger.debug('Unparsed JSON event')
                elif _is_bot_debug_message(msg_str) and args.verbose == 0:
                    # Skip bot debug messages in default mode
                    logger.debug('Filtered bot debug message: %s', msg_str[:50])
                else:
                    if ':' in msg_str and not msg_str.startswith(('print', 'broadcast', 'zmq')):
                        msg_str = _format_chat_message(msg_str)
                    # Print the message (chat or other)
                    PrintMessageFormatted(output_window, msg_str)

                curses.setsyx(y, x)
                curses.doupdate()

    except KeyboardInterrupt:
        PrintMessageFormatted(output_window, "\nShutting down...\n")
    except Exception as e:
        _print_error(output_window, f"Fatal Error: {e}")
        logger.error('Fatal error: %s', e)
        logger.error(traceback.format_exc())
    finally:
        if stats_socket:
            stats_socket.close()
        if stats_context:
            stats_context.term()


if __name__ == '__main__':
    if zmq is None:
        sys.exit('This script requires the pyzmq package (pip install pyzmq).')
    # Parse before curses takes over the terminal so that --help and usage
    # errors are printed on the normal screen.
    curses.wrapper(main, _parse_args())