From c1e5a28b41a21b1d5adca37103d7ff44b26eefc8 Mon Sep 17 00:00:00 2001 From: xbl Date: Tue, 23 Dec 2025 15:41:10 +0100 Subject: [PATCH] saved --- formatter.py | 12 +-- main.py | 218 +-------------------------------------------------- parser.py | 16 ++-- ui.py | 17 ++-- 4 files changed, 24 insertions(+), 239 deletions(-) diff --git a/formatter.py b/formatter.py index 37ed5ec..ca109b5 100644 --- a/formatter.py +++ b/formatter.py @@ -84,7 +84,7 @@ def format_message(message, add_timestamp=True): # Handle print messages if message[:7] == "print \"": message = message[7:-2] + "\n" - + # Add timestamp if requested and appropriate if add_timestamp and should_add_timestamp(message): timestamp = time.strftime('%H:%M:%S') @@ -137,7 +137,7 @@ def format_chat_message(message, player_tracker): player_name = strip_color_codes(name_match.group(1).strip()) team_prefix = get_team_prefix(player_name, player_tracker) location_clean = strip_color_codes(location_part) - return f"{team_prefix}{player_part} ^3{location_clean}^7:^5{message_part}" + return f"{team_prefix}^0{player_part}^9 ^3{location_clean}^7:^5{message_part}" # Team chat without location: (PlayerName): message colon_match = re.match(r'^(\([^)]+\)):(\s*.*)', clean_msg) @@ -149,7 +149,7 @@ def format_chat_message(message, player_tracker): if name_match: player_name = strip_color_codes(name_match.group(1).strip()) team_prefix = get_team_prefix(player_name, player_tracker) - return f"{team_prefix}{player_part}^5{message_part}\n" + return f"{team_prefix}^0{player_part}^9^5{message_part}\n" # Regular chat: PlayerName: message parts = clean_msg.split(':', 1) @@ -160,7 +160,7 @@ def format_chat_message(message, player_tracker): # Preserve original color-coded name original_parts = message.replace(chr(25), '').split(':', 1) if len(original_parts) == 2: - return f"{team_prefix}{original_parts[0]}:^2{original_parts[1]}" + return f"{team_prefix}^0{original_parts[0]}^9:^2{original_parts[1]}" return message @@ -185,7 +185,7 @@ def format_powerup_message(message, player_tracker): team_prefix = get_team_prefix(player_clean, player_tracker) colored_powerup = POWERUP_COLORS.get(powerup_name, f'^6{powerup_name}^7') - return f"{team_prefix}{player_name} ^7got the {colored_powerup}!\n" + return f"{team_prefix}^0{player_name}^9 ^7got the {colored_powerup}!\n" # Powerup carrier kill: "PlayerName killed the PowerupName carrier!" carrier_match = re.match(r'^(.+?)\s+killed the\s+(.+?)\s+carrier!', message) @@ -197,6 +197,6 @@ def format_powerup_message(message, player_tracker): team_prefix = get_team_prefix(player_clean, player_tracker) colored_powerup = POWERUP_COLORS.get(powerup_name, f'^6{powerup_name}^7') - return f"{team_prefix}{player_name} ^7killed the {colored_powerup} ^7carrier!\n" + return f"{team_prefix}^0{player_name}^9 ^7killed the {colored_powerup} ^7carrier!\n" return None diff --git a/main.py b/main.py index 62b2025..f1115de 100644 --- a/main.py +++ b/main.py @@ -154,222 +154,6 @@ def parse_player_events(message, game_state, ui): return False -def main_loop(screen): - return False - """Main application loop""" - - # Setup signal handler for Ctrl+C - signal.signal(signal.SIGINT, signal_handler) - - # Parse arguments - parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics') - parser.add_argument('--host', default=DEFAULT_HOST, help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}') - parser.add_argument('--password', required=False, help='RCON password') - parser.add_argument('--identity', default=uuid.uuid1().hex, help='Socket identity (random UUID by default)') - parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity (-v INFO, -vv DEBUG)') - parser.add_argument('--unknown-log', default='unknown_events.log', help='File to log unknown JSON events') - parser.add_argument('-j', '--json', dest='json_log', default=None, help='File to log all JSON events') - args = parser.parse_args() - - # Set logging level - if args.verbose == 0: - logger.setLevel(logging.WARNING) - elif args.verbose == 1: - logger.setLevel(logging.INFO) - else: - logger.setLevel(logging.DEBUG) - - # Setup file logging for unknown events - unknown_handler = logging.FileHandler(args.unknown_log, mode='a') - unknown_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') - unknown_handler.setFormatter(unknown_formatter) - unknown_json_logger.addHandler(unknown_handler) - unknown_json_logger.propagate = False - - # Initialize components - ui = UIManager(screen, args.host) - game_state = GameState() - - # Setup logging to output window - log_handler = ui.setup_logging() - logger.addHandler(log_handler) - - # Setup input queue - input_queue = ui.setup_input_queue() - - # Display startup messages - ui.print_message(f"*** QL pyCon Version {VERSION} starting ***\n") - ui.print_message(f"zmq python bindings {zmq.__version__}, libzmq version {zmq.zmq_version()}\n") - - # Initialize network connections - rcon = RconConnection(args.host, args.password, args.identity) - rcon.connect() - - stats_conn = None - stats_port = None - stats_password = None - stats_check_counter = 0 - - # Shutdown flag - shutdown = False - - # Setup JSON logging if requested - json_logger = None - if args.json_log: - json_handler = logging.FileHandler(args.json_log, mode='a') - json_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S') - json_handler.setFormatter(json_formatter) - all_json_logger.addHandler(json_handler) - all_json_logger.propagate = False - json_logger = all_json_logger - - # Create event parser - event_parser = EventParser(game_state, json_logger, unknown_json_logger) - - # Main event loop - while not shutdown: - # Poll RCON socket - event = rcon.poll(POLL_TIMEOUT) - - # Check monitor for connection events - monitor_event = rcon.check_monitor() - if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED: - ui.print_message("Connected to server\n") - rcon.send_command(b'register') - logger.info('Registration message sent') - - ui.print_message("Requesting connection info...\n") - rcon.send_command(b'zmq_stats_password') - rcon.send_command(b'net_port') - - # Handle user input - while not input_queue.empty(): - command = input_queue.get() - logger.info(f'Sending command: {repr(command.strip())}') - - # Display command with timestamp - timestamp = time.strftime('%H:%M:%S') - ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n") - - rcon.send_command(command) - - # Poll stats stream if connected - if stats_conn and stats_conn.connected: - stats_check_counter += 1 - - if stats_check_counter % 100 == 0: - logger.debug(f'Stats polling active (check #{stats_check_counter // 100})') - - stats_msg = stats_conn.recv_message() - if stats_msg: - logger.info(f'Stats event received ({len(stats_msg)} bytes)') - - # Parse game event - parsed = event_parser.parse_event(stats_msg) - if parsed: - # Format with timestamp before displaying - formatted_msg, attributes = format_message(parsed) - ui.print_message(formatted_msg) - ui.update_server_info(game_state) - - # Process RCON messages - if event > 0: - logger.debug('Socket has data available') - msg_count = 0 - - while True: - message = rcon.recv_message() - if message is None: - if msg_count > 0: - logger.debug(f'Read {msg_count} message(s)') - break - - msg_count += 1 - - if len(message) == 0: - logger.debug('Received empty message (keepalive)') - continue - - logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}') - - # Try to parse as cvar response - if parse_cvar_response(message, game_state, ui): - logger.debug('Suppressed cvar response') - continue - - # Check for player connect/disconnect/rename events - parse_player_events(message, game_state, ui) - - # Check for stats connection info - port, password = handle_stats_connection(message, rcon, ui, game_state) - if port: - stats_port = port - if password: - stats_password = password - - # Connect to stats if we have both credentials - if stats_port and stats_password and stats_conn is None: - try: - ui.print_message("Connecting to stats stream...\n") - host_ip = args.host.split('//')[1].split(':')[0] - - stats_conn = StatsConnection(host_ip, stats_port, stats_password) - stats_conn.connect() - - ui.print_message("Stats stream connected - ready for game events\n") - - # Request initial server info - logger.info('Sending initial server info queries') - rcon.send_command(b'qlx_serverBrandName') - rcon.send_command(b'g_factoryTitle') - rcon.send_command(b'mapname') - rcon.send_command(b'timelimit') - rcon.send_command(b'fraglimit') - rcon.send_command(b'roundlimit') - rcon.send_command(b'capturelimit') - rcon.send_command(b'sv_maxclients') - - if args.json_log: - ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n") - - except Exception as e: - timestamp = time.strftime('%H:%M:%S') - ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n") - logger.error(f'Stats connection failed: {e}') - - # Try to parse as game event - parsed_event = event_parser.parse_event(message) - if parsed_event: - ui.print_message(parsed_event) - continue - - # Check if it looks like JSON but wasn't parsed - stripped = message.strip() - if stripped and stripped[0] in ('{', '['): - logger.debug('Unparsed JSON event') - continue - - # Try powerup message formatting - powerup_msg = format_powerup_message(message, game_state.player_tracker) - if powerup_msg: - ui.print_message(powerup_msg) - continue - - # Filter bot debug messages in default mode - is_bot_debug = (' entered ' in message and - any(x in message for x in [' seek ', ' battle ', ' chase', ' fight'])) - if is_bot_debug and args.verbose == 0: - logger.debug(f'Filtered bot debug: {message[:50]}') - continue - - # Check if it's a chat message - if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')): - message = format_chat_message(message, game_state.player_tracker) - - # Format and display message - formatted_msg, attributes = format_message(message) - ui.print_message(formatted_msg) - def main_loop(screen): """Main application loop""" @@ -578,7 +362,7 @@ def main_loop(screen): continue # Check if it's a chat message - if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')): + if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')): message = format_chat_message(message, game_state.player_tracker) # Format and display message diff --git a/parser.py b/parser.py index 4f5328a..7eae41f 100644 --- a/parser.py +++ b/parser.py @@ -121,7 +121,7 @@ class EventParser: old_team_msg = old_team_messages.get(old_team, f'team {old_team}') team_prefix = get_team_prefix(name, self.game_state.player_tracker) - return f"{team_prefix}{name}{team_msg} from {old_team_msg}{warmup}\n" + return f"{team_prefix}^0{name}^9{team_msg} from {old_team_msg}{warmup}\n" def _handle_death(self, data): """Handle PLAYER_DEATH and PLAYER_KILL events""" @@ -149,7 +149,7 @@ class EventParser: # Environmental death (no killer) if 'KILLER' not in data or not data['KILLER']: mod = data.get('MOD', 'UNKNOWN') - msg_template = DEATH_MESSAGES.get(mod, "%s%s ^1DIED FROM %s^7") + msg_template = DEATH_MESSAGES.get(mod, "^0%s%s^9 ^1DIED FROM %s^7") if mod in DEATH_MESSAGES: msg = msg_template % (victim_prefix, victim_name) @@ -174,14 +174,14 @@ class EventParser: weapon = killer.get('WEAPON', 'OTHER_WEAPON') if weapon != 'OTHER_WEAPON': weapon_name = WEAPON_NAMES.get(weapon, weapon) - return f"{killer_prefix}{killer_name} ^7committed suicide with the ^7{weapon_name}{warmup}\n" + return f"{killer_prefix}^0{killer_name}^9 ^7committed suicide with the ^7{weapon_name}{warmup}\n" return None # Regular kill weapon = killer.get('WEAPON', 'UNKNOWN') weapon_name = WEAPON_KILL_NAMES.get(weapon, f'the {weapon}') - return f"{killer_prefix}{killer_name} ^7fragged^7 {victim_prefix}{victim_name} ^7with {weapon_name}{warmup}\n" + return f"{killer_prefix}^0{killer_name}^9 ^7fragged^7 {victim_prefix}^0{victim_name}^9 ^7with {weapon_name}{warmup}\n" def _handle_medal(self, data): """Handle PLAYER_MEDAL event""" @@ -190,7 +190,7 @@ class EventParser: warmup = " ^3(warmup)^7" if data.get('WARMUP', False) else "" team_prefix = get_team_prefix(name, self.game_state.player_tracker) - return f"{team_prefix}{name} ^7got a medal: ^6{medal}{warmup}\n" + return f"{team_prefix}^0{name}^9 ^7got a medal: ^6{medal}{warmup}\n" def _handle_match_started(self, data): """Handle MATCH_STARTED event""" @@ -203,8 +203,8 @@ class EventParser: players.append(name) if players: - formatted = " vs. ".join(players) - return f"Match has started - {formatted}\n" + formatted = "^9 vs. ^0".join(players) + return f"^0^3Match has started - ^0^7{formatted}\n" return None @@ -244,4 +244,4 @@ class EventParser: weapon_name = WEAPON_NAMES.get(best_weapon, best_weapon) - return f"^7{team_prefix}{name} K/D: {kills}/{deaths} | Best Weapon: {weapon_name} - Acc: {best_accuracy:.2f}% - Kills: {best_weapon_kills}\n" + return f"{team_prefix}^0^7{name}^9^7 K/D: {kills}/{deaths} | Best Weapon: {weapon_name} - Acc: {best_accuracy:.2f}% - Kills: {best_weapon_kills}\n" diff --git a/ui.py b/ui.py index 94e1c85..6749cfa 100644 --- a/ui.py +++ b/ui.py @@ -39,7 +39,7 @@ class CursesHandler(logging.Handler): def print_colored(window, message, attributes=0): """ Print message with Quake color codes (^N) - ^0 = bold, ^1 = red, ^2 = green, ^3 = yellow, ^4 = blue, ^5 = cyan, ^6 = magenta, ^7 = white/reset + ^0 = bold, ^1 = red, ^2 = green, ^3 = yellow, ^4 = blue, ^5 = cyan, ^6 = magenta, ^7 = white, ^9 = reset """ if not curses.has_colors: window.addstr(message) @@ -54,9 +54,10 @@ def print_colored(window, message, attributes=0): if parse_color: if ch == '0': bold = True + elif ch == '9': + bold = False elif ch == '7': color = 0 - bold = False elif ord('1') <= val <= ord('6'): color = val - ord('0') else: @@ -198,7 +199,7 @@ class UIManager: # Line 1: Hostname hostname = server_info.hostname - print_colored(self.info_window, f"^6═══^0 {hostname} ^7^6═══^7\n", 0) + print_colored(self.info_window, f"^3Name:^0 {hostname} ^7\n", 0) # Line 2: Game info gametype = server_info.gametype @@ -211,14 +212,14 @@ class UIManager: maxclients = server_info.maxclients print_colored(self.info_window, - f"^3Type:^7^0 {gametype} ^7^3Map:^7^0 {mapname} ^7^3Players:^7^0 {curclients}/{maxclients} " - f"^7^3Limits (T/F/R/C):^7 {timelimit}/{fraglimit}/{roundlimit}/{caplimit}\n", 0) + f"^3^9Type:^7^0 {gametype} ^9^3| Map:^7^0 {mapname} ^9^3| Players:^7^0 {curclients}/{maxclients} " + f"^3^9| Limits (T/F/R/C):^7^0 {timelimit}/{fraglimit}/{roundlimit}/{caplimit}^9\n", 0) # Line 3: Team headers and player lists teams = game_state.player_tracker.get_players_by_team() if server_info.gametype in TEAM_MODES: - print_colored(self.info_window, f"^0^1(RED) ^4(BLUE) ^3(SPEC)\n", 0) + print_colored(self.info_window, f"^0^1(RED) ^4(BLUE) ^3(SPEC)^9\n", 0) red_players = teams['RED'][:4] blue_players = teams['BLUE'][:4] @@ -239,13 +240,13 @@ class UIManager: red_pad = 24 - len(red_clean) blue_pad = 24 - len(blue_clean) - line = f"{red}{' ' * red_pad}{blue}{' ' * blue_pad}{spec}\n" + line = f"^0{red}^9{' ' * red_pad}^0{blue}^9{' ' * blue_pad}^0{spec}^9\n" print_colored(self.info_window, line, 0) else: print_colored(self.info_window, f"^0^3(FREE)\n", 0) free_players = teams['FREE'][:4] for player in free_players: - print_colored(self.info_window, f"{player}\n", 0) + print_colored(self.info_window, f"^0{player}^9\n", 0) # Fill remaining lines for i in range(4 - len(free_players)): self.info_window.addstr("\n")