commit 2764135b39e4b31dd3ac64322feefd0c3d2fe027 Author: xbl Date: Wed Dec 10 23:12:09 2025 +0100 init diff --git a/qlpycon.py b/qlpycon.py new file mode 100644 index 0000000..6b21b65 --- /dev/null +++ b/qlpycon.py @@ -0,0 +1,497 @@ +#!/usr/bin/env python3 + +import sys +import time +import struct +import argparse +import uuid +import threading +import queue +import json + +import logging +logger = logging.getLogger('logger') +logger.setLevel(logging.DEBUG) + +import zmq +import curses +import curses.textpad +import signal + +import unittest + +import locale +locale.setlocale(locale.LC_ALL, '') + +def _readSocketEvent( msg ): + event_id = struct.unpack( ' 0 : + q.put( l ) + window.clear() + window.refresh() + q = queue.Queue() + t = threading.Thread( target = waitStdin, args = ( q, ) ) + t.daemon = True + t.start() + return q + +HOST = 'tcp://127.0.0.1:27961' +POLL_TIMEOUT = 100 + +def PrintMessageColored(window, message, attributes): + if not curses.has_colors: + window.addstr(message) + return + + color = 0 + parse_color = False + for ch in message: + val = ord( ch ) + if parse_color: + if val >= ord('0') and val <= ord('7'): + color = val - ord('0') + if color == 7: + color = 0 + else: + window.addch('^', curses.color_pair(color) | attributes) + window.addch(ch, curses.color_pair(color) | attributes) + parse_color = False + elif ch == '^': + parse_color = True + else: + window.addch(ch, curses.color_pair(color) | attributes) + + window.refresh() + +def PrintMessageFormatted(window, message): + attributes = 0 + + message = message.replace("\\n", "") + message = message.replace(chr(25), "") + + if message[:10] == "broadcast:": + message = message[11:] + attributes = curses.A_BOLD + + if message[:7] == "print \"": + message = message[7:-2] + "\n" + + PrintMessageColored(window, message, attributes) + +def ParseGameEvent(message): + """Parse JSON game events and return formatted message""" + try: + jObject = json.loads(message) + if 'TYPE' not in jObject or 'DATA' not in jObject: + return None + + event_type = jObject['TYPE'] + data = jObject['DATA'] + + warmup = data.get('WARMUP', False) + warmup_suffix = " ^7(^3warmup^7)" if warmup else "" + + if event_type == 'PLAYER_SWITCHTEAM': + if 'KILLER' not in data: + return None + killer = data['KILLER'] + team = killer.get('TEAM', '') + old_team = killer.get('OLD_TEAM', '') + + if team == old_team: + return None + + team_msg = { + 'FREE': ' ^7Joined the ^2Fight^7', + 'SPECTATOR': ' ^7Joined the ^3Spectators^7', + 'RED': ' ^7Joined the ^1Red ^7team', + 'BLUE': ' ^7Joined the ^4Blue ^7team' + }.get(team, ' ^7Joined team %s^7' % team) + + old_team_msg = { + 'FREE': 'the ^2Fight^7', + 'SPECTATOR': 'the ^3Spectators^7', + 'RED': '^7the ^1Red ^7team', + 'BLUE': '^7the ^4Blue ^7team' + }.get(old_team, 'team %s' % old_team) + + return "%s%s from %s%s\n" % (killer.get('NAME', 'Unknown'), team_msg, old_team_msg, warmup_suffix) + + elif event_type == 'PLAYER_DEATH': + if 'VICTIM' not in data: + return None + victim = data['VICTIM'] + victim_name = victim.get('NAME', 'Unknown') + + if 'KILLER' not in data or not data['KILLER']: + mod = data.get('MOD', 'UNKNOWN') + death_msgs = { + 'FALLING': "%s ^1CRATERED^7" % victim_name, + 'HURT': "%s ^1WAS IN THE WRONG PLACE^7" % victim_name, + 'LAVA': "%s ^1DOES A BACKFLIP INTO THE LAVA^7" % victim_name, + 'WATER': "%s ^1SANK LIKE A ROCK^7" % victim_name, + 'SLIME': "%s ^1MELTED^7" % victim_name, + 'CRUSH': "%s ^1WAS CRUSHED^7" % victim_name + } + msg = death_msgs.get(mod, "%s ^1DIED FROM %s^7" % (victim_name, mod)) + return "%s%s\n" % (msg, warmup_suffix) + + killer = data['KILLER'] + killer_name = killer.get('NAME', 'Unknown') + + if killer_name != victim_name: + killer_team = killer.get('TEAM', '') + victim_team = victim.get('TEAM', '') + killer_color = '^7(^1RED^7)' if killer_team == '1' else '^7(^4BLUE^7)' + victim_color = '^7(^1RED^7)' if victim_team == '1' else '^7(^4BLUE^7)' + + weapon = killer.get('WEAPON', 'UNKNOWN') + weapon_names = { + 'ROCKET': 'the Rocket Launcher', + 'LIGHTNING': 'the Lightning Gun', + 'RAILGUN': 'the Railgun', + 'SHOTGUN': 'the Shotgun', + 'GAUNTLET': 'the Gauntlet', + 'GRENADE': 'the Grenade Launcher', + 'PLASMA': 'the Plasma Gun', + 'MACHINEGUN': 'the Machine Gun' + } + weapon_name = weapon_names.get(weapon, 'the %s' % weapon) + + return "%s %s ^5killed %s %s ^7with %s%s\n" % (killer_color, killer_name, victim_color, victim_name, weapon_name, warmup_suffix) + + else: + weapon = killer.get('WEAPON', 'OTHER_WEAPON') + if weapon != 'OTHER_WEAPON': + weapon_names = { + 'ROCKET': 'Rocket Launcher', + 'PLASMA': 'Plasma Gun', + 'GRENADE': 'Grenade Launcher' + } + weapon_name = weapon_names.get(weapon, weapon) + return "%s ^7committed suicide with the ^1%s%s\n" % (killer_name, weapon_name, warmup_suffix) + + elif event_type == 'PLAYER_MEDAL': + name = data.get('NAME', 'Unknown') + medal = data.get('MEDAL', 'UNKNOWN') + return "%s ^7got a ^6%s ^7medal%s\n" % (name, medal, warmup_suffix) + + except (json.JSONDecodeError, KeyError, TypeError) as e: + return None + + return None + +def InitWindows(screen, args): + logger.handlers = [] + curses.endwin() + + curses.initscr() + screen.nodelay(1) + curses.start_color() + curses.cbreak() + curses.setsyx(-1, -1) + screen.addstr("Quake Live rcon: %s" % args.host) + screen.refresh() + maxy, maxx = screen.getmaxyx() + + for i in range(1,7): + curses.init_pair(i, i, 0) + + curses.init_pair(5, 6, 0) + curses.init_pair(6, 5, 0) + + begin_x = 2; width = maxx - 4 + begin_y = 2; height = maxy - 5 + output_window = curses.newwin(height, width, begin_y, begin_x) + screen.refresh() + output_window.scrollok(True) + output_window.idlok(True) + output_window.leaveok(True) + output_window.refresh() + + begin_x = 4; width = maxx - 6 + begin_y = maxy - 2; height = 1 + input_window = curses.newwin(height, width, begin_y, begin_x) + screen.addstr(begin_y, begin_x - 2, '>') + screen.refresh() + input_window.idlok(True) + input_window.leaveok(False) + input_window.refresh() + + begin_x = 2; width = maxx - 4 + begin_y = maxy - 3; height = 1 + divider_window = curses.newwin(height, width, begin_y, begin_x) + screen.refresh() + divider_window.hline(curses.ACS_HLINE, width) + divider_window.refresh() + + mh = CursesHandler(output_window) + formatterDisplay = logging.Formatter('%(asctime)-8s|%(name)-12s|%(levelname)-6s|%(message)-s', '%H:%M:%S') + mh.setFormatter(formatterDisplay) + logger.addHandler(mh) + + screen.refresh() + + return input_window, output_window + +def main(screen): + parser = argparse.ArgumentParser( description = 'Verbose QuakeLive server statistics' ) + parser.add_argument( '--host', default = HOST, help = 'ZMQ URI to connect to. Defaults to %s' % HOST ) + parser.add_argument( '--password', required = False ) + parser.add_argument( '--identity', default = uuid.uuid1().hex, help = 'Specify the socket identity. Random UUID used by default' ) + parser.add_argument( '-v', '--verbose', action='store_true', help = 'Enable verbose logging' ) + args = parser.parse_args() + + if not args.verbose: + logger.setLevel(logging.INFO) + + input_window, output_window = InitWindows(screen, args) + + logger.info('*** VERSION 1.0.2 STARTING ***') + logger.info('zmq python bindings %s, libzmq version %s' % ( repr( zmq.__version__ ), zmq.zmq_version() ) ) + + stats_port = None + stats_password = None + stats_context = None + stats_socket = None + stats_connected = False + stats_check_counter = 0 + + q = setupInputQueue(input_window) + try: + if args.verbose: + logger.info('Initializing ZMQ context...') + ctx = zmq.Context() + if args.verbose: + logger.info('Creating DEALER socket...') + socket = ctx.socket( zmq.DEALER ) + if args.verbose: + logger.info('Setting up socket monitor...') + monitor = socket.get_monitor_socket( zmq.EVENT_ALL ) + if ( args.password is not None ): + logger.info( 'Setting password for access' ) + socket.plain_username = b'rcon' + socket.plain_password = args.password.encode('utf-8') + socket.zap_domain = b'rcon' + if args.verbose: + logger.info( 'Setting socket identity: %s' % args.identity ) + socket.setsockopt( zmq.IDENTITY, args.identity.encode('utf-8') ) + logger.info( 'Connecting to %s...' % args.host ) + socket.connect( args.host ) + if args.verbose: + logger.info( 'Connection initiated, waiting for events...' ) + while ( True ): + event = socket.poll( POLL_TIMEOUT ) + event_monitor = _checkMonitor( monitor ) + if ( event_monitor is not None and event_monitor[0] == zmq.EVENT_CONNECTED ): + logger.info( 'Connected! Registering with the server...' ) + socket.send( b'register' ) + if args.verbose: + logger.info( 'Registration message sent.' ) + + logger.info( 'Requesting game stats connection info...' ) + socket.send( b'zmq_stats_password' ) + socket.send( b'net_port' ) + + while ( not q.empty() ): + l = q.get() + if args.verbose: + logger.info( 'Sending command: %s' % repr( l.strip() ) ) + socket.send( l.encode('utf-8') ) + + if stats_connected and stats_socket: + stats_check_counter += 1 + + if stats_check_counter % 100 == 0: + logger.info('Stats polling active (check #%d)' % (stats_check_counter // 100)) + + try: + stats_msg = stats_socket.recv(zmq.NOBLOCK) + if len(stats_msg) > 0: + logger.info('!!! STATS EVENT RECEIVED (%d bytes) !!!' % len(stats_msg)) + y,x = curses.getsyx() + stats_str = stats_msg.decode('utf-8', errors='replace') + + logger.info('Stats JSON: %s' % repr(stats_str[:200])) + + parsed_event = ParseGameEvent(stats_str) + if parsed_event: + logger.info('Parsed successfully') + PrintMessageFormatted(output_window, parsed_event) + else: + logger.info('Parse failed, showing raw') + PrintMessageFormatted(output_window, stats_str + '\n') + + curses.setsyx(y,x) + curses.doupdate() + except zmq.error.Again: + pass + except Exception as e: + logger.error('Stats socket error: %s' % e) + import traceback + logger.error(traceback.format_exc()) + + if ( event == 0 ): + continue + + if args.verbose: + logger.debug( 'Socket has data available, reading messages...' ) + msg_count = 0 + while ( True ): + try: + msg = socket.recv( zmq.NOBLOCK ) + msg_count += 1 + except zmq.error.Again: + if args.verbose and msg_count > 0: + logger.debug( 'Read %d message(s) from socket.' % msg_count ) + break + except Exception as e: + logger.error( 'Error receiving message: %s' % e ) + break + else: + if len( msg ) > 0: + if args.verbose: + logger.debug( 'Received message (%d bytes): %s' % (len(msg), repr(msg[:100])) ) + y,x = curses.getsyx() + msg_str = msg.decode('utf-8', errors='replace') + + if 'net_port' in msg_str and ' is:' in msg_str and '"net_port"' in msg_str: + import re + match = re.search(r' is:"([^"]+)"', msg_str) + if match: + port_str = match.group(1) + port_str = port_str.strip() + digit_match = re.search(r'(\d+)', port_str) + if digit_match and stats_port is None: + stats_port = digit_match.group(1) + logger.info('>>> Got stats port: %s' % stats_port) + + if 'zmq_stats_password' in msg_str and ' is:' in msg_str and '"zmq_stats_password"' in msg_str: + import re + match = re.search(r' is:"([^"]+)"', msg_str) + if match and stats_password is None: + password_str = match.group(1) + password_str = re.sub(r'\^\d', '', password_str) + stats_password = password_str.strip() + logger.info('>>> Got stats password: %s' % stats_password) + + if stats_port and stats_password and not stats_connected: + try: + logger.info('>>> CONNECTING TO STATS PORT <<<') + logger.info(' Host: %s:%s' % (args.host.split('//')[1].split(':')[0], stats_port)) + logger.info(' Password: %s' % stats_password) + + host_ip = args.host.split('//')[1].split(':')[0] + stats_host = 'tcp://%s:%s' % (host_ip, stats_port) + + time.sleep(0.5) + + stats_context = zmq.Context() + stats_socket = stats_context.socket(zmq.SUB) + logger.info('>>> Stats socket created (SUB type)') + + if stats_password and stats_password.strip(): + logger.info('>>> Setting PLAIN authentication') + stats_socket.setsockopt(zmq.PLAIN_USERNAME, b'stats') + stats_socket.setsockopt(zmq.PLAIN_PASSWORD, stats_password.encode('utf-8')) + stats_socket.setsockopt_string(zmq.ZAP_DOMAIN, 'stats') + logger.info('>>> Auth configured') + + logger.info('>>> Connecting to %s' % stats_host) + stats_socket.connect(stats_host) + logger.info('>>> Connected') + + logger.info('>>> Setting ZMQ_SUBSCRIBE to empty (all messages)') + stats_socket.setsockopt(zmq.SUBSCRIBE, b'') + logger.info('>>> Subscribed') + + time.sleep(0.5) + + stats_connected = True + logger.info('>>> STATS CONNECTION COMPLETE <<<') + logger.info('>>> Waiting for game events...') + except Exception as e: + logger.error('!!! Stats connection FAILED: %s' % e) + import traceback + logger.error(traceback.format_exc()) + + parsed_event = ParseGameEvent(msg_str) + if parsed_event: + PrintMessageFormatted(output_window, parsed_event) + else: + PrintMessageFormatted(output_window, msg_str) + + curses.setsyx(y,x) + curses.doupdate() + else: + if args.verbose: + logger.debug( 'Received empty message (possible keepalive or protocol frame)' ) + + except KeyboardInterrupt: + logger.info( 'Received keyboard interrupt, shutting down...' ) + except Exception as e: + logger.error( 'Fatal error: %s' % e ) + import traceback + logger.error( traceback.format_exc() ) + finally: + if stats_socket: + stats_socket.close() + if stats_context: + stats_context.term() + +if ( __name__ == '__main__' ): + curses.wrapper(main) + +# Version: 1.0.2 - Python 3 ZMQ Rcon Client with Stats Support