qlpycon/main.py
2025-12-30 00:50:24 +01:00

490 lines
20 KiB
Python

#!/usr/bin/env python3
"""
QLPyCon - Quake Live Python Console
Main entry point
"""
import argparse
import uuid
import logging
import time
import re
import curses
import zmq
import signal
import sys
from config import VERSION, DEFAULT_HOST, POLL_TIMEOUT
from state import GameState
from network import RconConnection, StatsConnection
from parser import EventParser
from formatter import format_message, format_chat_message, format_powerup_message
from ui import UIManager
# Configure logging
logger = logging.getLogger('main')
logger.setLevel(logging.DEBUG)
all_json_logger = logging.getLogger('all_json')
all_json_logger.setLevel(logging.DEBUG)
unknown_json_logger = logging.getLogger('unknown_json')
unknown_json_logger.setLevel(logging.DEBUG)
# Global flag for quit confirmation
quit_confirm_time = None
def signal_handler(sig, frame):
"""Handle Ctrl+C with confirmation"""
global quit_confirm_time
import time
current_time = time.time()
if quit_confirm_time is None or (current_time - quit_confirm_time) > 3:
# First Ctrl-C or timeout expired
logger.warning("^1^8Press Ctrl-C again within 3 seconds to quit^0")
quit_confirm_time = current_time
else:
# Second Ctrl-C within 3 seconds
logger.warning("^1^8Quittin'^0")
sys.exit(0)
def parse_cvar_response(message, game_state, ui):
"""
Parse server cvar responses and update state
Returns True if message should be suppressed from display
"""
# Suppress command echo lines
if message.startswith('zmq RCON command') and 'from' in message:
suppress_cmds = ['status', 'roundlimit', 'qlx_serverBrandName', 'g_factoryTitle',
'mapname', 'timelimit', 'fraglimit', 'capturelimit', 'sv_maxclients']
if any(f': {cmd}' in message for cmd in suppress_cmds):
return True
# Parse cvar responses (format: "cvar_name" is:"value" default:...)
cvar_match = re.search(r'"([^"]+)"\s+is:"([^"]*)"', message)
if cvar_match:
cvar_name = cvar_match.group(1)
value = cvar_match.group(2)
if game_state.server_info.update_from_cvar(cvar_name, value):
ui.update_server_info(game_state)
return True
return False
def handle_stats_connection(message, rcon, ui, game_state):
"""
Handle stats connection info extraction
Returns (stats_port, stats_password) or (None, None)
"""
stats_port = None
stats_password = None
# Extract stats port
if 'net_port' in message and ' is:' in message and '"net_port"' in message:
match = re.search(r' is:"([^"]+)"', message)
if match:
port_str = match.group(1).strip()
digit_match = re.search(r'(\d+)', port_str)
if digit_match:
stats_port = digit_match.group(1)
logger.info(f'Got stats port: {stats_port}')
# Extract stats password
if 'zmq_stats_password' in message and ' is:' in message and '"zmq_stats_password"' in message:
match = re.search(r' is:"([^"]+)"', message)
if match:
password_str = match.group(1)
password_str = re.sub(r'\^\d', '', password_str) # Strip color codes
stats_password = password_str.strip()
logger.info(f'Got stats password: {stats_password}')
return stats_port, stats_password
def parse_player_events(message, game_state, ui):
"""
Parse connect, disconnect, kick, and rename messages
Returns True if message should be suppressed
"""
try:
msg = message
# Strip broadcast: print "..." wrapper with regex
broadcast_match = re.match(r'^broadcast:\s*print\s*"(.+?)(?:\\n)?"\s*$', msg)
if broadcast_match:
msg = broadcast_match.group(1)
# Strip timestamp: [HH:MM:SS] or ^3[^7HH:MM:SS^3]^7
msg = re.sub(r'\^\d\[\^\d[0-9:]+\^\d\]\^\d\s*', '', msg)
msg = re.sub(r'\[[0-9:]+\]\s*', '', msg)
msg = msg.strip()
if not msg:
return False
logger.debug(f'parse_player_events: {repr(msg)}')
# Strip color codes for matching
from formatter import strip_color_codes
clean_msg = strip_color_codes(msg)
# Match connects: "NAME connected" or "NAME connected with Steam ID"
connect_match = re.match(r'^(.+?)\s+connected', clean_msg)
if connect_match:
player_name_match = re.match(r'^(.+?)\s+connected', msg)
player_name = player_name_match.group(1).strip() if player_name_match else connect_match.group(1).strip()
player_name = re.sub(r'\^\d+$', '', player_name)
logger.info(f'CONNECT: {repr(player_name)}')
game_state.player_tracker.update_team(player_name, 'SPECTATOR')
game_state.player_tracker.add_player(player_name)
ui.update_server_info(game_state)
# Only print if this is NOT the Steam ID line
if 'Steam ID' not in message:
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3]^7 ^8{player_name} ^9^2connected\n")
return True
# Regular disconnect
disconnect_match = re.match(r'^(.+?)\s+disconnected', clean_msg)
if disconnect_match:
original_match = re.match(r'^(.+?)\s+disconnected', msg)
player_name = original_match.group(1).strip() if original_match else disconnect_match.group(1).strip()
player_name = re.sub(r'\^\d+$', '', player_name)
player_name = re.sub(r'^\^\d+', '', player_name)
logger.info(f'DISCONNECT: {repr(player_name)}')
game_state.player_tracker.remove_player(player_name)
ui.update_server_info(game_state)
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3]^7 ^8{player_name} ^9^1disconnected\n")
return True
# Kick
kick_match = re.match(r'^(.+?)\s+was kicked', clean_msg)
if kick_match:
original_match = re.match(r'^(.+?)\s+was kicked', msg)
player_name = original_match.group(1).strip() if original_match else kick_match.group(1).strip()
player_name = re.sub(r'\^\d+$', '', player_name)
player_name = re.sub(r'^\^\d+', '', player_name)
logger.info(f'KICK: {repr(player_name)}')
game_state.player_tracker.remove_player(player_name)
ui.update_server_info(game_state)
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3]^7 ^8{player_name} ^1was kicked\n")
return True
# Inactivity
inactivity_match = re.match(r'^(.+?)\s+Dropped due to inactivity', clean_msg)
if inactivity_match:
original_match = re.match(r'^(.+?)\s+Dropped due to inactivity', msg)
player_name = original_match.group(1).strip() if original_match else inactivity_match.group(1).strip()
player_name = re.sub(r'\^\d+$', '', player_name)
player_name = re.sub(r'^\^\d+', '', player_name)
logger.info(f'INACTIVITY DROP: {repr(player_name)}')
game_state.player_tracker.remove_player(player_name)
ui.update_server_info(game_state)
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3]^7 ^8{player_name}^0 ^3dropped due to inactivity^7\n")
return True
# Match renames: "OldName renamed to NewName"
rename_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', clean_msg)
if rename_match:
# Extract from original message
original_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', msg)
if original_match:
old_name = original_match.group(1).strip()
new_name = original_match.group(2).strip()
else:
old_name = rename_match.group(1).strip()
new_name = rename_match.group(2).strip()
# Remove trailing color codes from both names
old_name = re.sub(r'\^\d+$', '', old_name)
new_name = re.sub(r'^\^\d+', '', new_name) # Remove leading ^7 from new name
new_name = re.sub(r'\^\d+$', '', new_name) # Remove trailing too
old_name = old_name.rstrip('\n\r') # Remove trailing newline
new_name = new_name.rstrip('\n\r') # Remove trailing newline
logger.info(f'RENAME: {repr(old_name)} -> {repr(new_name)}')
game_state.player_tracker.rename_player(old_name, new_name)
ui.update_server_info(game_state)
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3]^7 ^8{old_name} ^6renamed to^7 ^8{new_name}\n")
return True
except Exception as e:
logger.error(f'Error in parse_player_events: {e}')
return False
def main_loop(screen):
"""Main application loop"""
# Setup signal handler for Ctrl+C with confirmation
signal.signal(signal.SIGINT, signal_handler)
# Parse arguments
parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics')
parser.add_argument('--host', default=DEFAULT_HOST, help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}')
parser.add_argument('--password', required=False, help='RCON password')
parser.add_argument('--identity', default=uuid.uuid1().hex, help='Socket identity (random UUID by default)')
parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity (-v INFO, -vv DEBUG)')
parser.add_argument('--unknown-log', default='unknown_events.log', help='File to log unknown JSON events')
parser.add_argument('-j', '--json', dest='json_log', default=None, help='File to log all JSON events')
args = parser.parse_args()
# Set logging level
if args.verbose == 0:
logger.setLevel(logging.WARNING)
elif args.verbose == 1:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.DEBUG)
# Setup file logging for unknown events
unknown_handler = logging.FileHandler(args.unknown_log, mode='a')
unknown_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S')
unknown_handler.setFormatter(unknown_formatter)
unknown_json_logger.addHandler(unknown_handler)
unknown_json_logger.propagate = False
# Initialize components
ui = UIManager(screen, args.host)
game_state = GameState()
# Setup logging to output window
log_handler = ui.setup_logging()
logger.addHandler(log_handler)
# Setup input queue
input_queue = ui.setup_input_queue()
# Display startup messages
ui.print_message(f"*** QL pyCon Version {VERSION} starting ***\n")
ui.print_message(f"zmq python bindings {zmq.__version__}, libzmq version {zmq.zmq_version()}\n")
# Initialize network connections
rcon = RconConnection(args.host, args.password, args.identity)
rcon.connect()
stats_conn = None
stats_port = None
stats_password = None
stats_check_counter = 0
# Shutdown flag
shutdown = False
# Setup JSON logging if requested
json_logger = None
if args.json_log:
json_handler = logging.FileHandler(args.json_log, mode='a')
json_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S')
json_handler.setFormatter(json_formatter)
all_json_logger.addHandler(json_handler)
all_json_logger.propagate = False
json_logger = all_json_logger
# Create event parser
event_parser = EventParser(game_state, json_logger, unknown_json_logger)
# Main event loop
while not shutdown:
# Poll RCON socket
event = rcon.poll(POLL_TIMEOUT)
# Check monitor for connection events
monitor_event = rcon.check_monitor()
if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED:
ui.print_message("Connected to server\n")
rcon.send_command(b'register')
logger.info('Registration message sent')
ui.print_message("Requesting connection info...\n")
rcon.send_command(b'zmq_stats_password')
rcon.send_command(b'net_port')
# Handle user input
while not input_queue.empty():
command = input_queue.get()
logger.info(f'Sending command: {repr(command.strip())}')
# Display command with timestamp
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n")
rcon.send_command(command)
# Poll stats stream if connected
if stats_conn and stats_conn.connected:
stats_check_counter += 1
if stats_check_counter % 100 == 0:
logger.debug(f'Stats polling active (check #{stats_check_counter // 100})')
stats_msg = stats_conn.recv_message()
if stats_msg:
logger.info(f'Stats event received ({len(stats_msg)} bytes)')
# Parse game event
parsed = event_parser.parse_event(stats_msg)
if parsed:
# Format with timestamp before displaying
formatted_msg, attributes = format_message(parsed)
ui.print_message(formatted_msg)
ui.update_server_info(game_state)
# Check if we need to revive players
if game_state.server_info.gametype == 'Clan Arena':
# CA: revive all players 3s after round end
if game_state.server_info.round_end_time:
if time.time() - game_state.server_info.round_end_time >= 3.0:
game_state.server_info.dead_players.clear()
game_state.server_info.round_end_time = None
ui.update_server_info(game_state)
else:
# Other modes: revive individual players 3s after death
current_time = time.time()
players_to_revive = [
name for name, death_time in game_state.server_info.dead_players.items()
if current_time - death_time >= 3.0
]
if players_to_revive:
for name in players_to_revive:
del game_state.server_info.dead_players[name]
ui.update_server_info(game_state)
# Process RCON messages
if event > 0:
logger.debug('Socket has data available')
msg_count = 0
while True:
message = rcon.recv_message()
if message is None:
if msg_count > 0:
logger.debug(f'Read {msg_count} message(s)')
break
msg_count += 1
if len(message) == 0:
logger.debug('Received empty message (keepalive)')
continue
logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}')
# Check for player connect/disconnect/rename events
if parse_player_events(message, game_state, ui):
continue
if '------- Game Initialization -------' in message or 'Game Initialization' in message:
logger.info('Game initialization detected - refreshing server info')
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3] ^8^3Game initialized - Refreshing server info^0^7\n")
rcon.send_command(b'qlx_serverBrandName')
rcon.send_command(b'g_factoryTitle')
rcon.send_command(b'mapname')
rcon.send_command(b'timelimit')
rcon.send_command(b'fraglimit')
rcon.send_command(b'roundlimit')
rcon.send_command(b'capturelimit')
rcon.send_command(b'sv_maxclients')
# Clear player list since map changed
game_state.server_info.players = []
game_state.player_tracker.player_teams = {}
ui.update_server_info(game_state)
# Try to parse as cvar response
if parse_cvar_response(message, game_state, ui):
logger.debug('Suppressed cvar response')
continue
# Check for stats connection info
port, password = handle_stats_connection(message, rcon, ui, game_state)
if port:
stats_port = port
if password:
stats_password = password
# Connect to stats if we have both credentials
if stats_port and stats_password and stats_conn is None:
try:
ui.print_message("Connecting to stats stream...\n")
host_ip = args.host.split('//')[1].split(':')[0]
stats_conn = StatsConnection(host_ip, stats_port, stats_password)
stats_conn.connect()
ui.print_message("Stats stream connected - ready for game events\n")
# Request initial server info
logger.info('Sending initial server info queries')
rcon.send_command(b'qlx_serverBrandName')
rcon.send_command(b'g_factoryTitle')
rcon.send_command(b'mapname')
rcon.send_command(b'timelimit')
rcon.send_command(b'fraglimit')
rcon.send_command(b'roundlimit')
rcon.send_command(b'capturelimit')
rcon.send_command(b'sv_maxclients')
if args.json_log:
ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n")
except Exception as e:
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n")
logger.error(f'Stats connection failed: {e}')
# Try to parse as game event
parsed_event = event_parser.parse_event(message)
if parsed_event:
ui.print_message(parsed_event)
continue
# Check if it looks like JSON but wasn't parsed
stripped = message.strip()
if stripped and stripped[0] in ('{', '['):
logger.debug('Unparsed JSON event')
continue
# Try powerup message formatting
powerup_msg = format_powerup_message(message, game_state.player_tracker)
if powerup_msg:
ui.print_message(powerup_msg)
continue
# Filter bot debug messages in default mode
is_bot_debug = (' entered ' in message and
any(x in message for x in [' seek ', ' battle ', ' chase', ' fight']))
if is_bot_debug and args.verbose == 0:
logger.debug(f'Filtered bot debug: {message[:50]}')
continue
# Check if it's a chat message
if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')):
message = format_chat_message(message, game_state.player_tracker)
# Format and display message
formatted_msg, attributes = format_message(message)
ui.print_message(formatted_msg)
if __name__ == '__main__':
curses.wrapper(main_loop)