diff --git a/qlpycon.py b/qlpycon.py index 6b21b65..6315cf5 100644 --- a/qlpycon.py +++ b/qlpycon.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import sys +import re import time import struct import argparse @@ -13,6 +14,10 @@ import logging logger = logging.getLogger('logger') logger.setLevel(logging.DEBUG) +# Separate logger for unknown JSON events +unknown_json_logger = logging.getLogger('unknown_json') +unknown_json_logger.setLevel(logging.DEBUG) + import zmq import curses import curses.textpad @@ -23,6 +28,35 @@ import unittest import locale locale.setlocale(locale.LC_ALL, '') +current_gametype = None + +# Track recent events to avoid duplicates +recent_events = [] +MAX_RECENT_EVENTS = 10 + +team_modes = ["TDM", "CA", "CTF", "ONEFLAG", "OVERLOAD", "HARVESTER", "FT" ] + +player_teams = {} + +def update_player_team(name, team): + """Update player team tracking. Team can be int (1=RED, 2=BLUE, 3=SPECTATOR) or string""" + if current_gametype in team_modes: + # Convert numeric team to string + if isinstance(team, int): + team_map = {1: 'RED', 2: 'BLUE', 3: 'SPECTATOR', 0: 'FREE'} + team = team_map.get(team, 'FREE') + + if team not in ['RED', 'BLUE', 'FREE', 'SPECTATOR']: + team = 'FREE' + + # Store both the original name and the color-stripped version + player_teams[name] = team + clean_name = re.sub(r'\^\d', '', name) + if clean_name != name: + player_teams[clean_name] = team + + logger.debug(f'Updated team for {name} (clean: {clean_name}): {team}') + def _readSocketEvent( msg ): event_id = struct.unpack( ' MAX_RECENT_EVENTS: + recent_events.pop(0) + warmup = data.get('WARMUP', False) warmup_suffix = " ^7(^3warmup^7)" if warmup else "" if event_type == 'PLAYER_SWITCHTEAM': if 'KILLER' not in data: + logger.debug('PLAYER_SWITCHTEAM missing KILLER field') return None killer = data['KILLER'] + name = killer.get('NAME', 'Unknown') team = killer.get('TEAM', '') old_team = killer.get('OLD_TEAM', '') + # Update player team tracking + update_player_team(name, team) + if team == old_team: return None team_msg = { - 'FREE': ' ^7Joined the ^2Fight^7', - 'SPECTATOR': ' ^7Joined the ^3Spectators^7', - 'RED': ' ^7Joined the ^1Red ^7team', - 'BLUE': ' ^7Joined the ^4Blue ^7team' - }.get(team, ' ^7Joined team %s^7' % team) + 'FREE': ' ^7joined the ^2Fight^7', + 'SPECTATOR': ' ^7joined the ^3Spectators^7', + 'RED': ' ^7joined the ^1Red ^7team', + 'BLUE': ' ^7joined the ^4Blue ^7team' + }.get(team, ' ^7joined team %s^7' % team) old_team_msg = { 'FREE': 'the ^2Fight^7', @@ -167,36 +249,49 @@ def ParseGameEvent(message): 'BLUE': '^7the ^4Blue ^7team' }.get(old_team, 'team %s' % old_team) - return "%s%s from %s%s\n" % (killer.get('NAME', 'Unknown'), team_msg, old_team_msg, warmup_suffix) + team_prefix = get_team_color(name) + return "%s%s%s from %s%s\n" % (team_prefix, name, team_msg, old_team_msg, warmup_suffix) - elif event_type == 'PLAYER_DEATH': + elif event_type == 'PLAYER_DEATH' or event_type == 'PLAYER_KILL': if 'VICTIM' not in data: + logger.debug('PLAYER_DEATH/PLAYER_KILL missing VICTIM field') return None victim = data['VICTIM'] victim_name = victim.get('NAME', 'Unknown') + # Update victim team from event data + if 'TEAM' in victim: + update_player_team(victim_name, victim['TEAM']) + + victim_team_prefix = get_team_color(victim_name) + if 'KILLER' not in data or not data['KILLER']: mod = data.get('MOD', 'UNKNOWN') death_msgs = { - 'FALLING': "%s ^1CRATERED^7" % victim_name, - 'HURT': "%s ^1WAS IN THE WRONG PLACE^7" % victim_name, - 'LAVA': "%s ^1DOES A BACKFLIP INTO THE LAVA^7" % victim_name, - 'WATER': "%s ^1SANK LIKE A ROCK^7" % victim_name, - 'SLIME': "%s ^1MELTED^7" % victim_name, - 'CRUSH': "%s ^1WAS CRUSHED^7" % victim_name + 'FALLING': "%s%s ^1CRATERED^7", + 'HURT': "%s%s ^1WAS IN THE WRONG PLACE^7", + 'LAVA': "%s%s ^1DOES A BACKFLIP INTO THE LAVA^7", + 'WATER': "%s%s ^1SANK LIKE A ROCK^7", + 'SLIME': "%s%s ^1MELTED^7", + 'CRUSH': "%s%s ^1WAS CRUSHED^7" } - msg = death_msgs.get(mod, "%s ^1DIED FROM %s^7" % (victim_name, mod)) + msg_template = death_msgs.get(mod, "%s%s ^1DIED FROM %s^7") + if mod in death_msgs: + msg = msg_template % (victim_team_prefix, victim_name) + else: + msg = msg_template % (victim_team_prefix, victim_name, mod) return "%s%s\n" % (msg, warmup_suffix) killer = data['KILLER'] killer_name = killer.get('NAME', 'Unknown') + # Update killer team from event data + if 'TEAM' in killer: + update_player_team(killer_name, killer['TEAM']) + + killer_team_prefix = get_team_color(killer_name) + if killer_name != victim_name: - killer_team = killer.get('TEAM', '') - victim_team = victim.get('TEAM', '') - killer_color = '^7(^1RED^7)' if killer_team == '1' else '^7(^4BLUE^7)' - victim_color = '^7(^1RED^7)' if victim_team == '1' else '^7(^4BLUE^7)' - weapon = killer.get('WEAPON', 'UNKNOWN') weapon_names = { 'ROCKET': 'the Rocket Launcher', @@ -210,9 +305,14 @@ def ParseGameEvent(message): } weapon_name = weapon_names.get(weapon, 'the %s' % weapon) - return "%s %s ^5killed %s %s ^7with %s%s\n" % (killer_color, killer_name, victim_color, victim_name, weapon_name, warmup_suffix) + return "%s%s ^5killed %s%s ^7with %s%s\n" % ( + killer_team_prefix, killer_name, + victim_team_prefix, victim_name, + weapon_name, warmup_suffix + ) else: + # Suicide weapon = killer.get('WEAPON', 'OTHER_WEAPON') if weapon != 'OTHER_WEAPON': weapon_names = { @@ -221,14 +321,30 @@ def ParseGameEvent(message): 'GRENADE': 'Grenade Launcher' } weapon_name = weapon_names.get(weapon, weapon) - return "%s ^7committed suicide with the ^1%s%s\n" % (killer_name, weapon_name, warmup_suffix) + return "%s%s ^7committed suicide with the ^1%s%s\n" % ( + killer_team_prefix, killer_name, weapon_name, warmup_suffix + ) elif event_type == 'PLAYER_MEDAL': name = data.get('NAME', 'Unknown') medal = data.get('MEDAL', 'UNKNOWN') - return "%s ^7got a ^6%s ^7medal%s\n" % (name, medal, warmup_suffix) + warmup = data.get('WARMUP', False) + warmup_suffix = " ^7(^3warmup^7)" if warmup else "" + team_prefix = get_team_color(name) + return "%s%s ^7got a ^6%s ^7medal%s\n" % (team_prefix, name, medal, warmup_suffix) + + else: + # Unknown event type - log at debug level and to file + logger.debug('Unknown event type: %s' % event_type) + unknown_json_logger.info('Unknown event type: %s' % event_type) + unknown_json_logger.info('Full JSON: %s' % json.dumps(jObject, indent=2)) + return None - except (json.JSONDecodeError, KeyError, TypeError) as e: + except json.JSONDecodeError as e: + logger.debug('JSON decode error: %s' % e) + return None + except (KeyError, TypeError) as e: + logger.debug('Error parsing game event: %s' % e) return None return None @@ -287,20 +403,38 @@ def InitWindows(screen, args): return input_window, output_window def main(screen): + global current_gametype + parser = argparse.ArgumentParser( description = 'Verbose QuakeLive server statistics' ) parser.add_argument( '--host', default = HOST, help = 'ZMQ URI to connect to. Defaults to %s' % HOST ) parser.add_argument( '--password', required = False ) parser.add_argument( '--identity', default = uuid.uuid1().hex, help = 'Specify the socket identity. Random UUID used by default' ) - parser.add_argument( '-v', '--verbose', action='store_true', help = 'Enable verbose logging' ) + parser.add_argument( '-v', '--verbose', action='count', default=0, help = 'Increase verbosity (use -v for INFO, -vv for DEBUG)' ) + parser.add_argument( '--unknown-log', default='unknown_events.log', help = 'File to log unknown JSON events. Defaults to unknown_events.log' ) args = parser.parse_args() - if not args.verbose: + # Set logging level based on verbosity + # Default (0): WARNING - only startup, connections, and game data + # -v (1): INFO - all communications and acknowledgements + # -vv (2): DEBUG - detailed debug information including unparsed JSON + if args.verbose == 0: + logger.setLevel(logging.WARNING) + elif args.verbose == 1: logger.setLevel(logging.INFO) + else: + logger.setLevel(logging.DEBUG) + + # Set up file handler for unknown JSON events + file_handler = logging.FileHandler(args.unknown_log, mode='a') + file_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') + file_handler.setFormatter(file_formatter) + unknown_json_logger.addHandler(file_handler) + unknown_json_logger.propagate = False # Don't send to parent logger input_window, output_window = InitWindows(screen, args) - logger.info('*** VERSION 1.0.2 STARTING ***') - logger.info('zmq python bindings %s, libzmq version %s' % ( repr( zmq.__version__ ), zmq.zmq_version() ) ) + PrintMessageFormatted(output_window, "*** QL pyCon Version 1.3.0 starting ***\n") + PrintMessageFormatted(output_window, "zmq python bindings {}, libzmq version {}\n".format(repr(zmq.__version__), zmq.zmq_version())) stats_port = None stats_password = None @@ -311,90 +445,77 @@ def main(screen): q = setupInputQueue(input_window) try: - if args.verbose: - logger.info('Initializing ZMQ context...') + logger.info('Initializing ZMQ context...') ctx = zmq.Context() - if args.verbose: - logger.info('Creating DEALER socket...') + logger.info('Creating DEALER socket...') socket = ctx.socket( zmq.DEALER ) - if args.verbose: - logger.info('Setting up socket monitor...') + logger.info('Setting up socket monitor...') monitor = socket.get_monitor_socket( zmq.EVENT_ALL ) if ( args.password is not None ): logger.info( 'Setting password for access' ) socket.plain_username = b'rcon' socket.plain_password = args.password.encode('utf-8') socket.zap_domain = b'rcon' - if args.verbose: - logger.info( 'Setting socket identity: %s' % args.identity ) + logger.info( 'Setting socket identity: %s' % args.identity ) socket.setsockopt( zmq.IDENTITY, args.identity.encode('utf-8') ) - logger.info( 'Connecting to %s...' % args.host ) socket.connect( args.host ) - if args.verbose: - logger.info( 'Connection initiated, waiting for events...' ) + logger.info( 'Connection initiated, waiting for events...' ) while ( True ): event = socket.poll( POLL_TIMEOUT ) event_monitor = _checkMonitor( monitor ) if ( event_monitor is not None and event_monitor[0] == zmq.EVENT_CONNECTED ): - logger.info( 'Connected! Registering with the server...' ) + PrintMessageFormatted(output_window, "Connected to server\n") socket.send( b'register' ) - if args.verbose: - logger.info( 'Registration message sent.' ) + logger.info( 'Registration message sent.' ) - logger.info( 'Requesting game stats connection info...' ) + PrintMessageFormatted(output_window, "Requesting game info...\n") socket.send( b'zmq_stats_password' ) socket.send( b'net_port' ) - + logger.info('Requesting Gametype...') + socket.send(b'cvarlist g_gametype') + while ( not q.empty() ): l = q.get() - if args.verbose: - logger.info( 'Sending command: %s' % repr( l.strip() ) ) + logger.info( 'Sending command: %s' % repr( l.strip() ) ) socket.send( l.encode('utf-8') ) if stats_connected and stats_socket: stats_check_counter += 1 if stats_check_counter % 100 == 0: - logger.info('Stats polling active (check #%d)' % (stats_check_counter // 100)) + logger.debug('Stats polling active (check #%d)' % (stats_check_counter // 100)) try: stats_msg = stats_socket.recv(zmq.NOBLOCK) - if len(stats_msg) > 0: - logger.info('!!! STATS EVENT RECEIVED (%d bytes) !!!' % len(stats_msg)) - y,x = curses.getsyx() - stats_str = stats_msg.decode('utf-8', errors='replace') - - logger.info('Stats JSON: %s' % repr(stats_str[:200])) - - parsed_event = ParseGameEvent(stats_str) - if parsed_event: - logger.info('Parsed successfully') - PrintMessageFormatted(output_window, parsed_event) - else: - logger.info('Parse failed, showing raw') - PrintMessageFormatted(output_window, stats_str + '\n') - - curses.setsyx(y,x) - curses.doupdate() except zmq.error.Again: - pass - except Exception as e: - logger.error('Stats socket error: %s' % e) - import traceback - logger.error(traceback.format_exc()) + stats_msg = None + if stats_msg: + logger.info('Stats event received (%d bytes)' % len(stats_msg)) + y,x = curses.getsyx() + stats_str = stats_msg.decode('utf-8', errors='replace') + logger.debug('Stats JSON: %s' % repr(stats_str[:200])) + + parsed_event = ParseGameEvent(stats_str) + if parsed_event: + logger.debug('Event parsed successfully') + PrintMessageFormatted(output_window, parsed_event) + else: + logger.debug('Event parsing returned None') + + curses.setsyx(y,x) + curses.doupdate() if ( event == 0 ): continue - if args.verbose: - logger.debug( 'Socket has data available, reading messages...' ) + logger.debug( 'Socket has data available, reading messages...' ) msg_count = 0 while ( True ): try: msg = socket.recv( zmq.NOBLOCK ) msg_count += 1 except zmq.error.Again: - if args.verbose and msg_count > 0: + if msg_count > 0: logger.debug( 'Read %d message(s) from socket.' % msg_count ) break except Exception as e: @@ -402,13 +523,11 @@ def main(screen): break else: if len( msg ) > 0: - if args.verbose: - logger.debug( 'Received message (%d bytes): %s' % (len(msg), repr(msg[:100])) ) + logger.debug( 'Received message (%d bytes): %s' % (len(msg), repr(msg[:100])) ) y,x = curses.getsyx() msg_str = msg.decode('utf-8', errors='replace') if 'net_port' in msg_str and ' is:' in msg_str and '"net_port"' in msg_str: - import re match = re.search(r' is:"([^"]+)"', msg_str) if match: port_str = match.group(1) @@ -416,20 +535,19 @@ def main(screen): digit_match = re.search(r'(\d+)', port_str) if digit_match and stats_port is None: stats_port = digit_match.group(1) - logger.info('>>> Got stats port: %s' % stats_port) + logger.info('Got stats port: %s' % stats_port) if 'zmq_stats_password' in msg_str and ' is:' in msg_str and '"zmq_stats_password"' in msg_str: - import re match = re.search(r' is:"([^"]+)"', msg_str) if match and stats_password is None: password_str = match.group(1) password_str = re.sub(r'\^\d', '', password_str) stats_password = password_str.strip() - logger.info('>>> Got stats password: %s' % stats_password) + logger.info('Got stats password: %s' % stats_password) if stats_port and stats_password and not stats_connected: try: - logger.info('>>> CONNECTING TO STATS PORT <<<') + PrintMessageFormatted(output_window, "Connecting to stats stream...\n") logger.info(' Host: %s:%s' % (args.host.split('//')[1].split(':')[0], stats_port)) logger.info(' Password: %s' % stats_password) @@ -440,47 +558,121 @@ def main(screen): stats_context = zmq.Context() stats_socket = stats_context.socket(zmq.SUB) - logger.info('>>> Stats socket created (SUB type)') + logger.debug('Stats socket created (SUB type)') if stats_password and stats_password.strip(): - logger.info('>>> Setting PLAIN authentication') + logger.debug('Setting PLAIN authentication') stats_socket.setsockopt(zmq.PLAIN_USERNAME, b'stats') stats_socket.setsockopt(zmq.PLAIN_PASSWORD, stats_password.encode('utf-8')) stats_socket.setsockopt_string(zmq.ZAP_DOMAIN, 'stats') - logger.info('>>> Auth configured') + logger.debug('Auth configured') - logger.info('>>> Connecting to %s' % stats_host) + logger.debug('Connecting to %s' % stats_host) stats_socket.connect(stats_host) - logger.info('>>> Connected') + logger.debug('Connected') - logger.info('>>> Setting ZMQ_SUBSCRIBE to empty (all messages)') + logger.debug('Setting ZMQ_SUBSCRIBE to empty (all messages)') stats_socket.setsockopt(zmq.SUBSCRIBE, b'') - logger.info('>>> Subscribed') + logger.debug('Subscribed') time.sleep(0.5) stats_connected = True - logger.info('>>> STATS CONNECTION COMPLETE <<<') - logger.info('>>> Waiting for game events...') + PrintMessageFormatted(output_window, "Stats stream connected - ready for game events\n") except Exception as e: - logger.error('!!! Stats connection FAILED: %s' % e) + logger.error('Stats connection failed: %s' % e) import traceback - logger.error(traceback.format_exc()) + logger.debug(traceback.format_exc()) + + if 'g_gametype' in msg_str: + match = re.search(r'\bg_gametype\s+"(\d+)"', msg_str) + if match: + gametypes = { + "0": "FFA", + "1": "DUEL", + "2": "RACE", + "3": "TDM", + "4": "CA", + "5": "CTF", + "6": "ONEFLAG", + "7": "OVERLOAD", + "8": "HARVESTER", + "9": "FT" + } + gametypes_num = match.group(1).strip() + current_gametype = gametypes.get(gametypes_num, "UNKNOWN") + logger.info('Got gametype: %s' % current_gametype) + PrintMessageFormatted(output_window, f'Detected gametype: {current_gametype}\n') + # Try to parse as JSON game event parsed_event = ParseGameEvent(msg_str) if parsed_event: + # Successfully parsed game event PrintMessageFormatted(output_window, parsed_event) else: - PrintMessageFormatted(output_window, msg_str) + # Check if it looks like JSON (starts with { or [) + stripped = msg_str.strip() + if stripped and stripped[0] in ('{', '['): + # It's JSON but we didn't parse it - already logged to file + logger.debug('Unparsed JSON event') + else: + # Not JSON - check if it's a chat message + if ':' in msg_str and not msg_str.startswith(('print', 'broadcast', 'zmq')): + # Strip the special character first to get clean player name + clean_msg = msg_str.replace(chr(25), '') + + # Check if it's team chat (PlayerName): or regular chat PlayerName: + if clean_msg.strip().startswith('(') and ')' in clean_msg: + # Team chat: (PlayerName): message + match = re.match(r'^(\([^)]+\):)(\s*.*)', clean_msg) + if match: + player_part = match.group(1) # (PlayerName): + message_part = match.group(2) # message text + + # Extract just the player name for team lookup + name_match = re.match(r'\(([^)]+)\)', player_part) + if name_match: + player_name = name_match.group(1).strip() + player_name_clean = re.sub(r'\^\d', '', player_name) + + # Look up player by clean name and add team prefix + team_prefix = '' + if player_name_clean in player_teams: + team_prefix = get_team_color(player_name_clean) + + # Reconstruct with team prefix and colored message (^5 for team chat) + msg_str = f"{team_prefix}{player_part}^5{message_part}\n" + else: + # Regular chat: PlayerName: message + parts = clean_msg.split(':', 1) + if len(parts) == 2: + player_name = parts[0].strip() + message_text = parts[1] # Keep the leading space + + # Strip color codes from chat player name + player_name_clean = re.sub(r'\^\d', '', player_name) + + # Look up player by clean name and add team prefix + team_prefix = '' + if player_name_clean in player_teams: + team_prefix = get_team_color(player_name_clean) + + # Reconstruct with team prefix and colored message (^2 for regular chat) + # Need to preserve the original player_name with color codes from msg_str + original_parts = msg_str.replace(chr(25), '').split(':', 1) + if len(original_parts) == 2: + msg_str = f"{team_prefix}{original_parts[0]}:^2{original_parts[1]}" + + # Print the message (chat or other) + PrintMessageFormatted(output_window, msg_str) curses.setsyx(y,x) curses.doupdate() else: - if args.verbose: - logger.debug( 'Received empty message (possible keepalive or protocol frame)' ) + logger.debug( 'Received empty message (possible keepalive or protocol frame)' ) except KeyboardInterrupt: - logger.info( 'Received keyboard interrupt, shutting down...' ) + PrintMessageFormatted(output_window, "\nShutting down...\n") except Exception as e: logger.error( 'Fatal error: %s' % e ) import traceback @@ -494,4 +686,4 @@ def main(screen): if ( __name__ == '__main__' ): curses.wrapper(main) -# Version: 1.0.2 - Python 3 ZMQ Rcon Client with Stats Support +# Version: 1.8.0 - Added color coding to chat messages (^2 for regular, ^5 for team chat)