#!/usr/bin/env python3 import sys import time import struct import argparse import uuid import threading import queue import json import logging logger = logging.getLogger('logger') logger.setLevel(logging.DEBUG) import zmq import curses import curses.textpad import signal import unittest import locale locale.setlocale(locale.LC_ALL, '') def _readSocketEvent( msg ): event_id = struct.unpack( ' 0 : q.put( l ) window.clear() window.refresh() q = queue.Queue() t = threading.Thread( target = waitStdin, args = ( q, ) ) t.daemon = True t.start() return q HOST = 'tcp://127.0.0.1:27961' POLL_TIMEOUT = 100 def PrintMessageColored(window, message, attributes): if not curses.has_colors: window.addstr(message) return color = 0 parse_color = False for ch in message: val = ord( ch ) if parse_color: if val >= ord('0') and val <= ord('7'): color = val - ord('0') if color == 7: color = 0 else: window.addch('^', curses.color_pair(color) | attributes) window.addch(ch, curses.color_pair(color) | attributes) parse_color = False elif ch == '^': parse_color = True else: window.addch(ch, curses.color_pair(color) | attributes) window.refresh() def PrintMessageFormatted(window, message): attributes = 0 message = message.replace("\\n", "") message = message.replace(chr(25), "") if message[:10] == "broadcast:": message = message[11:] attributes = curses.A_BOLD if message[:7] == "print \"": message = message[7:-2] + "\n" PrintMessageColored(window, message, attributes) def ParseGameEvent(message): """Parse JSON game events and return formatted message""" try: jObject = json.loads(message) if 'TYPE' not in jObject or 'DATA' not in jObject: return None event_type = jObject['TYPE'] data = jObject['DATA'] warmup = data.get('WARMUP', False) warmup_suffix = " ^7(^3warmup^7)" if warmup else "" if event_type == 'PLAYER_SWITCHTEAM': if 'KILLER' not in data: return None killer = data['KILLER'] team = killer.get('TEAM', '') old_team = killer.get('OLD_TEAM', '') if team == old_team: return None team_msg = { 'FREE': ' ^7Joined the ^2Fight^7', 'SPECTATOR': ' ^7Joined the ^3Spectators^7', 'RED': ' ^7Joined the ^1Red ^7team', 'BLUE': ' ^7Joined the ^4Blue ^7team' }.get(team, ' ^7Joined team %s^7' % team) old_team_msg = { 'FREE': 'the ^2Fight^7', 'SPECTATOR': 'the ^3Spectators^7', 'RED': '^7the ^1Red ^7team', 'BLUE': '^7the ^4Blue ^7team' }.get(old_team, 'team %s' % old_team) return "%s%s from %s%s\n" % (killer.get('NAME', 'Unknown'), team_msg, old_team_msg, warmup_suffix) elif event_type == 'PLAYER_DEATH': if 'VICTIM' not in data: return None victim = data['VICTIM'] victim_name = victim.get('NAME', 'Unknown') if 'KILLER' not in data or not data['KILLER']: mod = data.get('MOD', 'UNKNOWN') death_msgs = { 'FALLING': "%s ^1CRATERED^7" % victim_name, 'HURT': "%s ^1WAS IN THE WRONG PLACE^7" % victim_name, 'LAVA': "%s ^1DOES A BACKFLIP INTO THE LAVA^7" % victim_name, 'WATER': "%s ^1SANK LIKE A ROCK^7" % victim_name, 'SLIME': "%s ^1MELTED^7" % victim_name, 'CRUSH': "%s ^1WAS CRUSHED^7" % victim_name } msg = death_msgs.get(mod, "%s ^1DIED FROM %s^7" % (victim_name, mod)) return "%s%s\n" % (msg, warmup_suffix) killer = data['KILLER'] killer_name = killer.get('NAME', 'Unknown') if killer_name != victim_name: killer_team = killer.get('TEAM', '') victim_team = victim.get('TEAM', '') killer_color = '^7(^1RED^7)' if killer_team == '1' else '^7(^4BLUE^7)' victim_color = '^7(^1RED^7)' if victim_team == '1' else '^7(^4BLUE^7)' weapon = killer.get('WEAPON', 'UNKNOWN') weapon_names = { 'ROCKET': 'the Rocket Launcher', 'LIGHTNING': 'the Lightning Gun', 'RAILGUN': 'the Railgun', 'SHOTGUN': 'the Shotgun', 'GAUNTLET': 'the Gauntlet', 'GRENADE': 'the Grenade Launcher', 'PLASMA': 'the Plasma Gun', 'MACHINEGUN': 'the Machine Gun' } weapon_name = weapon_names.get(weapon, 'the %s' % weapon) return "%s %s ^5killed %s %s ^7with %s%s\n" % (killer_color, killer_name, victim_color, victim_name, weapon_name, warmup_suffix) else: weapon = killer.get('WEAPON', 'OTHER_WEAPON') if weapon != 'OTHER_WEAPON': weapon_names = { 'ROCKET': 'Rocket Launcher', 'PLASMA': 'Plasma Gun', 'GRENADE': 'Grenade Launcher' } weapon_name = weapon_names.get(weapon, weapon) return "%s ^7committed suicide with the ^1%s%s\n" % (killer_name, weapon_name, warmup_suffix) elif event_type == 'PLAYER_MEDAL': name = data.get('NAME', 'Unknown') medal = data.get('MEDAL', 'UNKNOWN') return "%s ^7got a ^6%s ^7medal%s\n" % (name, medal, warmup_suffix) except (json.JSONDecodeError, KeyError, TypeError) as e: return None return None def InitWindows(screen, args): logger.handlers = [] curses.endwin() curses.initscr() screen.nodelay(1) curses.start_color() curses.cbreak() curses.setsyx(-1, -1) screen.addstr("Quake Live rcon: %s" % args.host) screen.refresh() maxy, maxx = screen.getmaxyx() for i in range(1,7): curses.init_pair(i, i, 0) curses.init_pair(5, 6, 0) curses.init_pair(6, 5, 0) begin_x = 2; width = maxx - 4 begin_y = 2; height = maxy - 5 output_window = curses.newwin(height, width, begin_y, begin_x) screen.refresh() output_window.scrollok(True) output_window.idlok(True) output_window.leaveok(True) output_window.refresh() begin_x = 4; width = maxx - 6 begin_y = maxy - 2; height = 1 input_window = curses.newwin(height, width, begin_y, begin_x) screen.addstr(begin_y, begin_x - 2, '>') screen.refresh() input_window.idlok(True) input_window.leaveok(False) input_window.refresh() begin_x = 2; width = maxx - 4 begin_y = maxy - 3; height = 1 divider_window = curses.newwin(height, width, begin_y, begin_x) screen.refresh() divider_window.hline(curses.ACS_HLINE, width) divider_window.refresh() mh = CursesHandler(output_window) formatterDisplay = logging.Formatter('%(asctime)-8s|%(name)-12s|%(levelname)-6s|%(message)-s', '%H:%M:%S') mh.setFormatter(formatterDisplay) logger.addHandler(mh) screen.refresh() return input_window, output_window def main(screen): parser = argparse.ArgumentParser( description = 'Verbose QuakeLive server statistics' ) parser.add_argument( '--host', default = HOST, help = 'ZMQ URI to connect to. Defaults to %s' % HOST ) parser.add_argument( '--password', required = False ) parser.add_argument( '--identity', default = uuid.uuid1().hex, help = 'Specify the socket identity. Random UUID used by default' ) parser.add_argument( '-v', '--verbose', action='store_true', help = 'Enable verbose logging' ) args = parser.parse_args() if not args.verbose: logger.setLevel(logging.INFO) input_window, output_window = InitWindows(screen, args) logger.info('*** VERSION 1.0.2 STARTING ***') logger.info('zmq python bindings %s, libzmq version %s' % ( repr( zmq.__version__ ), zmq.zmq_version() ) ) stats_port = None stats_password = None stats_context = None stats_socket = None stats_connected = False stats_check_counter = 0 q = setupInputQueue(input_window) try: if args.verbose: logger.info('Initializing ZMQ context...') ctx = zmq.Context() if args.verbose: logger.info('Creating DEALER socket...') socket = ctx.socket( zmq.DEALER ) if args.verbose: logger.info('Setting up socket monitor...') monitor = socket.get_monitor_socket( zmq.EVENT_ALL ) if ( args.password is not None ): logger.info( 'Setting password for access' ) socket.plain_username = b'rcon' socket.plain_password = args.password.encode('utf-8') socket.zap_domain = b'rcon' if args.verbose: logger.info( 'Setting socket identity: %s' % args.identity ) socket.setsockopt( zmq.IDENTITY, args.identity.encode('utf-8') ) logger.info( 'Connecting to %s...' % args.host ) socket.connect( args.host ) if args.verbose: logger.info( 'Connection initiated, waiting for events...' ) while ( True ): event = socket.poll( POLL_TIMEOUT ) event_monitor = _checkMonitor( monitor ) if ( event_monitor is not None and event_monitor[0] == zmq.EVENT_CONNECTED ): logger.info( 'Connected! Registering with the server...' ) socket.send( b'register' ) if args.verbose: logger.info( 'Registration message sent.' ) logger.info( 'Requesting game stats connection info...' ) socket.send( b'zmq_stats_password' ) socket.send( b'net_port' ) while ( not q.empty() ): l = q.get() if args.verbose: logger.info( 'Sending command: %s' % repr( l.strip() ) ) socket.send( l.encode('utf-8') ) if stats_connected and stats_socket: stats_check_counter += 1 if stats_check_counter % 100 == 0: logger.info('Stats polling active (check #%d)' % (stats_check_counter // 100)) try: stats_msg = stats_socket.recv(zmq.NOBLOCK) if len(stats_msg) > 0: logger.info('!!! STATS EVENT RECEIVED (%d bytes) !!!' % len(stats_msg)) y,x = curses.getsyx() stats_str = stats_msg.decode('utf-8', errors='replace') logger.info('Stats JSON: %s' % repr(stats_str[:200])) parsed_event = ParseGameEvent(stats_str) if parsed_event: logger.info('Parsed successfully') PrintMessageFormatted(output_window, parsed_event) else: logger.info('Parse failed, showing raw') PrintMessageFormatted(output_window, stats_str + '\n') curses.setsyx(y,x) curses.doupdate() except zmq.error.Again: pass except Exception as e: logger.error('Stats socket error: %s' % e) import traceback logger.error(traceback.format_exc()) if ( event == 0 ): continue if args.verbose: logger.debug( 'Socket has data available, reading messages...' ) msg_count = 0 while ( True ): try: msg = socket.recv( zmq.NOBLOCK ) msg_count += 1 except zmq.error.Again: if args.verbose and msg_count > 0: logger.debug( 'Read %d message(s) from socket.' % msg_count ) break except Exception as e: logger.error( 'Error receiving message: %s' % e ) break else: if len( msg ) > 0: if args.verbose: logger.debug( 'Received message (%d bytes): %s' % (len(msg), repr(msg[:100])) ) y,x = curses.getsyx() msg_str = msg.decode('utf-8', errors='replace') if 'net_port' in msg_str and ' is:' in msg_str and '"net_port"' in msg_str: import re match = re.search(r' is:"([^"]+)"', msg_str) if match: port_str = match.group(1) port_str = port_str.strip() digit_match = re.search(r'(\d+)', port_str) if digit_match and stats_port is None: stats_port = digit_match.group(1) logger.info('>>> Got stats port: %s' % stats_port) if 'zmq_stats_password' in msg_str and ' is:' in msg_str and '"zmq_stats_password"' in msg_str: import re match = re.search(r' is:"([^"]+)"', msg_str) if match and stats_password is None: password_str = match.group(1) password_str = re.sub(r'\^\d', '', password_str) stats_password = password_str.strip() logger.info('>>> Got stats password: %s' % stats_password) if stats_port and stats_password and not stats_connected: try: logger.info('>>> CONNECTING TO STATS PORT <<<') logger.info(' Host: %s:%s' % (args.host.split('//')[1].split(':')[0], stats_port)) logger.info(' Password: %s' % stats_password) host_ip = args.host.split('//')[1].split(':')[0] stats_host = 'tcp://%s:%s' % (host_ip, stats_port) time.sleep(0.5) stats_context = zmq.Context() stats_socket = stats_context.socket(zmq.SUB) logger.info('>>> Stats socket created (SUB type)') if stats_password and stats_password.strip(): logger.info('>>> Setting PLAIN authentication') stats_socket.setsockopt(zmq.PLAIN_USERNAME, b'stats') stats_socket.setsockopt(zmq.PLAIN_PASSWORD, stats_password.encode('utf-8')) stats_socket.setsockopt_string(zmq.ZAP_DOMAIN, 'stats') logger.info('>>> Auth configured') logger.info('>>> Connecting to %s' % stats_host) stats_socket.connect(stats_host) logger.info('>>> Connected') logger.info('>>> Setting ZMQ_SUBSCRIBE to empty (all messages)') stats_socket.setsockopt(zmq.SUBSCRIBE, b'') logger.info('>>> Subscribed') time.sleep(0.5) stats_connected = True logger.info('>>> STATS CONNECTION COMPLETE <<<') logger.info('>>> Waiting for game events...') except Exception as e: logger.error('!!! Stats connection FAILED: %s' % e) import traceback logger.error(traceback.format_exc()) parsed_event = ParseGameEvent(msg_str) if parsed_event: PrintMessageFormatted(output_window, parsed_event) else: PrintMessageFormatted(output_window, msg_str) curses.setsyx(y,x) curses.doupdate() else: if args.verbose: logger.debug( 'Received empty message (possible keepalive or protocol frame)' ) except KeyboardInterrupt: logger.info( 'Received keyboard interrupt, shutting down...' ) except Exception as e: logger.error( 'Fatal error: %s' % e ) import traceback logger.error( traceback.format_exc() ) finally: if stats_socket: stats_socket.close() if stats_context: stats_context.term() if ( __name__ == '__main__' ): curses.wrapper(main) # Version: 1.0.2 - Python 3 ZMQ Rcon Client with Stats Support