#!/usr/bin/env python3
"""
QLPyCon - Quake Live Python Console

Main entry point
"""
|
|
|
|
import argparse
|
|
import uuid
|
|
import logging
|
|
import time
|
|
import re
|
|
import curses
|
|
import zmq
|
|
import signal
|
|
import sys
|
|
|
|
from config import VERSION, DEFAULT_HOST, POLL_TIMEOUT
|
|
from state import GameState
|
|
from network import RconConnection, StatsConnection
|
|
from parser import EventParser
|
|
from formatter import format_message, format_chat_message, format_powerup_message
|
|
from ui import UIManager
|
|
|
|
# Configure logging: three named loggers, all created at DEBUG.
# 'main' carries application diagnostics; 'all_json' and 'unknown_json'
# are handed to the event parser in main_loop.
logger, all_json_logger, unknown_json_logger = (
    logging.getLogger(channel)
    for channel in ('main', 'all_json', 'unknown_json')
)
for _channel_logger in (logger, all_json_logger, unknown_json_logger):
    _channel_logger.setLevel(logging.DEBUG)
del _channel_logger  # keep the module namespace identical to before
|
|
|
|
|
|
def signal_handler(sig, frame):
    """Exit with status 0 as soon as the registered signal arrives."""
    # curses.endwin() is deliberately not called here: the SystemExit
    # unwinds through curses.wrapper, which restores the terminal itself.
    raise SystemExit(0)
|
|
|
|
|
|
def parse_cvar_response(message, game_state, ui):
    """
    Recognise RCON command echoes and cvar replies in a server message.

    A cvar reply has the form ``"cvar_name" is:"value" default:...``; its
    name and value are offered to ``game_state.server_info.update_from_cvar``
    and, when that accepts them, the UI server-info display is refreshed.

    Returns True if the message should be hidden from the output window,
    False otherwise.
    """
    # Echo lines for these commands are hidden from the output window
    is_command_echo = message.startswith('zmq RCON command') and 'from' in message
    if is_command_echo:
        for cmd in ('status', 'roundlimit', 'qlx_serverBrandName', 'g_factoryTitle',
                    'mapname', 'timelimit', 'fraglimit', 'capturelimit', 'sv_maxclients'):
            if f': {cmd}' in message:
                return True

    # Anything that is not a cvar reply is left for the other handlers
    found = re.search(r'"([^"]+)"\s+is:"([^"]*)"', message)
    if found is None:
        return False

    cvar_name, value = found.groups()
    if not game_state.server_info.update_from_cvar(cvar_name, value):
        # Not accepted by server_info - do not suppress the message
        return False

    ui.update_server_info(game_state)
    return True
|
|
|
|
|
|
def handle_stats_connection(message, rcon, ui, game_state):
    """
    Extract stats connection details from a server cvar reply.

    Looks for replies to the ``net_port`` and ``zmq_stats_password`` cvars,
    i.e. messages that contain the quoted cvar name and an ``is:"value"``
    part with a non-empty value.

    ``rcon``, ``ui`` and ``game_state`` are not used; they are kept so
    existing call sites continue to work.

    Returns (stats_port, stats_password). Each element is None when the
    message does not carry it. The port is the first run of digits in the
    value, returned as a string; the password has ``^<digit>`` color codes
    and surrounding whitespace removed.
    """
    stats_port = None
    stats_password = None

    # Both cvars share the same reply shape, so look the value up once.
    value_match = re.search(r' is:"([^"]+)"', message)
    if value_match is None:
        return stats_port, stats_password
    raw_value = value_match.group(1)

    # Extract stats port
    if '"net_port"' in message:
        # The value may carry color codes; keep only the leading digit run
        digit_match = re.search(r'(\d+)', raw_value.strip())
        if digit_match:
            stats_port = digit_match.group(1)
            logger.info(f'Got stats port: {stats_port}')

    # Extract stats password
    if '"zmq_stats_password"' in message:
        stats_password = re.sub(r'\^\d', '', raw_value).strip()  # Strip color codes
        # The password is a credential: record that it arrived, never its
        # value, since log records may be shown on screen or kept in files.
        logger.info('Got stats password (value not logged)')

    return stats_port, stats_password
|
|
|
|
def parse_player_events(message, game_state, ui):
    """
    Keep the player tracker in step with connect, disconnect, kick and
    rename announcements found in a server message.

    Patterns are tested against a copy of the message with color codes
    stripped; player names are then taken from the original message where
    possible so that their color codes are kept.

    Always returns False (the message is never suppressed). Any exception
    raised while parsing is logged and swallowed.
    """
    rename_pattern = r'^(.+?)\s+renamed to\s+(.+?)$'
    leave_patterns = (
        r'^(.+?)\s+disconnected',
        r'^(.+?)\s+was kicked',
    )

    try:
        # Strip color codes for matching
        from formatter import strip_color_codes
        plain = strip_color_codes(message)

        # Connects: "NAME connected"
        if re.match(r'^(.+?)\s+connected(?:\s+with Steam ID)?', plain):
            # Take the name from the raw message to keep its color codes
            player_name = message.split(' connected')[0].strip()
            if game_state.server_info.is_team_mode():
                game_state.player_tracker.update_team(player_name, 'SPECTATOR')
            game_state.player_tracker.add_player(player_name)
            ui.update_server_info(game_state)
            logger.debug(f'Player connected: {player_name}')
            return False

        # Disconnects: "NAME disconnected", "NAME was kicked"
        for pattern in leave_patterns:
            plain_hit = re.match(pattern, plain)
            if plain_hit is None:
                continue

            player_name_clean = plain_hit.group(1).strip()
            # Prefer the name as it appears with color codes, if it matches
            colored_hit = re.match(pattern, message)
            if colored_hit is not None:
                player_name = colored_hit.group(1).strip()
            else:
                player_name = player_name_clean

            # Remove under both spellings
            game_state.player_tracker.remove_player(player_name)
            game_state.player_tracker.remove_player(player_name_clean)
            ui.update_server_info(game_state)
            logger.debug(f'Player disconnected: {player_name}')
            return False

        # Renames: "OldName renamed to NewName"
        plain_rename = re.match(rename_pattern, plain)
        if plain_rename is not None:
            # Use the colored names when the raw message matches as well
            colored_rename = re.match(rename_pattern, message)
            source = colored_rename if colored_rename is not None else plain_rename
            old_name = source.group(1).strip()
            new_name = source.group(2).strip()

            game_state.player_tracker.rename_player(old_name, new_name)
            ui.update_server_info(game_state)
            logger.debug(f'Player renamed: {old_name} -> {new_name}')
            return False

    except Exception as e:
        logger.error(f'Error in parse_player_events: {e}')

    return False
|
|
|
|
def _build_arg_parser():
    """Build the command-line parser for the console."""
    parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics')
    parser.add_argument('--host', default=DEFAULT_HOST,
                        help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}')
    parser.add_argument('--password', required=False, help='RCON password')
    parser.add_argument('--identity', default=uuid.uuid1().hex,
                        help='Socket identity (random UUID by default)')
    parser.add_argument('-v', '--verbose', action='count', default=0,
                        help='Increase verbosity (-v INFO, -vv DEBUG)')
    parser.add_argument('--unknown-log', default='unknown_events.log',
                        help='File to log unknown JSON events')
    parser.add_argument('-j', '--json', dest='json_log', default=None,
                        help='File to log all JSON events')
    return parser


def _attach_file_logger(target_logger, path):
    """Append timestamped records of target_logger to path and stop propagation."""
    handler = logging.FileHandler(path, mode='a')
    handler.setFormatter(
        logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S'))
    target_logger.addHandler(handler)
    target_logger.propagate = False


def _request_server_info(rcon):
    """Send the initial server-info cvar queries over RCON, in a fixed order."""
    for cvar in (b'qlx_serverBrandName', b'g_factoryTitle', b'mapname',
                 b'timelimit', b'fraglimit', b'roundlimit',
                 b'capturelimit', b'sv_maxclients'):
        rcon.send_command(cvar)


def main_loop(screen):
    """
    Main application loop.

    Parses the command line, builds the UI and the RCON connection, then
    repeatedly forwards user input, drains the stats stream and processes
    RCON messages.

    screen is the curses window supplied by curses.wrapper; it is handed to
    UIManager.

    The loop has no exit condition of its own: Ctrl+C terminates the
    process through signal_handler.
    """
    # Setup signal handler for Ctrl+C
    signal.signal(signal.SIGINT, signal_handler)

    # Parse arguments
    args = _build_arg_parser().parse_args()

    # Set logging level: default WARNING, -v INFO, -vv (or more) DEBUG
    logger.setLevel(
        {0: logging.WARNING, 1: logging.INFO}.get(args.verbose, logging.DEBUG))

    # Setup file logging for unknown events
    _attach_file_logger(unknown_json_logger, args.unknown_log)

    # Initialize components
    ui = UIManager(screen, args.host)
    game_state = GameState()

    # Setup logging to output window
    logger.addHandler(ui.setup_logging())

    # Setup input queue
    input_queue = ui.setup_input_queue()

    # Display startup messages
    ui.print_message(f"*** QL pyCon Version {VERSION} starting ***\n")
    ui.print_message(f"zmq python bindings {zmq.__version__}, libzmq version {zmq.zmq_version()}\n")

    # Initialize network connections
    rcon = RconConnection(args.host, args.password, args.identity)
    rcon.connect()

    # The stats stream is opened later, once the server has told us both
    # its port and its password over RCON.
    stats_conn = None
    stats_port = None
    stats_password = None
    stats_check_counter = 0

    # Shutdown flag (never set in this function; kept as the loop condition)
    shutdown = False

    # Setup JSON logging if requested
    json_logger = None
    if args.json_log:
        _attach_file_logger(all_json_logger, args.json_log)
        json_logger = all_json_logger

    # Create event parser
    event_parser = EventParser(game_state, json_logger, unknown_json_logger)

    # Main event loop
    while not shutdown:
        # Poll RCON socket
        event = rcon.poll(POLL_TIMEOUT)

        # Check monitor for connection events
        monitor_event = rcon.check_monitor()
        if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED:
            ui.print_message("Connected to server\n")
            rcon.send_command(b'register')
            logger.info('Registration message sent')

            # The replies to these two cvars are what enable the stats stream
            ui.print_message("Requesting connection info...\n")
            rcon.send_command(b'zmq_stats_password')
            rcon.send_command(b'net_port')

        # Handle user input
        while not input_queue.empty():
            command = input_queue.get()
            logger.info(f'Sending command: {repr(command.strip())}')

            # Display command with timestamp
            timestamp = time.strftime('%H:%M:%S')
            ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n")

            rcon.send_command(command)

        # Poll stats stream if connected
        if stats_conn and stats_conn.connected:
            stats_check_counter += 1

            if stats_check_counter % 100 == 0:
                logger.debug(f'Stats polling active (check #{stats_check_counter // 100})')

            stats_msg = stats_conn.recv_message()
            if stats_msg:
                logger.info(f'Stats event received ({len(stats_msg)} bytes)')

                # Parse game event
                parsed = event_parser.parse_event(stats_msg)
                if parsed:
                    # Format with timestamp before displaying
                    formatted_msg, _ = format_message(parsed)
                    ui.print_message(formatted_msg)
                    ui.update_server_info(game_state)

        # Process RCON messages
        if event > 0:
            logger.debug('Socket has data available')
            msg_count = 0

            # Drain everything currently queued on the socket
            while True:
                message = rcon.recv_message()
                if message is None:
                    if msg_count > 0:
                        logger.debug(f'Read {msg_count} message(s)')
                    break

                msg_count += 1

                if len(message) == 0:
                    logger.debug('Received empty message (keepalive)')
                    continue

                logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}')

                # Try to parse as cvar response
                if parse_cvar_response(message, game_state, ui):
                    logger.debug('Suppressed cvar response')
                    continue

                # Check for player connect/disconnect/rename events
                parse_player_events(message, game_state, ui)

                # Check for stats connection info
                port, password = handle_stats_connection(message, rcon, ui, game_state)
                if port:
                    stats_port = port
                if password:
                    stats_password = password

                # Connect to stats if we have both credentials
                if stats_port and stats_password and stats_conn is None:
                    try:
                        ui.print_message("Connecting to stats stream...\n")
                        # Host part of a URI such as tcp://host:port
                        host_ip = args.host.split('//')[1].split(':')[0]

                        stats_conn = StatsConnection(host_ip, stats_port, stats_password)
                        stats_conn.connect()

                        ui.print_message("Stats stream connected - ready for game events\n")

                        # Request initial server info
                        logger.info('Sending initial server info queries')
                        _request_server_info(rcon)

                        if args.json_log:
                            ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n")

                    except Exception as e:
                        # Best effort: report the failure and keep the RCON console running
                        timestamp = time.strftime('%H:%M:%S')
                        ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n")
                        logger.error(f'Stats connection failed: {e}')

                # Try to parse as game event
                parsed_event = event_parser.parse_event(message)
                if parsed_event:
                    ui.print_message(parsed_event)
                    continue

                # Check if it looks like JSON but wasn't parsed
                stripped = message.strip()
                if stripped and stripped[0] in ('{', '['):
                    logger.debug('Unparsed JSON event')
                    continue

                # Try powerup message formatting
                powerup_msg = format_powerup_message(message, game_state.player_tracker)
                if powerup_msg:
                    ui.print_message(powerup_msg)
                    continue

                # Filter bot debug messages in default mode
                is_bot_debug = (' entered ' in message and
                                any(x in message for x in [' seek ', ' battle ', ' chase', ' fight']))
                if is_bot_debug and args.verbose == 0:
                    logger.debug(f'Filtered bot debug: {message[:50]}')
                    continue

                # Check if it's a chat message
                if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')):
                    message = format_chat_message(message, game_state.player_tracker)

                # Format and display message
                formatted_msg, _ = format_message(message)
                ui.print_message(formatted_msg)
|
|
|
|
if __name__ == "__main__":
    # Run under curses.wrapper so main_loop receives the screen and the
    # terminal is restored when it exits or raises.
    curses.wrapper(main_loop)
|