1171 lines
53 KiB
Python
1171 lines
53 KiB
Python
#!/usr/bin/env python3
|
|
# Version: 0.7.0
|
|
|
|
import sys
|
|
import re
|
|
import time
|
|
import struct
|
|
import argparse
|
|
import uuid
|
|
import threading
|
|
import queue
|
|
import json
|
|
|
|
import logging
|
|
logger = logging.getLogger('logger')
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
# Separate logger for all JSON events when --json flag is used
|
|
all_json_logger = logging.getLogger('all_json')
|
|
all_json_logger.setLevel(logging.DEBUG)
|
|
|
|
# Separate logger for unknown JSON events
|
|
unknown_json_logger = logging.getLogger('unknown_json')
|
|
unknown_json_logger.setLevel(logging.DEBUG)
|
|
|
|
import zmq
|
|
import curses
|
|
import curses.textpad
|
|
import signal
|
|
|
|
import unittest
|
|
|
|
import locale
|
|
locale.setlocale(locale.LC_ALL, '')
|
|
|
|
# Server info state
|
|
server_info = {
|
|
'hostname': 'Unknown',
|
|
'map': 'Unknown',
|
|
'gametype': 'Unknown',
|
|
'timelimit': '0',
|
|
'fraglimit': '0',
|
|
'roundlimit': '0',
|
|
'capturelimit': '0',
|
|
'players': [],
|
|
'red_score': 0,
|
|
'blue_score': 0,
|
|
'last_update': 0
|
|
}
|
|
|
|
# Message buffering for status output
|
|
status_buffer = []
|
|
in_status_output = False
|
|
|
|
# Pending background commands
|
|
pending_background_commands = set()
|
|
|
|
current_gametype = None
|
|
|
|
# Track recent events to avoid duplicates
|
|
recent_events = []
|
|
MAX_RECENT_EVENTS = 10
|
|
|
|
team_modes = ["Team Deathmatch", "Clan Arena", "Capture The Flag", "One Flag CTF", "Overload", "Harvester", "Freeze Tag" ]
|
|
|
|
player_teams = {}
|
|
|
|
def update_player_team(name, team):
|
|
"""Update player team tracking. Team can be int (1=RED, 2=BLUE, 3=SPECTATOR) or string"""
|
|
if current_gametype in team_modes:
|
|
# Convert numeric team to string
|
|
if isinstance(team, int):
|
|
team_map = {1: 'RED', 2: 'BLUE', 3: 'SPECTATOR', 0: 'FREE'}
|
|
team = team_map.get(team, 'FREE')
|
|
|
|
if team not in ['RED', 'BLUE', 'FREE', 'SPECTATOR']:
|
|
team = 'FREE'
|
|
|
|
# Store both the original name and the color-stripped version
|
|
player_teams[name] = team
|
|
clean_name = re.sub(r'\^\d', '', name)
|
|
if clean_name != name:
|
|
player_teams[clean_name] = team
|
|
|
|
logger.debug(f'Updated team for {name} (clean: {clean_name}): {team}')
|
|
|
|
def _readSocketEvent( msg ):
|
|
event_id = struct.unpack( '<H', msg[:2] )[0]
|
|
event_names = {
|
|
zmq.EVENT_ACCEPTED : 'EVENT_ACCEPTED',
|
|
zmq.EVENT_ACCEPT_FAILED : 'EVENT_ACCEPT_FAILED',
|
|
zmq.EVENT_BIND_FAILED : 'EVENT_BIND_FAILED',
|
|
zmq.EVENT_CLOSED : 'EVENT_CLOSED',
|
|
zmq.EVENT_CLOSE_FAILED : 'EVENT_CLOSE_FAILED',
|
|
zmq.EVENT_CONNECTED : 'EVENT_CONNECTED',
|
|
zmq.EVENT_CONNECT_DELAYED : 'EVENT_CONNECT_DELAYED',
|
|
zmq.EVENT_CONNECT_RETRIED : 'EVENT_CONNECT_RETRIED',
|
|
zmq.EVENT_DISCONNECTED : 'EVENT_DISCONNECTED',
|
|
zmq.EVENT_LISTENING : 'EVENT_LISTENING',
|
|
zmq.EVENT_MONITOR_STOPPED : 'EVENT_MONITOR_STOPPED',
|
|
}
|
|
event_name = event_names[ event_id ] if event_id in event_names else '%d' % event_id
|
|
event_value = struct.unpack( '<I', msg[2:] )[0]
|
|
return ( event_id, event_name, event_value )
|
|
|
|
def calculate_weapon_accuracies(weapon_data):
|
|
accuracies = {}
|
|
for weapon, stats in weapon_data.items():
|
|
shots_fired = int(stats.get('S', 0))
|
|
shots_hit = int(stats.get('H', 0))
|
|
accuracy = shots_hit / shots_fired if shots_fired > 0 else 0
|
|
accuracies[weapon] = accuracy
|
|
return accuracies
|
|
|
|
def _checkMonitor( monitor ):
|
|
try:
|
|
event_monitor = monitor.recv( zmq.NOBLOCK )
|
|
except zmq.Again:
|
|
return
|
|
|
|
( event_id, event_name, event_value ) = _readSocketEvent( event_monitor )
|
|
event_monitor_endpoint = monitor.recv( zmq.NOBLOCK )
|
|
logger.debug( 'monitor: %s %d endpoint %s' % ( event_name, event_value, event_monitor_endpoint ) )
|
|
return ( event_id, event_value )
|
|
|
|
class CursesHandler(logging.Handler):
|
|
def __init__(self, screen):
|
|
logging.Handler.__init__(self)
|
|
self.screen = screen
|
|
def emit(self, record):
|
|
try:
|
|
msg = self.format(record)
|
|
screen = self.screen
|
|
fs = "%s\n"
|
|
try:
|
|
PrintMessageFormatted(screen, fs % msg)
|
|
screen.refresh()
|
|
except UnicodeError:
|
|
PrintMessageFormatted(screen, fs % msg.encode("UTF-8"))
|
|
screen.refresh()
|
|
except (KeyboardInterrupt, SystemExit):
|
|
raise
|
|
except:
|
|
self.handleError(record)
|
|
|
|
def setupInputQueue(window):
|
|
def waitStdin( q ):
|
|
while ( True ):
|
|
l = curses.textpad.Textbox(window).edit()
|
|
if len( l ) > 0 :
|
|
q.put( l )
|
|
window.clear()
|
|
window.refresh()
|
|
q = queue.Queue()
|
|
t = threading.Thread( target = waitStdin, args = ( q, ) )
|
|
t.daemon = True
|
|
t.start()
|
|
return q
|
|
|
|
HOST = 'tcp://127.0.0.1:27961'
|
|
POLL_TIMEOUT = 100
|
|
|
|
def PrintMessageColored(window, message, attributes):
|
|
if not curses.has_colors:
|
|
window.addstr(message)
|
|
return
|
|
|
|
color = 0
|
|
parse_color = False
|
|
for ch in message:
|
|
val = ord( ch )
|
|
if parse_color:
|
|
if val >= ord('0') and val <= ord('7'):
|
|
color = val - ord('0')
|
|
if color == 7:
|
|
color = 0
|
|
else:
|
|
window.addch('^', curses.color_pair(color) | attributes)
|
|
window.addch(ch, curses.color_pair(color) | attributes)
|
|
parse_color = False
|
|
elif ch == '^':
|
|
parse_color = True
|
|
else:
|
|
window.addch(ch, curses.color_pair(color) | attributes)
|
|
|
|
window.refresh()
|
|
|
|
def PrintMessageFormatted(window, message, add_timestamp=True):
|
|
attributes = 0
|
|
|
|
message = message.replace("\\n", "")
|
|
message = message.replace(chr(25), "")
|
|
|
|
if message[:10] == "broadcast:":
|
|
message = message[11:]
|
|
attributes = curses.A_BOLD
|
|
|
|
if message[:7] == "print \"":
|
|
message = message[7:-2] + "\n"
|
|
|
|
# Don't add timestamp to server responses, especially status command
|
|
# But DO add timestamps to "zmq RCON command" lines (server acknowledgments)
|
|
skip_timestamp_keywords = [
|
|
'map:', 'num score', '---', 'bot'
|
|
]
|
|
|
|
# Special handling: don't skip "zmq RCON" lines - they should get timestamps
|
|
# But skip everything else from status output
|
|
is_status_keyword = any(keyword in message for keyword in skip_timestamp_keywords)
|
|
|
|
# Also don't timestamp "status" when it appears in isolation (status table context)
|
|
# But DO timestamp it when it's part of "zmq RCON command...status" (the echo line)
|
|
if 'status' in message and 'zmq RCON' not in message:
|
|
is_status_keyword = True
|
|
|
|
# Skip very short messages or messages with leading spaces (status fragments)
|
|
is_likely_fragment = (
|
|
len(message.strip()) <= 2 or # Very short messages
|
|
(message.startswith(' ') and len(message.strip()) < 50) or # Messages with leading space
|
|
message.strip().isdigit() # Pure numbers (scores, pings, ports, etc)
|
|
)
|
|
|
|
# Skip single words that are less than 20 chars (likely player names or status fragments)
|
|
is_short_word = len(message.strip()) < 20 and ' ' not in message.strip()
|
|
|
|
# Skip IP addresses (pattern: xxx.xxx.xxx.xxx or xxx.xxx.xxx.xxx:port)
|
|
is_ip_address = False
|
|
stripped = message.strip()
|
|
allowed_chars = set('0123456789.:')
|
|
if stripped.count('.') == 3 and all(c in allowed_chars for c in stripped):
|
|
is_ip_address = True
|
|
|
|
should_skip_timestamp = (
|
|
message.startswith('***') or
|
|
is_likely_fragment or
|
|
is_short_word or
|
|
is_ip_address or
|
|
is_status_keyword
|
|
)
|
|
|
|
# ADD THIS DEBUG BLOCK HERE:
|
|
if 'K/D:' in message or 'WINS' in message or 'TEAM' in message:
|
|
logger.debug(f'=== TIMESTAMP DEBUG ===')
|
|
logger.debug(f'Message: {message[:100]}')
|
|
logger.debug(f'is_status_keyword: {is_status_keyword}')
|
|
logger.debug(f'is_likely_fragment: {is_likely_fragment}')
|
|
logger.debug(f'is_short_word: {is_short_word}')
|
|
logger.debug(f'is_ip_address: {is_ip_address}')
|
|
logger.debug(f'should_skip_timestamp: {should_skip_timestamp}')
|
|
logger.debug(f'add_timestamp: {add_timestamp}')
|
|
# Add timestamp if requested and not a special message type
|
|
|
|
if add_timestamp and not should_skip_timestamp:
|
|
timestamp = time.strftime('%H:%M:%S')
|
|
message = f"^3[^7{timestamp}^3]^7 {message}"
|
|
|
|
PrintMessageColored(window, message, attributes)
|
|
|
|
def get_team_color(player_name):
|
|
"""Get the color-coded team prefix for a player"""
|
|
if current_gametype not in team_modes:
|
|
return ''
|
|
|
|
team = player_teams.get(player_name, None)
|
|
if not team:
|
|
return ''
|
|
|
|
return {
|
|
'RED': '^1(RED)^7',
|
|
'BLUE': '^4(BLUE)^7',
|
|
'FREE': '',
|
|
'SPECTATOR': '^3(SPEC)^7'
|
|
}.get(team, '')
|
|
|
|
def format_powerup_message(msg_str):
|
|
"""Format powerup pickup and kill messages with colors"""
|
|
|
|
if msg_str.startswith("broadcast:"):
|
|
msg_str = msg_str[11:].strip()
|
|
|
|
# Check for powerup pickup: "PlayerName got the PowerupName!"
|
|
pickup_match = re.match(r'^(.+?)\s+got the\s+(.+?)!', msg_str)
|
|
if pickup_match:
|
|
player_name = pickup_match.group(1).strip()
|
|
powerup_name = pickup_match.group(2).strip()
|
|
|
|
# Get team color for player
|
|
player_clean = re.sub(r'\^\d', '', player_name)
|
|
team_prefix = get_team_color(player_clean)
|
|
|
|
# Color code powerups
|
|
powerup_colors = {
|
|
'Quad Damage': '^5Quad Damage^7',
|
|
'Battle Suit': '^3Battle Suit^7',
|
|
'Regeneration': '^1Regeneration^7',
|
|
'Haste': '^3Haste^7',
|
|
'Invisibility': '^5Invisibility^7',
|
|
'Flight': '^5Flight^7',
|
|
'Medkit': '^1Medkit^7',
|
|
'MegaHealth': '^4MegaHealth^7'
|
|
}
|
|
|
|
colored_powerup = powerup_colors.get(powerup_name, f'^6{powerup_name}^7')
|
|
|
|
return f"{team_prefix}{player_name} ^7got the {colored_powerup}!\n"
|
|
|
|
# Check for powerup carrier kill: "PlayerName killed the PowerupName carrier!"
|
|
carrier_match = re.match(r'^(.+?)\s+killed the\s+(.+?)\s+carrier!', msg_str)
|
|
if carrier_match:
|
|
player_name = carrier_match.group(1).strip()
|
|
powerup_name = carrier_match.group(2).strip()
|
|
|
|
# Get team color for player
|
|
player_clean = re.sub(r'\^\d', '', player_name)
|
|
team_prefix = get_team_color(player_clean)
|
|
|
|
# Color code powerups (same as above)
|
|
powerup_colors = {
|
|
'Quad Damage': '^5Quad Damage^7',
|
|
'Battle Suit': '^3Battle Suit^7',
|
|
'Regeneration': '^1Regeneration^7',
|
|
'Haste': '^3Haste^7',
|
|
'Invisibility': '^5Invisibility^7',
|
|
'Flight': '^5Flight^7',
|
|
'Medkit': '^1Medkit^7',
|
|
'MegaHealth': '^4MegaHealth^7'
|
|
}
|
|
|
|
colored_powerup = powerup_colors.get(powerup_name, f'^6{powerup_name}^7')
|
|
|
|
return f"{team_prefix}{player_name} ^7killed the {colored_powerup} ^7carrier!\n"
|
|
|
|
# Not a powerup message
|
|
return None
|
|
|
|
def parse_server_response(msg_str, info_window):
|
|
"""Parse server responses to update server_info, returns True if message should be suppressed"""
|
|
global server_info, pending_background_commands, status_buffer, in_status_output, current_gametype
|
|
|
|
# Suppress only the command echo lines
|
|
if msg_str.startswith('zmq RCON command') and 'from' in msg_str:
|
|
if any(f': {cmd}' in msg_str for cmd in ['status', 'roundlimit', 'qlx_serverBrandName', 'g_factoryTitle', 'mapname', 'timelimit', 'fraglimit', 'roundlimit', 'capturelimit', 'sv_maxclients']):
|
|
return True
|
|
|
|
# Parse qlx_serverBrandName
|
|
if '"qlx_serverbrandname"' in msg_str and ' is:' in msg_str:
|
|
match = re.search(r' is:"(.+?)" default:', msg_str)
|
|
if match:
|
|
brandname = match.group(1).strip()
|
|
server_info['hostname'] = brandname
|
|
logger.info(f'Got hostname: {server_info["hostname"]}')
|
|
UpdateServerInfoWindow(info_window)
|
|
|
|
# Parse g_factoryTitle
|
|
if '"g_factoryTitle"' in msg_str and ' is:' in msg_str:
|
|
match = re.search(r' is:"([^"]*)"', msg_str) # Changed [^"]+ to [^"]*
|
|
if match:
|
|
gametype = match.group(1).strip()
|
|
current_gametype = re.sub(r'\^\d', '', gametype) # Strip color codes
|
|
server_info['gametype'] = current_gametype
|
|
logger.info(f'Got gametype: {current_gametype}')
|
|
UpdateServerInfoWindow(info_window)
|
|
|
|
# Parse Map
|
|
if '"mapname"' in msg_str and ' is:' in msg_str:
|
|
match = re.search(r' is:"([^"]*)"', msg_str)
|
|
if match:
|
|
mapname = match.group(1).strip()
|
|
server_info['map'] = re.sub(r'\^\d', '', mapname)
|
|
logger.info(f'Got mapname: {server_info["map"]}')
|
|
UpdateServerInfoWindow(info_window)
|
|
|
|
# Parse timelimit
|
|
if '"timelimit"' in msg_str and ' is:' in msg_str:
|
|
match = re.search(r' is:"([^"]*)"', msg_str)
|
|
if match:
|
|
server_info['timelimit'] = match.group(1).strip()
|
|
logger.info(f'Got Timelimit: {server_info["timelimit"]}')
|
|
UpdateServerInfoWindow(info_window)
|
|
|
|
# Parse fraglimit
|
|
if '"fraglimit"' in msg_str and ' is:' in msg_str:
|
|
match = re.search(r' is:"([^"]*)"', msg_str)
|
|
if match:
|
|
server_info['fraglimit'] = match.group(1).strip()
|
|
logger.info(f'Got Fraglimit: {server_info["fraglimit"]}')
|
|
UpdateServerInfoWindow(info_window)
|
|
|
|
# Parse roundlimit
|
|
if '"roundlimit"' in msg_str and ' is:' in msg_str:
|
|
match = re.search(r' is:"([^"]*)"', msg_str)
|
|
if match:
|
|
server_info['roundlimit'] = match.group(1).strip()
|
|
logger.info(f'Got Roundlimit: {server_info["roundlimit"]}')
|
|
UpdateServerInfoWindow(info_window)
|
|
|
|
# Parse caplimit
|
|
if '"capturelimit"' in msg_str and ' is:' in msg_str:
|
|
match = re.search(r' is:"([^"]*)"', msg_str)
|
|
if match:
|
|
server_info['capturelimit'] = match.group(1).strip()
|
|
logger.info(f'Got Caplimit: {server_info["capturelimit"]}')
|
|
UpdateServerInfoWindow(info_window)
|
|
|
|
# Parse Max Clients
|
|
if '"sv_maxclients"' in msg_str and ' is:' in msg_str:
|
|
match = re.search(r' is:"([^"]*)"', msg_str)
|
|
if match:
|
|
server_info['maxclients'] = match.group(1).strip()
|
|
logger.info(f'Got Max Clients: {server_info["maxclients"]}')
|
|
UpdateServerInfoWindow(info_window)
|
|
|
|
# Handle status command output buffering
|
|
if 'status' in pending_background_commands:
|
|
# Buffer ALL short messages (likely parts of status output)
|
|
if len(msg_str) < 100: # Status fields are short
|
|
status_buffer.append(msg_str)
|
|
in_status_output = True
|
|
return True # Suppress
|
|
|
|
# When we get an empty message or long message, process the buffer
|
|
if msg_str.strip() == '' or len(msg_str) > 100:
|
|
if status_buffer:
|
|
# Reconstruct the line
|
|
full_line = ' '.join(status_buffer)
|
|
logger.info(f'Reconstructed status line: {full_line[:100]}')
|
|
|
|
# Parse if it's a player line
|
|
parts = full_line.split()
|
|
if len(parts) >= 8 and parts[0].isdigit():
|
|
server_info['players'].append({
|
|
'name': parts[3],
|
|
'score': parts[1],
|
|
'ping': parts[2]
|
|
})
|
|
|
|
status_buffer = []
|
|
|
|
if msg_str.strip() == '':
|
|
pending_background_commands.discard('status')
|
|
in_status_output = False
|
|
|
|
return True
|
|
|
|
return False
|
|
|
|
def UpdateServerInfoWindow(info_window):
|
|
"""Update the server info window with current data"""
|
|
info_window.clear()
|
|
|
|
max_y, max_x = info_window.getmaxyx()
|
|
|
|
# Line 1: Hostname with decorative border
|
|
hostname_display = server_info.get('hostname', 'Unknown Server')
|
|
PrintMessageColored(info_window, f"^6═══ {hostname_display} ^6═══^7\n", 0)
|
|
|
|
# Line 2: Type / Map / Limits / Players
|
|
gametype_display = server_info.get('gametype', 'Unknown Type')
|
|
mapname_display = server_info.get('map', 'Unknown Map')
|
|
timelimit_display = server_info.get('timelimit', '0')
|
|
fraglimit_display = server_info.get('fraglimit', '0')
|
|
roundlimit_display = server_info.get('roundlimit', '0')
|
|
caplimit_display = server_info.get('capturelimit', '0')
|
|
curclients_display = server_info.get('curclients', '0')
|
|
maxclients_display = server_info.get('maxclients', '0')
|
|
PrintMessageColored(info_window, f"^3Type:^7 {gametype_display} ^3Map:^7 {mapname_display} ^3Players:^7 {curclients_display}/{maxclients_display} ^3Limits (T/F/R/C):^7 {timelimit_display}/{fraglimit_display}/{roundlimit_display}/{caplimit_display}\n", 0)
|
|
|
|
# Line 4: Teams, if any
|
|
if current_gametype in team_modes:
|
|
PrintMessageColored(info_window, f"^1(RED) ^4(BLUE)\n", 0)
|
|
else:
|
|
PrintMessageColored(info_window, f"^3(FREE)\n", 0)
|
|
|
|
# Lines 5-10: Reserved for future use (empty for now)
|
|
info_window.addstr("\n\n\n\n\n\n")
|
|
|
|
# Line 11: Separator
|
|
separator = "^6" + "═" * (max_x - 1) + "^7"
|
|
PrintMessageColored(info_window, separator, 0)
|
|
|
|
info_window.refresh()
|
|
|
|
def ParseGameEvent(message):
|
|
"""Parse JSON game events and return formatted message"""
|
|
global current_gametype, recent_events
|
|
|
|
try:
|
|
jObject = json.loads(message)
|
|
|
|
# Log ALL JSON events to file if --json flag was provided
|
|
if all_json_logger.handlers:
|
|
all_json_logger.info('JSON Event received:')
|
|
all_json_logger.info(json.dumps(jObject, indent=2))
|
|
all_json_logger.info('---')
|
|
|
|
if 'TYPE' not in jObject or 'DATA' not in jObject:
|
|
logger.debug('JSON missing TYPE or DATA fields')
|
|
return None
|
|
|
|
event_type = jObject['TYPE']
|
|
data = jObject['DATA']
|
|
|
|
# Create event signature for deduplication (for PLAYER_KILL/PLAYER_DEATH)
|
|
event_signature = None
|
|
if event_type in ('PLAYER_DEATH', 'PLAYER_KILL'):
|
|
# Create signature based on time, killer, victim (same signature for both types)
|
|
time_val = data.get('TIME', 0)
|
|
killer_name = data.get('KILLER', {}).get('NAME', '') if data.get('KILLER') else ''
|
|
victim_name = data.get('VICTIM', {}).get('NAME', '')
|
|
event_signature = f"KILL:{time_val}:{killer_name}:{victim_name}" # Same signature for both
|
|
|
|
# Check if we've seen this event recently
|
|
if event_signature in recent_events:
|
|
logger.debug(f'Duplicate event detected: {event_signature}')
|
|
return None
|
|
|
|
# Add to recent events and trim list
|
|
recent_events.append(event_signature)
|
|
if len(recent_events) > MAX_RECENT_EVENTS:
|
|
recent_events.pop(0)
|
|
|
|
warmup = data.get('WARMUP', False)
|
|
warmup_suffix = " ^3(warmup)" if warmup else ""
|
|
|
|
if event_type == 'PLAYER_SWITCHTEAM':
|
|
if 'KILLER' not in data:
|
|
logger.debug('PLAYER_SWITCHTEAM missing KILLER field')
|
|
return None
|
|
killer = data['KILLER']
|
|
name = killer.get('NAME', 'Unknown')
|
|
team = killer.get('TEAM', '')
|
|
old_team = killer.get('OLD_TEAM', '')
|
|
|
|
# Update player team tracking
|
|
update_player_team(name, team)
|
|
|
|
if team == old_team:
|
|
return None
|
|
|
|
team_msg = {
|
|
'FREE': ' ^7joined the ^2Fight^7',
|
|
'SPECTATOR': ' ^7joined the ^3Spectators^7',
|
|
'RED': ' ^7joined the ^1Red ^7team',
|
|
'BLUE': ' ^7joined the ^4Blue ^7team'
|
|
}.get(team, ' ^7joined team %s^7' % team)
|
|
|
|
old_team_msg = {
|
|
'FREE': 'the ^2Fight^7',
|
|
'SPECTATOR': 'the ^3Spectators^7',
|
|
'RED': '^7the ^1Red ^7team',
|
|
'BLUE': '^7the ^4Blue ^7team'
|
|
}.get(old_team, 'team %s' % old_team)
|
|
|
|
team_prefix = get_team_color(name)
|
|
return "%s%s%s from %s%s\n" % (team_prefix, name, team_msg, old_team_msg, warmup_suffix)
|
|
|
|
elif event_type == 'PLAYER_DEATH' or event_type == 'PLAYER_KILL':
|
|
if 'VICTIM' not in data:
|
|
logger.debug('PLAYER_DEATH/PLAYER_KILL missing VICTIM field')
|
|
return None
|
|
victim = data['VICTIM']
|
|
victim_name = victim.get('NAME', 'Unknown')
|
|
|
|
# Update victim team from event data
|
|
if 'TEAM' in victim:
|
|
update_player_team(victim_name, victim['TEAM'])
|
|
# ADD THIS: Update server_info players list
|
|
global server_info
|
|
if not any(p['name'] == victim_name for p in server_info['players']):
|
|
server_info['players'].append({
|
|
'name': victim_name,
|
|
'score': '0',
|
|
'ping': '0'
|
|
})
|
|
|
|
victim_team_prefix = get_team_color(victim_name)
|
|
|
|
if 'KILLER' not in data or not data['KILLER']:
|
|
mod = data.get('MOD', 'UNKNOWN')
|
|
death_msgs = {
|
|
'FALLING': "%s%s ^7cratered.",
|
|
'HURT': "%s%s ^7was in the wrong place.",
|
|
'LAVA': "%s%s ^7does a backflip into the lava.",
|
|
'WATER': "%s%s ^7sank like a rock.",
|
|
'SLIME': "%s%s ^7melted.",
|
|
'CRUSH': "%s%s ^7was crushed."
|
|
}
|
|
msg_template = death_msgs.get(mod, "%s%s ^1DIED FROM %s^7")
|
|
if mod in death_msgs:
|
|
msg = msg_template % (victim_team_prefix, victim_name)
|
|
else:
|
|
msg = msg_template % (victim_team_prefix, victim_name, mod)
|
|
return "%s%s\n" % (msg, warmup_suffix)
|
|
|
|
killer = data['KILLER']
|
|
killer_name = killer.get('NAME', 'Unknown')
|
|
|
|
# Update killer team from event data
|
|
if 'TEAM' in killer:
|
|
update_player_team(killer_name, killer['TEAM'])
|
|
if not any(p['name'] == killer_name for p in server_info['players']):
|
|
server_info['players'].append({
|
|
'name': killer_name,
|
|
'score': '0',
|
|
'ping': '0'
|
|
})
|
|
|
|
killer_team_prefix = get_team_color(killer_name)
|
|
|
|
if killer_name != victim_name:
|
|
weapon = killer.get('WEAPON', 'UNKNOWN')
|
|
weapon_names = {
|
|
'ROCKET': 'the Rocket Launcher',
|
|
'LIGHTNING': 'the Lightning Gun',
|
|
'RAILGUN': 'the Railgun',
|
|
'SHOTGUN': 'the Shotgun',
|
|
'GAUNTLET': 'the Gauntlet',
|
|
'GRENADE': 'the Grenade Launcher',
|
|
'PLASMA': 'the Plasma Gun',
|
|
'MACHINEGUN': 'the Machine Gun'
|
|
}
|
|
weapon_name = weapon_names.get(weapon, 'the %s' % weapon)
|
|
|
|
return "%s%s ^7fragged^7 %s%s ^7with %s%s\n" % (
|
|
killer_team_prefix, killer_name,
|
|
victim_team_prefix, victim_name,
|
|
weapon_name, warmup_suffix
|
|
)
|
|
|
|
else:
|
|
# Suicide
|
|
weapon = killer.get('WEAPON', 'OTHER_WEAPON')
|
|
if weapon != 'OTHER_WEAPON':
|
|
weapon_names = {
|
|
'ROCKET': 'Rocket Launcher',
|
|
'PLASMA': 'Plasma Gun',
|
|
'GRENADE': 'Grenade Launcher'
|
|
}
|
|
weapon_name = weapon_names.get(weapon, weapon)
|
|
return "%s%s ^7committed suicide with the ^7%s%s\n" % (
|
|
killer_team_prefix, killer_name, weapon_name, warmup_suffix
|
|
)
|
|
|
|
elif event_type == 'PLAYER_MEDAL':
|
|
name = data.get('NAME', 'Unknown')
|
|
medal = data.get('MEDAL', 'UNKNOWN')
|
|
warmup = data.get('WARMUP', False)
|
|
warmup_suffix = " ^3(warmup)^7" if warmup else ""
|
|
team_prefix = get_team_color(name)
|
|
return "%s%s ^7got a medal: ^6%s%s\n" % (team_prefix, name, medal, warmup_suffix)
|
|
|
|
elif event_type == 'MATCH_STARTED':
|
|
if current_gametype in team_modes:
|
|
#redteam =
|
|
#bluteam =
|
|
return None
|
|
elif current_gametype not in team_modes:
|
|
players = []
|
|
players_data = data.get('PLAYERS', [])
|
|
for player in players_data:
|
|
name = player.get('NAME', 'Unknown')
|
|
players.append(name)
|
|
formatted_players = " vs. ".join(players)
|
|
return f"Match has started - {formatted_players}\n"
|
|
else:
|
|
return None
|
|
|
|
elif event_type == 'MATCH_REPORT':
|
|
if current_gametype in team_modes:
|
|
redscore = int(data.get('TSCORE0', '0'))
|
|
bluscore = int(data.get('TSCORE1', '0'))
|
|
if redscore > bluscore:
|
|
return "^1RED TEAM ^7WINS by a score of %d to %d\n" % (redscore, bluscore)
|
|
elif bluscore > redscore:
|
|
return "^4BLUE TEAM ^7WINS by a score of %d to %d\n" % (bluscore, redscore)
|
|
else:
|
|
return "^7The match is a TIE with a score of %d to %d\n" % (redscore, bluscore)
|
|
else:
|
|
return None
|
|
|
|
elif event_type == 'PLAYER_STATS':
|
|
name = data.get('NAME', 'Unknown')
|
|
team_prefix = get_team_color(name)
|
|
kills = int(data.get('KILLS', '0'))
|
|
deaths = int(data.get('DEATHS', '0'))
|
|
weapon_data = data.get('WEAPONS', {})
|
|
accuracies = calculate_weapon_accuracies(weapon_data)
|
|
if accuracies:
|
|
best_weapon = max(accuracies, key=accuracies.get)
|
|
best_accuracy = accuracies[best_weapon] * 100
|
|
weapon_stats = weapon_data.get(best_weapon, {})
|
|
best_weapon_kills = int(weapon_stats.get('K', 0))
|
|
best_weapon_stats = f"{best_weapon}: {best_accuracy:.2f}% (Kills: {best_weapon_kills})"
|
|
else:
|
|
best_weapon_stats = "No weapon stats available."
|
|
weapon_names = {
|
|
'ROCKET': 'Rocket Launcher',
|
|
'LIGHTNING': 'Lightning Gun',
|
|
'RAILGUN': 'Railgun',
|
|
'SHOTGUN': 'Shotgun',
|
|
'GAUNTLET': 'Gauntlet',
|
|
'GRENADE': 'Grenade Launcher',
|
|
'PLASMA': 'Plasma Gun',
|
|
'MACHINEGUN': 'Machine Gun'
|
|
}
|
|
weapon_name = weapon_names.get(best_weapon)
|
|
return "^7%s%s K/D: %d/%d | Best Weapon: %s - Acc: %.2f%% - Kills: %d\n" % (team_prefix, name, kills, deaths, weapon_name, best_accuracy, best_weapon_kills)
|
|
|
|
# Placeholders
|
|
elif event_type == 'PLAYER_CONNECT':
|
|
return None
|
|
elif event_type == 'PLAYER_DISCONNECT':
|
|
return None
|
|
elif event_type == 'ROUND_OVER':
|
|
return None
|
|
|
|
else:
|
|
# Unknown event type - log at debug level and to file
|
|
logger.debug('Unknown event type: %s' % event_type)
|
|
unknown_json_logger.info('Unknown event type: %s' % event_type)
|
|
unknown_json_logger.info('Full JSON: %s' % json.dumps(jObject, indent=2))
|
|
return None
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.debug('JSON decode error: %s' % e)
|
|
return None
|
|
except (KeyError, TypeError) as e:
|
|
logger.debug('Error parsing game event: %s' % e)
|
|
return None
|
|
|
|
return None
|
|
|
|
def InitWindows(screen, args):
|
|
logger.handlers = []
|
|
curses.endwin()
|
|
|
|
curses.initscr()
|
|
screen.nodelay(1)
|
|
curses.start_color()
|
|
curses.use_default_colors()
|
|
curses.cbreak()
|
|
curses.setsyx(-1, -1)
|
|
screen.addstr("Quake Live PyCon: %s" % args.host)
|
|
screen.refresh()
|
|
maxy, maxx = screen.getmaxyx()
|
|
|
|
for i in range(1,7):
|
|
curses.init_pair(i, i, 0)
|
|
|
|
curses.init_pair(5, 6, 0)
|
|
curses.init_pair(6, 5, 0)
|
|
|
|
# Server info window at top
|
|
begin_x = 2; width = maxx - 4
|
|
begin_y = 2; height = 10
|
|
info_window = curses.newwin(height, width, begin_y, begin_x)
|
|
screen.refresh()
|
|
info_window.scrollok(False)
|
|
info_window.idlok(False)
|
|
info_window.leaveok(True)
|
|
info_window.refresh()
|
|
|
|
# Output window (main chat/events)
|
|
begin_x = 2; width = maxx - 4
|
|
begin_y = 12; height = maxy - 15
|
|
output_window = curses.newwin(height, width, begin_y, begin_x)
|
|
screen.refresh()
|
|
output_window.scrollok(True)
|
|
output_window.idlok(True)
|
|
output_window.leaveok(True)
|
|
output_window.refresh()
|
|
|
|
# Input window
|
|
begin_x = 4; width = maxx - 6
|
|
begin_y = maxy - 2; height = 1
|
|
input_window = curses.newwin(height, width, begin_y, begin_x)
|
|
screen.addstr(begin_y, begin_x - 2, '$ ')
|
|
screen.refresh()
|
|
input_window.idlok(True)
|
|
input_window.leaveok(False)
|
|
input_window.refresh()
|
|
|
|
# Divider window
|
|
begin_x = 2; width = maxx - 4
|
|
begin_y = maxy - 3; height = 1
|
|
divider_window = curses.newwin(height, width, begin_y, begin_x)
|
|
screen.refresh()
|
|
divider_window.hline(curses.ACS_HLINE, width)
|
|
divider_window.refresh()
|
|
|
|
mh = CursesHandler(output_window)
|
|
formatterDisplay = logging.Formatter('%(asctime)-8s|%(name)-12s|%(levelname)-6s|%(message)-s', '%H:%M:%S')
|
|
mh.setFormatter(formatterDisplay)
|
|
logger.addHandler(mh)
|
|
|
|
screen.refresh()
|
|
|
|
return input_window, output_window, info_window
|
|
|
|
def main(screen):
|
|
global current_gametype
|
|
|
|
parser = argparse.ArgumentParser( description = 'Verbose QuakeLive server statistics' )
|
|
parser.add_argument( '--host', default = HOST, help = 'ZMQ URI to connect to. Defaults to %s' % HOST )
|
|
parser.add_argument( '--password', required = False )
|
|
parser.add_argument( '--identity', default = uuid.uuid1().hex, help = 'Specify the socket identity. Random UUID used by default' )
|
|
parser.add_argument( '-v', '--verbose', action='count', default=0, help = 'Increase verbosity (use -v for INFO, -vv for DEBUG)' )
|
|
parser.add_argument( '--unknown-log', default='unknown_events.log', help = 'File to log unknown JSON events. Defaults to unknown_events.log' )
|
|
parser.add_argument('--json', '-j', dest='json_log', default=None, help='File to log all JSON events. If specified, all JSON messages from server will be captured')
|
|
args = parser.parse_args()
|
|
|
|
# Set logging level based on verbosity
|
|
# Default (0): WARNING - only startup, connections, and game data
|
|
# -v (1): INFO - all communications and acknowledgements
|
|
# -vv (2): DEBUG - detailed debug information including unparsed JSON
|
|
if args.verbose == 0:
|
|
logger.setLevel(logging.WARNING)
|
|
elif args.verbose == 1:
|
|
logger.setLevel(logging.INFO)
|
|
else:
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
# Set up file handler for unknown JSON events
|
|
file_handler = logging.FileHandler(args.unknown_log, mode='a')
|
|
file_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S')
|
|
file_handler.setFormatter(file_formatter)
|
|
unknown_json_logger.addHandler(file_handler)
|
|
unknown_json_logger.propagate = False # Don't send to parent logger
|
|
|
|
input_window, output_window, info_window = InitWindows(screen, args)
|
|
|
|
UpdateServerInfoWindow(info_window)
|
|
|
|
PrintMessageFormatted(output_window, "*** QL pyCon Version 0.7.0 starting ***\n")
|
|
PrintMessageFormatted(output_window, "zmq python bindings {}, libzmq version {}\n".format(repr(zmq.__version__), zmq.zmq_version()))
|
|
|
|
stats_port = None
|
|
stats_password = None
|
|
stats_context = None
|
|
stats_socket = None
|
|
stats_connected = False
|
|
stats_check_counter = 0
|
|
|
|
q = setupInputQueue(input_window)
|
|
try:
|
|
logger.info('Initializing ZMQ context...')
|
|
ctx = zmq.Context()
|
|
logger.info('Creating DEALER socket...')
|
|
socket = ctx.socket( zmq.DEALER )
|
|
logger.info('Setting up socket monitor...')
|
|
monitor = socket.get_monitor_socket( zmq.EVENT_ALL )
|
|
if ( args.password is not None ):
|
|
logger.info( 'Setting password for access' )
|
|
socket.plain_username = b'rcon'
|
|
socket.plain_password = args.password.encode('utf-8')
|
|
socket.zap_domain = b'rcon'
|
|
logger.info( 'Setting socket identity: %s' % args.identity )
|
|
socket.setsockopt( zmq.IDENTITY, args.identity.encode('utf-8') )
|
|
socket.connect( args.host )
|
|
logger.info( 'Connection initiated, waiting for events...' )
|
|
while ( True ):
|
|
event = socket.poll( POLL_TIMEOUT )
|
|
event_monitor = _checkMonitor( monitor )
|
|
if ( event_monitor is not None and event_monitor[0] == zmq.EVENT_CONNECTED ):
|
|
PrintMessageFormatted(output_window, "Connected to server\n")
|
|
socket.send( b'register' )
|
|
logger.info( 'Registration message sent.' )
|
|
|
|
PrintMessageFormatted(output_window, "Requesting connection info...\n")
|
|
socket.send( b'zmq_stats_password' )
|
|
socket.send( b'net_port' )
|
|
|
|
while ( not q.empty() ):
|
|
l = q.get()
|
|
logger.info( 'Sending command: %s' % repr( l.strip() ) )
|
|
|
|
# Display the command being sent with timestamp (in cyan to differentiate)
|
|
timestamp = time.strftime('%H:%M:%S')
|
|
PrintMessageFormatted(output_window, f"^5[^7{timestamp}^5] >>> {l.strip()}^7\n", add_timestamp=False)
|
|
|
|
socket.send( l.encode('utf-8') )
|
|
|
|
if stats_connected and stats_socket:
|
|
stats_check_counter += 1
|
|
|
|
if stats_check_counter % 100 == 0:
|
|
logger.debug('Stats polling active (check #%d)' % (stats_check_counter // 100))
|
|
|
|
try:
|
|
stats_msg = stats_socket.recv(zmq.NOBLOCK)
|
|
except zmq.error.Again:
|
|
stats_msg = None
|
|
if stats_msg:
|
|
logger.info('Stats event received (%d bytes)' % len(stats_msg))
|
|
y,x = curses.getsyx()
|
|
stats_str = stats_msg.decode('utf-8', errors='replace')
|
|
logger.debug('Stats JSON: %s' % repr(stats_str[:200]))
|
|
|
|
parsed_event = ParseGameEvent(stats_str)
|
|
if parsed_event:
|
|
logger.debug('Event parsed successfully')
|
|
PrintMessageFormatted(output_window, parsed_event)
|
|
else:
|
|
logger.debug('Event parsing returned None')
|
|
|
|
curses.setsyx(y,x)
|
|
curses.doupdate()
|
|
|
|
if ( event == 0 ):
|
|
continue
|
|
|
|
logger.debug( 'Socket has data available, reading messages...' )
|
|
msg_count = 0
|
|
while ( True ):
|
|
try:
|
|
msg = socket.recv( zmq.NOBLOCK )
|
|
msg_count += 1
|
|
except zmq.error.Again:
|
|
if msg_count > 0:
|
|
logger.debug( 'Read %d message(s) from socket.' % msg_count )
|
|
break
|
|
except Exception as e:
|
|
timestamp = time.strftime('%H:%M:%S')
|
|
PrintMessageFormatted(output_window, f"^1[^7{timestamp}^1] Error: {e}^7\n", add_timestamp=False)
|
|
logger.error( 'Error receiving message: %s' % e )
|
|
break
|
|
else:
|
|
if len( msg ) > 0:
|
|
logger.debug( 'Received message (%d bytes): %s' % (len(msg), repr(msg[:100])) )
|
|
y,x = curses.getsyx()
|
|
msg_str = msg.decode('utf-8', errors='replace')
|
|
|
|
is_background_response = parse_server_response(msg_str, info_window)
|
|
if is_background_response:
|
|
logger.debug('Suppressing background command response')
|
|
continue # Skip printing this message
|
|
if 'net_port' in msg_str and ' is:' in msg_str and '"net_port"' in msg_str:
|
|
match = re.search(r' is:"([^"]+)"', msg_str)
|
|
if match:
|
|
port_str = match.group(1)
|
|
port_str = port_str.strip()
|
|
digit_match = re.search(r'(\d+)', port_str)
|
|
if digit_match and stats_port is None:
|
|
stats_port = digit_match.group(1)
|
|
logger.info('Got stats port: %s' % stats_port)
|
|
|
|
if 'zmq_stats_password' in msg_str and ' is:' in msg_str and '"zmq_stats_password"' in msg_str:
|
|
match = re.search(r' is:"([^"]+)"', msg_str)
|
|
if match and stats_password is None:
|
|
password_str = match.group(1)
|
|
password_str = re.sub(r'\^\d', '', password_str)
|
|
stats_password = password_str.strip()
|
|
logger.info('Got stats password: %s' % stats_password)
|
|
|
|
if stats_port and stats_password and not stats_connected:
|
|
try:
|
|
PrintMessageFormatted(output_window, "Connecting to stats stream...\n")
|
|
logger.info(' Host: %s:%s' % (args.host.split('//')[1].split(':')[0], stats_port))
|
|
logger.info(' Password: %s' % stats_password)
|
|
|
|
host_ip = args.host.split('//')[1].split(':')[0]
|
|
stats_host = 'tcp://%s:%s' % (host_ip, stats_port)
|
|
|
|
time.sleep(0.5)
|
|
|
|
stats_context = zmq.Context()
|
|
stats_socket = stats_context.socket(zmq.SUB)
|
|
logger.debug('Stats socket created (SUB type)')
|
|
|
|
if stats_password and stats_password.strip():
|
|
logger.debug('Setting PLAIN authentication')
|
|
stats_socket.setsockopt(zmq.PLAIN_USERNAME, b'stats')
|
|
stats_socket.setsockopt(zmq.PLAIN_PASSWORD, stats_password.encode('utf-8'))
|
|
stats_socket.setsockopt_string(zmq.ZAP_DOMAIN, 'stats')
|
|
logger.debug('Auth configured')
|
|
|
|
logger.debug('Connecting to %s' % stats_host)
|
|
stats_socket.connect(stats_host)
|
|
logger.debug('Connected')
|
|
|
|
logger.debug('Setting ZMQ_SUBSCRIBE to empty (all messages)')
|
|
stats_socket.setsockopt(zmq.SUBSCRIBE, b'')
|
|
logger.debug('Subscribed')
|
|
|
|
time.sleep(0.5)
|
|
|
|
stats_connected = True
|
|
PrintMessageFormatted(output_window, "Stats stream connected - ready for game events\n")
|
|
|
|
# Send initial server info queries (once only)
|
|
logger.info('Sending initial server info queries')
|
|
socket.send(b'qlx_serverBrandName')
|
|
socket.send(b'g_factoryTitle')
|
|
socket.send(b'mapname')
|
|
socket.send(b'timelimit')
|
|
socket.send(b'fraglimit')
|
|
socket.send(b'roundlimit')
|
|
socket.send(b'capturelimit')
|
|
socket.send(b'sv_maxclients')
|
|
|
|
# Set up file handler for all JSON events if --json flag is provided
|
|
if args.json_log:
|
|
json_file_handler = logging.FileHandler(args.json_log, mode='a')
|
|
json_file_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S')
|
|
json_file_handler.setFormatter(json_file_formatter)
|
|
all_json_logger.addHandler(json_file_handler)
|
|
all_json_logger.propagate = False
|
|
PrintMessageFormatted(output_window, f"*** JSON capture enabled: {args.json_log} ***\n")
|
|
except Exception as e:
|
|
timestamp = time.strftime('%H:%M:%S')
|
|
PrintMessageFormatted(output_window, f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n", add_timestamp=False)
|
|
logger.error('Stats connection failed: %s' % e)
|
|
import traceback
|
|
logger.debug(traceback.format_exc())
|
|
|
|
# Try to parse as JSON game event
|
|
parsed_event = ParseGameEvent(msg_str)
|
|
if parsed_event:
|
|
# Successfully parsed game event
|
|
PrintMessageFormatted(output_window, parsed_event)
|
|
else:
|
|
# Check if it looks like JSON (starts with { or [)
|
|
stripped = msg_str.strip()
|
|
if stripped and stripped[0] in ('{', '['):
|
|
# It's JSON but we didn't parse it - already logged to file
|
|
logger.debug('Unparsed JSON event')
|
|
else:
|
|
# Check for powerup messages
|
|
formatted_powerup = format_powerup_message(msg_str)
|
|
if formatted_powerup:
|
|
PrintMessageFormatted(output_window, formatted_powerup)
|
|
curses.setsyx(y,x)
|
|
curses.doupdate()
|
|
continue
|
|
# Not JSON - check if it's a bot debug message (filter unless verbose)
|
|
is_bot_debug = ' entered ' in msg_str and (' seek ' in msg_str or ' battle ' in msg_str or ' chase' in msg_str or ' fight' in msg_str)
|
|
|
|
if is_bot_debug and args.verbose == 0:
|
|
# Skip bot debug messages in default mode
|
|
logger.debug('Filtered bot debug message: %s' % msg_str[:50])
|
|
continue
|
|
|
|
# Check if it's a chat message
|
|
if ':' in msg_str and not msg_str.startswith(('print', 'broadcast', 'zmq')):
|
|
# Strip the special character first to get clean player name
|
|
clean_msg = msg_str.replace(chr(25), '')
|
|
|
|
# Check if it's team chat (PlayerName): or regular chat PlayerName:
|
|
if clean_msg.strip().startswith('(') and ')' in clean_msg:
|
|
# Team chat: (PlayerName) (Location): message or (PlayerName): message
|
|
# Location can contain nested parens like (Lower Floor (Near Yellow Armour))
|
|
# We need to match the first (PlayerName), then optionally a second parenthetical with potential nesting
|
|
|
|
# First, extract player name (first parenthetical)
|
|
player_match = re.match(r'^(\([^)]+\))', clean_msg)
|
|
if player_match:
|
|
player_part = player_match.group(1)
|
|
rest_of_msg = clean_msg[len(player_part):].lstrip()
|
|
|
|
# Check if there's a location (starts with another opening paren)
|
|
if rest_of_msg.startswith('('):
|
|
# Find the matching closing paren for the location
|
|
# We need to count parens to handle nesting like (Floor (Near Armor))
|
|
paren_count = 0
|
|
location_end = -1
|
|
for i, char in enumerate(rest_of_msg):
|
|
if char == '(':
|
|
paren_count += 1
|
|
elif char == ')':
|
|
paren_count -= 1
|
|
if paren_count == 0:
|
|
location_end = i + 1
|
|
break
|
|
|
|
if location_end > 0 and location_end < len(rest_of_msg) and rest_of_msg[location_end] == ':':
|
|
# We found a valid location with closing paren followed by colon
|
|
location_part = rest_of_msg[:location_end]
|
|
message_part = rest_of_msg[location_end + 1:] # After the colon
|
|
|
|
# Extract player name for team lookup
|
|
name_match = re.match(r'\(([^)]+)\)', player_part)
|
|
if name_match:
|
|
player_name = name_match.group(1).strip()
|
|
player_name_clean = re.sub(r'\^\d', '', player_name)
|
|
|
|
team_prefix = ''
|
|
if player_name_clean in player_teams:
|
|
team_prefix = get_team_color(player_name_clean)
|
|
|
|
# Strip color codes from location for consistent yellow coloring
|
|
location_clean = re.sub(r'\^\d', '', location_part)
|
|
msg_str = f"{team_prefix}{player_part} ^3{location_clean}^7:^5{message_part}"
|
|
else:
|
|
# No valid location, treat as regular team chat
|
|
if rest_of_msg.startswith(':'):
|
|
message_part = rest_of_msg[1:]
|
|
name_match = re.match(r'\(([^)]+)\)', player_part)
|
|
if name_match:
|
|
player_name = name_match.group(1).strip()
|
|
player_name_clean = re.sub(r'\^\d', '', player_name)
|
|
|
|
team_prefix = ''
|
|
if player_name_clean in player_teams:
|
|
team_prefix = get_team_color(player_name_clean)
|
|
|
|
msg_str = f"{team_prefix}{player_part}^5:{message_part}"
|
|
else:
|
|
# No location, check if it's just (PlayerName): message
|
|
colon_match = re.match(r'^(\([^)]+\)):(\s*.*)', clean_msg)
|
|
if colon_match:
|
|
player_part = colon_match.group(1) + ':'
|
|
message_part = colon_match.group(2)
|
|
|
|
name_match = re.match(r'\(([^)]+)\)', player_part)
|
|
if name_match:
|
|
player_name = name_match.group(1).strip()
|
|
player_name_clean = re.sub(r'\^\d', '', player_name)
|
|
|
|
team_prefix = ''
|
|
if player_name_clean in player_teams:
|
|
team_prefix = get_team_color(player_name_clean)
|
|
|
|
msg_str = f"{team_prefix}{player_part}^5{message_part}\n"
|
|
else:
|
|
# Regular chat: PlayerName: message
|
|
parts = clean_msg.split(':', 1)
|
|
if len(parts) == 2:
|
|
player_name = parts[0].strip()
|
|
message_text = parts[1] # Keep the leading space
|
|
|
|
# Strip color codes from chat player name
|
|
player_name_clean = re.sub(r'\^\d', '', player_name)
|
|
|
|
# Look up player by clean name and add team prefix
|
|
team_prefix = ''
|
|
if player_name_clean in player_teams:
|
|
team_prefix = get_team_color(player_name_clean)
|
|
|
|
# Reconstruct with team prefix and colored message (^2 for regular chat)
|
|
# Need to preserve the original player_name with color codes from msg_str
|
|
original_parts = msg_str.replace(chr(25), '').split(':', 1)
|
|
if len(original_parts) == 2:
|
|
msg_str = f"{team_prefix}{original_parts[0]}:^2{original_parts[1]}"
|
|
|
|
# Print the message (chat or other)
|
|
PrintMessageFormatted(output_window, msg_str)
|
|
|
|
curses.setsyx(y,x)
|
|
curses.doupdate()
|
|
else:
|
|
logger.debug( 'Received empty message (possible keepalive or protocol frame)' )
|
|
|
|
except KeyboardInterrupt:
|
|
PrintMessageFormatted(output_window, "\nShutting down...\n")
|
|
except Exception as e:
|
|
timestamp = time.strftime('%H:%M:%S')
|
|
PrintMessageFormatted(output_window, f"^1[^7{timestamp}^1] Fatal Error: {e}^7\n", add_timestamp=False)
|
|
logger.error( 'Fatal error: %s' % e )
|
|
import traceback
|
|
logger.error( traceback.format_exc() )
|
|
finally:
|
|
if stats_socket:
|
|
stats_socket.close()
|
|
if stats_context:
|
|
stats_context.term()
|
|
|
|
if ( __name__ == '__main__' ):
|
|
curses.wrapper(main)
|
|
|
|
# Version: 0.7.0
|