This commit is contained in:
xbl
2025-12-23 09:37:42 +01:00
parent 1dcd8941b5
commit a889898694
9 changed files with 1891 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
venv3/
*.log
__pycache__/

144
README.md Normal file
View File

@ -0,0 +1,144 @@
# QLPyCon - Quake Live Python Console
A modular, refactored terminal-based client for monitoring Quake Live servers via ZMQ.
## Version 0.8.0 - Modular Architecture
### Features
- **Real-time game monitoring** - Watch kills, deaths, team switches, medals, and more
- **Server info display** - Shows hostname, map, gametype, limits, and player count
- **Team-aware chat** - Color-coded messages with team prefixes
- **Powerup tracking** - Formatted pickup and carrier kill messages
- **JSON event logging** - Capture all game events to file for analysis
- **Colorized output** - Quake color code support (^0-^7)
### Module Structure
```
qlpycon/
├── __init__.py # Package initialization
├── main.py # Entry point and main loop
├── config.py # Constants and configuration
├── state.py # Game state management (ServerInfo, PlayerTracker, etc)
├── network.py # ZMQ connections (RCON and stats stream)
├── parser.py # JSON event parsing
├── formatter.py # Message formatting and colorization
└── ui.py # Curses interface (windows, display, input)
```
### Installation
```bash
# Requirements
pip install pyzmq
# Run directly
python -m qlpycon.main --host tcp://127.0.0.1:27961 --password YOUR_PASSWORD
```
### Usage
```bash
# Basic connection
python main.py --host tcp://SERVER_IP:PORT --password RCON_PASSWORD
# Verbose mode (show all communications)
python main.py --host tcp://SERVER_IP:PORT --password PASS -v
# Debug mode (detailed logging)
python main.py --host tcp://SERVER_IP:PORT --password PASS -vv
# Capture all JSON events to file
python main.py --host tcp://SERVER_IP:PORT --password PASS --json events.log
# Custom unknown events log
python main.py --unknown-log my_unknown.log
```
### Command Line Options
- `--host` - ZMQ URI (default: tcp://127.0.0.1:27961)
- `--password` - RCON password
- `--identity` - Socket identity (random UUID by default)
- `-v, --verbose` - Increase verbosity (use -v for INFO, -vv for DEBUG)
- `--json FILE` - Log all JSON events to FILE
- `--unknown-log FILE` - Log unknown JSON events (default: unknown_events.log)
### Architecture Overview
#### State Management (`state.py`)
- **ServerInfo** - Tracks server configuration and metadata
- **PlayerTracker** - Manages player teams and information
- **EventDeduplicator** - Prevents duplicate kill/death events
- **GameState** - Main container for all state
#### Network Layer (`network.py`)
- **RconConnection** - Handles DEALER socket for RCON commands
- **StatsConnection** - Handles SUB socket for game event stream
#### Event Parsing (`parser.py`)
- **EventParser** - Parses JSON game events into formatted messages
- Modular handlers for each event type (deaths, medals, team switches, etc)
#### Message Formatting (`formatter.py`)
- Color code handling (Quake's ^N system)
- Team prefix generation
- Timestamp logic
- Chat message formatting
#### UI Layer (`ui.py`)
- **UIManager** - Manages all curses windows
- Three-panel layout: server info, output, input
- Threaded input queue for non-blocking commands
- Color rendering with curses
### Key Improvements Over v0.7.0
1. **Modular Design** - Separated concerns into focused modules
2. **Clean Classes** - OOP design for state and connections
3. **Better Maintainability** - Each module has a single responsibility
4. **Easier Testing** - Components can be tested independently
5. **Clear Interfaces** - Well-defined APIs between modules
6. **Improved Comments** - Every module, class, and function documented
### Event Types Supported
- `PLAYER_SWITCHTEAM` - Team changes
- `PLAYER_DEATH` / `PLAYER_KILL` - Frag events (deduplicated)
- `PLAYER_MEDAL` - Medal awards
- `PLAYER_STATS` - End-game statistics with weapon accuracy
- `MATCH_STARTED` - Match initialization
- `MATCH_REPORT` - Final scores
- `PLAYER_CONNECT` / `PLAYER_DISCONNECT` - Connection events
- `ROUND_OVER` - Round completion
### Color Codes
Quake Live uses `^N` color codes where N is 0-7:
- `^0` - Black
- `^1` - Red
- `^2` - Green
- `^3` - Yellow
- `^4` - Blue
- `^5` - Cyan
- `^6` - Magenta
- `^7` - White (default)
### Development
To extend QLPyCon:
1. **Add new event types** - Edit `parser.py`, add handler method
2. **Change UI layout** - Edit `ui.py`, modify window creation
3. **Add new commands** - Edit `main.py`, handle in main loop
4. **Modify formatting** - Edit `formatter.py`, adjust color/timestamp logic
5. **Add configuration** - Edit `config.py`, add constants
### License
MIT License - See original qlpycon.py for details
### Credits
Refactored from qlpycon.py v0.7.0

101
config.py Normal file
View File

@ -0,0 +1,101 @@
#!/usr/bin/env python3
"""
Configuration and constants for QLPyCon
"""
VERSION = "0.8.0"
# Network defaults
DEFAULT_HOST = 'tcp://127.0.0.1:27961'
POLL_TIMEOUT = 100
# UI dimensions
INFO_WINDOW_HEIGHT = 10
INFO_WINDOW_Y = 2
OUTPUT_WINDOW_Y = 12
INPUT_WINDOW_HEIGHT = 1
# Event deduplication
MAX_RECENT_EVENTS = 10
# Team game modes
TEAM_MODES = [
"Team Deathmatch",
"Clan Arena",
"Capture The Flag",
"One Flag CTF",
"Overload",
"Harvester",
"Freeze Tag"
]
# Team mappings
TEAM_MAP = {
0: 'FREE',
1: 'RED',
2: 'BLUE',
3: 'SPECTATOR'
}
TEAM_COLORS = {
'RED': '^1(RED)^7',
'BLUE': '^4(BLUE)^7',
'FREE': '',
'SPECTATOR': '^3(SPEC)^7'
}
# Weapon names
WEAPON_NAMES = {
'ROCKET': 'Rocket Launcher',
'LIGHTNING': 'Lightning Gun',
'RAILGUN': 'Railgun',
'SHOTGUN': 'Shotgun',
'GAUNTLET': 'Gauntlet',
'GRENADE': 'Grenade Launcher',
'PLASMA': 'Plasma Gun',
'MACHINEGUN': 'Machine Gun'
}
# Weapon names for kill messages
WEAPON_KILL_NAMES = {
'ROCKET': 'the Rocket Launcher',
'LIGHTNING': 'the Lightning Gun',
'RAILGUN': 'the Railgun',
'SHOTGUN': 'the Shotgun',
'GAUNTLET': 'the Gauntlet',
'GRENADE': 'the Grenade Launcher',
'PLASMA': 'the Plasma Gun',
'MACHINEGUN': 'the Machine Gun'
}
# Death messages
DEATH_MESSAGES = {
'FALLING': "%s%s ^7cratered.",
'HURT': "%s%s ^7was in the wrong place.",
'LAVA': "%s%s ^7does a backflip into the lava.",
'WATER': "%s%s ^7sank like a rock.",
'SLIME': "%s%s ^7melted.",
'CRUSH': "%s%s ^7was crushed."
}
# Powerup names and colors
POWERUP_COLORS = {
'Quad Damage': '^5Quad Damage^7',
'Battle Suit': '^3Battle Suit^7',
'Regeneration': '^1Regeneration^7',
'Haste': '^3Haste^7',
'Invisibility': '^5Invisibility^7',
'Flight': '^5Flight^7',
'Medkit': '^1Medkit^7',
'MegaHealth': '^4MegaHealth^7'
}
# Curses color pairs
COLOR_PAIRS = {
1: 1, # Red
2: 2, # Green
3: 3, # Yellow
4: 4, # Blue
5: 6, # Cyan (swapped)
6: 5 # Magenta (swapped)
}

202
formatter.py Normal file
View File

@ -0,0 +1,202 @@
#!/usr/bin/env python3
"""
Message formatting and colorization for QLPyCon
Handles Quake color codes and team prefixes
"""
import re
import time
from config import TEAM_COLORS
def strip_color_codes(text):
"""Remove Quake color codes (^N) from text"""
return re.sub(r'\^\d', '', text)
def get_team_prefix(player_name, player_tracker):
"""Get color-coded team prefix for a player"""
if not player_tracker.server_info.is_team_mode():
return ''
team = player_tracker.get_team(player_name)
if not team:
return ''
return TEAM_COLORS.get(team, '')
def should_add_timestamp(message):
"""Determine if a message should get a timestamp"""
# Skip status command output
skip_keywords = ['map:', 'num score', '---', 'bot', 'status']
if any(kw in message for kw in skip_keywords):
# But allow "zmq RCON" lines (command echoes)
if 'zmq RCON' not in message:
return False
# Skip very short messages or fragments
stripped = message.strip()
if len(stripped) <= 2:
return False
# Skip messages with leading spaces (status fragments)
if message.startswith(' ') and len(stripped) < 50:
return False
# Skip pure numbers
if stripped.isdigit():
return False
# Skip short single words
if len(stripped) < 20 and ' ' not in stripped:
return False
# Skip IP addresses (xxx.xxx.xxx.xxx or xxx.xxx.xxx.xxx:port)
allowed_chars = set('0123456789.:')
if stripped.count('.') == 3 and all(c in allowed_chars for c in stripped):
return False
# Skip messages starting with ***
if message.startswith('***'):
return False
return True
def format_message(message, add_timestamp=True):
"""
Format a message for display
- Strips special characters
- Adds timestamp if appropriate
- Handles broadcast formatting
"""
# Clean up message
message = message.replace("\\n", "")
message = message.replace(chr(25), "")
# Handle broadcast messages
attributes = 0
if message[:10] == "broadcast:":
message = message[11:]
attributes = 1 # Bold
# Handle print messages
if message[:7] == "print \"":
message = message[7:-2] + "\n"
# Add timestamp if requested and appropriate
if add_timestamp and should_add_timestamp(message):
timestamp = time.strftime('%H:%M:%S')
message = f"^3[^7{timestamp}^3]^7 {message}"
return message, attributes
def format_chat_message(message, player_tracker):
"""
Format chat messages with team prefixes and colors
Handles both regular chat (Name: msg) and team chat ((Name): msg or (Name) (Location): msg)
"""
# Strip special character
clean_msg = message.replace(chr(25), '')
# Team chat with location: (PlayerName) (Location): message
# Location can have nested parens like (Lower Floor (Near Yellow Armour))
if clean_msg.strip().startswith('(') and ')' in clean_msg:
# Extract player name (first parenthetical)
player_match = re.match(r'^(\([^)]+\))', clean_msg)
if not player_match:
return message
player_part = player_match.group(1)
rest = clean_msg[len(player_part):].lstrip()
# Check for location (another parenthetical)
if rest.startswith('('):
# Count parens to handle nesting
paren_count = 0
location_end = -1
for i, char in enumerate(rest):
if char == '(':
paren_count += 1
elif char == ')':
paren_count -= 1
if paren_count == 0:
location_end = i + 1
break
# Check if location ends with colon
if location_end > 0 and location_end < len(rest) and rest[location_end] == ':':
location_part = rest[:location_end]
message_part = rest[location_end + 1:]
# Get team prefix
name_match = re.match(r'\(([^)]+)\)', player_part)
if name_match:
player_name = strip_color_codes(name_match.group(1).strip())
team_prefix = get_team_prefix(player_name, player_tracker)
location_clean = strip_color_codes(location_part)
return f"{team_prefix}{player_part} ^3{location_clean}^7:^5{message_part}"
# Team chat without location: (PlayerName): message
colon_match = re.match(r'^(\([^)]+\)):(\s*.*)', clean_msg)
if colon_match:
player_part = colon_match.group(1) + ':'
message_part = colon_match.group(2)
name_match = re.match(r'\(([^)]+)\)', player_part)
if name_match:
player_name = strip_color_codes(name_match.group(1).strip())
team_prefix = get_team_prefix(player_name, player_tracker)
return f"{team_prefix}{player_part}^5{message_part}\n"
# Regular chat: PlayerName: message
parts = clean_msg.split(':', 1)
if len(parts) == 2:
player_name = strip_color_codes(parts[0].strip())
team_prefix = get_team_prefix(player_name, player_tracker)
# Preserve original color-coded name
original_parts = message.replace(chr(25), '').split(':', 1)
if len(original_parts) == 2:
return f"{team_prefix}{original_parts[0]}:^2{original_parts[1]}"
return message
def format_powerup_message(message, player_tracker):
"""
Format powerup pickup and carrier kill messages
Returns formatted message or None if not a powerup message
"""
from config import POWERUP_COLORS
if message.startswith("broadcast:"):
message = message[11:].strip()
# Powerup pickup: "PlayerName got the PowerupName!"
pickup_match = re.match(r'^(.+?)\s+got the\s+(.+?)!', message)
if pickup_match:
player_name = pickup_match.group(1).strip()
powerup_name = pickup_match.group(2).strip()
player_clean = strip_color_codes(player_name)
team_prefix = get_team_prefix(player_clean, player_tracker)
colored_powerup = POWERUP_COLORS.get(powerup_name, f'^6{powerup_name}^7')
return f"{team_prefix}{player_name} ^7got the {colored_powerup}!\n"
# Powerup carrier kill: "PlayerName killed the PowerupName carrier!"
carrier_match = re.match(r'^(.+?)\s+killed the\s+(.+?)\s+carrier!', message)
if carrier_match:
player_name = carrier_match.group(1).strip()
powerup_name = carrier_match.group(2).strip()
player_clean = strip_color_codes(player_name)
team_prefix = get_team_prefix(player_clean, player_tracker)
colored_powerup = POWERUP_COLORS.get(powerup_name, f'^6{powerup_name}^7')
return f"{team_prefix}{player_name} ^7killed the {colored_powerup} ^7carrier!\n"
return None

589
main.py Normal file
View File

@ -0,0 +1,589 @@
#!/usr/bin/env python3
"""
QLPyCon - Quake Live Python Console
Main entry point
"""
import argparse
import uuid
import logging
import time
import re
import curses
import zmq
import signal
import sys
from config import VERSION, DEFAULT_HOST, POLL_TIMEOUT
from state import GameState
from network import RconConnection, StatsConnection
from parser import EventParser
from formatter import format_message, format_chat_message, format_powerup_message
from ui import UIManager
# Configure logging
logger = logging.getLogger('main')
logger.setLevel(logging.DEBUG)
all_json_logger = logging.getLogger('all_json')
all_json_logger.setLevel(logging.DEBUG)
unknown_json_logger = logging.getLogger('unknown_json')
unknown_json_logger.setLevel(logging.DEBUG)
def signal_handler(sig, frame):
"""Handle Ctrl+C for immediate shutdown"""
# Don't call curses.endwin() here - let curses.wrapper handle it
sys.exit(0)
def parse_cvar_response(message, game_state, ui):
"""
Parse server cvar responses and update state
Returns True if message should be suppressed from display
"""
# Suppress command echo lines
if message.startswith('zmq RCON command') and 'from' in message:
suppress_cmds = ['status', 'roundlimit', 'qlx_serverBrandName', 'g_factoryTitle',
'mapname', 'timelimit', 'fraglimit', 'capturelimit', 'sv_maxclients']
if any(f': {cmd}' in message for cmd in suppress_cmds):
return True
# Parse cvar responses (format: "cvar_name" is:"value" default:...)
cvar_match = re.search(r'"([^"]+)"\s+is:"([^"]*)"', message)
if cvar_match:
cvar_name = cvar_match.group(1)
value = cvar_match.group(2)
if game_state.server_info.update_from_cvar(cvar_name, value):
ui.update_server_info(game_state)
return True
return False
def handle_stats_connection(message, rcon, ui, game_state):
"""
Handle stats connection info extraction
Returns (stats_port, stats_password) or (None, None)
"""
stats_port = None
stats_password = None
# Extract stats port
if 'net_port' in message and ' is:' in message and '"net_port"' in message:
match = re.search(r' is:"([^"]+)"', message)
if match:
port_str = match.group(1).strip()
digit_match = re.search(r'(\d+)', port_str)
if digit_match:
stats_port = digit_match.group(1)
logger.info(f'Got stats port: {stats_port}')
# Extract stats password
if 'zmq_stats_password' in message and ' is:' in message and '"zmq_stats_password"' in message:
match = re.search(r' is:"([^"]+)"', message)
if match:
password_str = match.group(1)
password_str = re.sub(r'\^\d', '', password_str) # Strip color codes
stats_password = password_str.strip()
logger.info(f'Got stats password: {stats_password}')
return stats_port, stats_password
def parse_player_events(message, game_state, ui):
"""
Parse connect, disconnect, kick, and rename messages
Returns True if message should be suppressed
"""
try:
# Strip color codes for matching
from formatter import strip_color_codes
clean_msg = strip_color_codes(message)
# Match connects: "NAME connected"
connect_match = re.match(r'^(.+?)\s+connected(?:\s+with Steam ID)?', clean_msg)
if connect_match:
player_name = message.split(' connected')[0].strip() # Keep color codes
if game_state.server_info.is_team_mode():
game_state.player_tracker.update_team(player_name, 'SPECTATOR')
game_state.player_tracker.add_player(player_name)
ui.update_server_info(game_state)
logger.debug(f'Player connected: {player_name}')
return False
# Match disconnects: "NAME disconnected", "NAME was kicked"
disconnect_patterns = [
r'^(.+?)\s+disconnected',
r'^(.+?)\s+was kicked',
]
for pattern in disconnect_patterns:
match = re.match(pattern, clean_msg)
if match:
player_name_clean = match.group(1).strip()
# Try to find the original name with color codes
original_match = re.match(pattern, message)
player_name = original_match.group(1).strip() if original_match else player_name_clean
game_state.player_tracker.remove_player(player_name)
game_state.player_tracker.remove_player(player_name_clean) # Try both
ui.update_server_info(game_state)
logger.debug(f'Player disconnected: {player_name}')
return False
# Match renames: "OldName renamed to NewName"
rename_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', clean_msg)
if rename_match:
old_name_clean = rename_match.group(1).strip()
new_name_clean = rename_match.group(2).strip()
# Get names with color codes
original_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', message)
old_name = original_match.group(1).strip() if original_match else old_name_clean
new_name = original_match.group(2).strip() if original_match else new_name_clean
game_state.player_tracker.rename_player(old_name, new_name)
ui.update_server_info(game_state)
logger.debug(f'Player renamed: {old_name} -> {new_name}')
return False
except Exception as e:
logger.error(f'Error in parse_player_events: {e}')
return False
def main_loop(screen):
return False
"""Main application loop"""
# Setup signal handler for Ctrl+C
signal.signal(signal.SIGINT, signal_handler)
# Parse arguments
parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics')
parser.add_argument('--host', default=DEFAULT_HOST, help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}')
parser.add_argument('--password', required=False, help='RCON password')
parser.add_argument('--identity', default=uuid.uuid1().hex, help='Socket identity (random UUID by default)')
parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity (-v INFO, -vv DEBUG)')
parser.add_argument('--unknown-log', default='unknown_events.log', help='File to log unknown JSON events')
parser.add_argument('-j', '--json', dest='json_log', default=None, help='File to log all JSON events')
args = parser.parse_args()
# Set logging level
if args.verbose == 0:
logger.setLevel(logging.WARNING)
elif args.verbose == 1:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.DEBUG)
# Setup file logging for unknown events
unknown_handler = logging.FileHandler(args.unknown_log, mode='a')
unknown_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S')
unknown_handler.setFormatter(unknown_formatter)
unknown_json_logger.addHandler(unknown_handler)
unknown_json_logger.propagate = False
# Initialize components
ui = UIManager(screen, args.host)
game_state = GameState()
# Setup logging to output window
log_handler = ui.setup_logging()
logger.addHandler(log_handler)
# Setup input queue
input_queue = ui.setup_input_queue()
# Display startup messages
ui.print_message(f"*** QL pyCon Version {VERSION} starting ***\n")
ui.print_message(f"zmq python bindings {zmq.__version__}, libzmq version {zmq.zmq_version()}\n")
# Initialize network connections
rcon = RconConnection(args.host, args.password, args.identity)
rcon.connect()
stats_conn = None
stats_port = None
stats_password = None
stats_check_counter = 0
# Shutdown flag
shutdown = False
# Setup JSON logging if requested
json_logger = None
if args.json_log:
json_handler = logging.FileHandler(args.json_log, mode='a')
json_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S')
json_handler.setFormatter(json_formatter)
all_json_logger.addHandler(json_handler)
all_json_logger.propagate = False
json_logger = all_json_logger
# Create event parser
event_parser = EventParser(game_state, json_logger, unknown_json_logger)
# Main event loop
while not shutdown:
# Poll RCON socket
event = rcon.poll(POLL_TIMEOUT)
# Check monitor for connection events
monitor_event = rcon.check_monitor()
if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED:
ui.print_message("Connected to server\n")
rcon.send_command(b'register')
logger.info('Registration message sent')
ui.print_message("Requesting connection info...\n")
rcon.send_command(b'zmq_stats_password')
rcon.send_command(b'net_port')
# Handle user input
while not input_queue.empty():
command = input_queue.get()
logger.info(f'Sending command: {repr(command.strip())}')
# Display command with timestamp
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n")
rcon.send_command(command)
# Poll stats stream if connected
if stats_conn and stats_conn.connected:
stats_check_counter += 1
if stats_check_counter % 100 == 0:
logger.debug(f'Stats polling active (check #{stats_check_counter // 100})')
stats_msg = stats_conn.recv_message()
if stats_msg:
logger.info(f'Stats event received ({len(stats_msg)} bytes)')
# Parse game event
parsed = event_parser.parse_event(stats_msg)
if parsed:
# Format with timestamp before displaying
formatted_msg, attributes = format_message(parsed)
ui.print_message(formatted_msg)
ui.update_server_info(game_state)
# Process RCON messages
if event > 0:
logger.debug('Socket has data available')
msg_count = 0
while True:
message = rcon.recv_message()
if message is None:
if msg_count > 0:
logger.debug(f'Read {msg_count} message(s)')
break
msg_count += 1
if len(message) == 0:
logger.debug('Received empty message (keepalive)')
continue
logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}')
# Try to parse as cvar response
if parse_cvar_response(message, game_state, ui):
logger.debug('Suppressed cvar response')
continue
# Check for player connect/disconnect/rename events
parse_player_events(message, game_state, ui)
# Check for stats connection info
port, password = handle_stats_connection(message, rcon, ui, game_state)
if port:
stats_port = port
if password:
stats_password = password
# Connect to stats if we have both credentials
if stats_port and stats_password and stats_conn is None:
try:
ui.print_message("Connecting to stats stream...\n")
host_ip = args.host.split('//')[1].split(':')[0]
stats_conn = StatsConnection(host_ip, stats_port, stats_password)
stats_conn.connect()
ui.print_message("Stats stream connected - ready for game events\n")
# Request initial server info
logger.info('Sending initial server info queries')
rcon.send_command(b'qlx_serverBrandName')
rcon.send_command(b'g_factoryTitle')
rcon.send_command(b'mapname')
rcon.send_command(b'timelimit')
rcon.send_command(b'fraglimit')
rcon.send_command(b'roundlimit')
rcon.send_command(b'capturelimit')
rcon.send_command(b'sv_maxclients')
if args.json_log:
ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n")
except Exception as e:
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n")
logger.error(f'Stats connection failed: {e}')
# Try to parse as game event
parsed_event = event_parser.parse_event(message)
if parsed_event:
ui.print_message(parsed_event)
continue
# Check if it looks like JSON but wasn't parsed
stripped = message.strip()
if stripped and stripped[0] in ('{', '['):
logger.debug('Unparsed JSON event')
continue
# Try powerup message formatting
powerup_msg = format_powerup_message(message, game_state.player_tracker)
if powerup_msg:
ui.print_message(powerup_msg)
continue
# Filter bot debug messages in default mode
is_bot_debug = (' entered ' in message and
any(x in message for x in [' seek ', ' battle ', ' chase', ' fight']))
if is_bot_debug and args.verbose == 0:
logger.debug(f'Filtered bot debug: {message[:50]}')
continue
# Check if it's a chat message
if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')):
message = format_chat_message(message, game_state.player_tracker)
# Format and display message
formatted_msg, attributes = format_message(message)
ui.print_message(formatted_msg)
def main_loop(screen):
"""Main application loop"""
# Setup signal handler for Ctrl+C
signal.signal(signal.SIGINT, signal_handler)
# Parse arguments
parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics')
parser.add_argument('--host', default=DEFAULT_HOST, help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}')
parser.add_argument('--password', required=False, help='RCON password')
parser.add_argument('--identity', default=uuid.uuid1().hex, help='Socket identity (random UUID by default)')
parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity (-v INFO, -vv DEBUG)')
parser.add_argument('--unknown-log', default='unknown_events.log', help='File to log unknown JSON events')
parser.add_argument('-j', '--json', dest='json_log', default=None, help='File to log all JSON events')
args = parser.parse_args()
# Set logging level
if args.verbose == 0:
logger.setLevel(logging.WARNING)
elif args.verbose == 1:
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.DEBUG)
# Setup file logging for unknown events
unknown_handler = logging.FileHandler(args.unknown_log, mode='a')
unknown_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S')
unknown_handler.setFormatter(unknown_formatter)
unknown_json_logger.addHandler(unknown_handler)
unknown_json_logger.propagate = False
# Initialize components
ui = UIManager(screen, args.host)
game_state = GameState()
# Setup logging to output window
log_handler = ui.setup_logging()
logger.addHandler(log_handler)
# Setup input queue
input_queue = ui.setup_input_queue()
# Display startup messages
ui.print_message(f"*** QL pyCon Version {VERSION} starting ***\n")
ui.print_message(f"zmq python bindings {zmq.__version__}, libzmq version {zmq.zmq_version()}\n")
# Initialize network connections
rcon = RconConnection(args.host, args.password, args.identity)
rcon.connect()
stats_conn = None
stats_port = None
stats_password = None
stats_check_counter = 0
# Shutdown flag
shutdown = False
# Setup JSON logging if requested
json_logger = None
if args.json_log:
json_handler = logging.FileHandler(args.json_log, mode='a')
json_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S')
json_handler.setFormatter(json_formatter)
all_json_logger.addHandler(json_handler)
all_json_logger.propagate = False
json_logger = all_json_logger
# Create event parser
event_parser = EventParser(game_state, json_logger, unknown_json_logger)
# Main event loop
while not shutdown:
# Poll RCON socket
event = rcon.poll(POLL_TIMEOUT)
# Check monitor for connection events
monitor_event = rcon.check_monitor()
if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED:
ui.print_message("Connected to server\n")
rcon.send_command(b'register')
logger.info('Registration message sent')
ui.print_message("Requesting connection info...\n")
rcon.send_command(b'zmq_stats_password')
rcon.send_command(b'net_port')
# Handle user input
while not input_queue.empty():
command = input_queue.get()
logger.info(f'Sending command: {repr(command.strip())}')
# Display command with timestamp
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n")
rcon.send_command(command)
# Poll stats stream if connected
if stats_conn and stats_conn.connected:
stats_check_counter += 1
if stats_check_counter % 100 == 0:
logger.debug(f'Stats polling active (check #{stats_check_counter // 100})')
stats_msg = stats_conn.recv_message()
if stats_msg:
logger.info(f'Stats event received ({len(stats_msg)} bytes)')
# Parse game event
parsed = event_parser.parse_event(stats_msg)
if parsed:
# Format with timestamp before displaying
formatted_msg, attributes = format_message(parsed)
ui.print_message(formatted_msg)
ui.update_server_info(game_state)
# Process RCON messages
if event > 0:
logger.debug('Socket has data available')
msg_count = 0
while True:
message = rcon.recv_message()
if message is None:
if msg_count > 0:
logger.debug(f'Read {msg_count} message(s)')
break
msg_count += 1
if len(message) == 0:
logger.debug('Received empty message (keepalive)')
continue
logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}')
# Try to parse as cvar response
if parse_cvar_response(message, game_state, ui):
logger.debug('Suppressed cvar response')
continue
# Check for player connect/disconnect/rename events
parse_player_events(message, game_state, ui)
# Check for stats connection info
port, password = handle_stats_connection(message, rcon, ui, game_state)
if port:
stats_port = port
if password:
stats_password = password
# Connect to stats if we have both credentials
if stats_port and stats_password and stats_conn is None:
try:
ui.print_message("Connecting to stats stream...\n")
host_ip = args.host.split('//')[1].split(':')[0]
stats_conn = StatsConnection(host_ip, stats_port, stats_password)
stats_conn.connect()
ui.print_message("Stats stream connected - ready for game events\n")
# Request initial server info
logger.info('Sending initial server info queries')
rcon.send_command(b'qlx_serverBrandName')
rcon.send_command(b'g_factoryTitle')
rcon.send_command(b'mapname')
rcon.send_command(b'timelimit')
rcon.send_command(b'fraglimit')
rcon.send_command(b'roundlimit')
rcon.send_command(b'capturelimit')
rcon.send_command(b'sv_maxclients')
if args.json_log:
ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n")
except Exception as e:
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n")
logger.error(f'Stats connection failed: {e}')
# Try to parse as game event
parsed_event = event_parser.parse_event(message)
if parsed_event:
ui.print_message(parsed_event)
continue
# Check if it looks like JSON but wasn't parsed
stripped = message.strip()
if stripped and stripped[0] in ('{', '['):
logger.debug('Unparsed JSON event')
continue
# Try powerup message formatting
powerup_msg = format_powerup_message(message, game_state.player_tracker)
if powerup_msg:
ui.print_message(powerup_msg)
continue
# Filter bot debug messages in default mode
is_bot_debug = (' entered ' in message and
any(x in message for x in [' seek ', ' battle ', ' chase', ' fight']))
if is_bot_debug and args.verbose == 0:
logger.debug(f'Filtered bot debug: {message[:50]}')
continue
# Check if it's a chat message
if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')):
message = format_chat_message(message, game_state.player_tracker)
# Format and display message
formatted_msg, attributes = format_message(message)
ui.print_message(formatted_msg)
if __name__ == '__main__':
curses.wrapper(main_loop)

167
network.py Normal file
View File

@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""
ZMQ network layer for QLPyCon
Handles RCON and stats stream connections
"""
import zmq
import struct
import logging
import time
logger = logging.getLogger('network')
def read_socket_event(msg):
"""Parse ZMQ socket monitor event"""
event_id = struct.unpack('<H', msg[:2])[0]
event_names = {
zmq.EVENT_ACCEPTED: 'EVENT_ACCEPTED',
zmq.EVENT_ACCEPT_FAILED: 'EVENT_ACCEPT_FAILED',
zmq.EVENT_BIND_FAILED: 'EVENT_BIND_FAILED',
zmq.EVENT_CLOSED: 'EVENT_CLOSED',
zmq.EVENT_CLOSE_FAILED: 'EVENT_CLOSE_FAILED',
zmq.EVENT_CONNECTED: 'EVENT_CONNECTED',
zmq.EVENT_CONNECT_DELAYED: 'EVENT_CONNECT_DELAYED',
zmq.EVENT_CONNECT_RETRIED: 'EVENT_CONNECT_RETRIED',
zmq.EVENT_DISCONNECTED: 'EVENT_DISCONNECTED',
zmq.EVENT_LISTENING: 'EVENT_LISTENING',
zmq.EVENT_MONITOR_STOPPED: 'EVENT_MONITOR_STOPPED',
}
event_name = event_names.get(event_id, f'{event_id}')
event_value = struct.unpack('<I', msg[2:])[0]
return (event_id, event_name, event_value)
def check_monitor(monitor):
"""Check monitor socket for events"""
try:
event_monitor = monitor.recv(zmq.NOBLOCK)
except zmq.Again:
return None
event_id, event_name, event_value = read_socket_event(event_monitor)
event_endpoint = monitor.recv(zmq.NOBLOCK)
logger.debug(f'Monitor: {event_name} {event_value} endpoint {event_endpoint}')
return (event_id, event_value)
class RconConnection:
"""RCON connection to Quake Live server"""
def __init__(self, host, password, identity):
self.host = host
self.password = password
self.identity = identity
self.context = None
self.socket = None
self.monitor = None
def connect(self):
"""Initialize connection"""
logger.info('Initializing ZMQ context...')
self.context = zmq.Context()
logger.info('Creating DEALER socket...')
self.socket = self.context.socket(zmq.DEALER)
logger.info('Setting up socket monitor...')
self.monitor = self.socket.get_monitor_socket(zmq.EVENT_ALL)
if self.password:
logger.info('Setting password for access')
self.socket.plain_username = b'rcon'
self.socket.plain_password = self.password.encode('utf-8')
self.socket.zap_domain = b'rcon'
logger.info(f'Setting socket identity: {self.identity}')
self.socket.setsockopt(zmq.IDENTITY, self.identity.encode('utf-8'))
self.socket.connect(self.host)
logger.info('Connection initiated, waiting for events...')
def send_command(self, command):
"""Send RCON command"""
if isinstance(command, str):
command = command.encode('utf-8')
self.socket.send(command)
logger.info(f'Sent command: {command}')
def poll(self, timeout):
"""Poll for messages"""
return self.socket.poll(timeout)
def recv_message(self):
"""Receive a message (non-blocking)"""
try:
return self.socket.recv(zmq.NOBLOCK).decode('utf-8', errors='replace')
except zmq.error.Again:
return None
def check_monitor(self):
"""Check monitor for events"""
return check_monitor(self.monitor)
def close(self):
"""Close connection"""
if self.socket:
self.socket.setsockopt(zmq.LINGER, 0) # Don't wait for unsent messages
self.socket.close()
if self.context:
self.context.term()
class StatsConnection:
"""Stats stream connection (ZMQ SUB socket)"""
def __init__(self, host, port, password):
self.host = host
self.port = port
self.password = password
self.context = None
self.socket = None
self.connected = False
def connect(self):
"""Connect to stats stream"""
stats_host = f'tcp://{self.host}:{self.port}'
logger.info(f'Connecting to stats stream: {stats_host}')
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
logger.debug('Stats socket created (SUB type)')
if self.password and self.password.strip():
logger.debug('Setting PLAIN authentication')
self.socket.setsockopt(zmq.PLAIN_USERNAME, b'stats')
self.socket.setsockopt(zmq.PLAIN_PASSWORD, self.password.encode('utf-8'))
self.socket.setsockopt_string(zmq.ZAP_DOMAIN, 'stats')
logger.debug(f'Connecting to {stats_host}')
self.socket.connect(stats_host)
logger.debug('Setting ZMQ_SUBSCRIBE to empty (all messages)')
self.socket.setsockopt(zmq.SUBSCRIBE, b'')
time.sleep(0.5)
self.connected = True
logger.info('Stats stream connected')
def recv_message(self):
"""Receive stats message (non-blocking)"""
if not self.connected:
return None
try:
msg = self.socket.recv(zmq.NOBLOCK)
return msg.decode('utf-8', errors='replace')
except zmq.error.Again:
return None
def close(self):
"""Close connection"""
if self.socket:
self.socket.setsockopt(zmq.LINGER, 0) # Don't wait for unsent messages
self.socket.close()
if self.context:
self.context.term()

247
parser.py Normal file
View File

@ -0,0 +1,247 @@
#!/usr/bin/env python3
"""
JSON game event parsing for QLPyCon
Parses events from Quake Live stats stream
"""
import json
import logging
from config import WEAPON_NAMES, WEAPON_KILL_NAMES, DEATH_MESSAGES
from formatter import get_team_prefix, strip_color_codes
logger = logging.getLogger('parser')
def calculate_weapon_accuracies(weapon_data):
"""Calculate accuracy percentages for all weapons"""
accuracies = {}
for weapon, stats in weapon_data.items():
shots_fired = int(stats.get('S', 0))
shots_hit = int(stats.get('H', 0))
accuracy = shots_hit / shots_fired if shots_fired > 0 else 0
accuracies[weapon] = accuracy
return accuracies
class EventParser:
"""Parses JSON game events into formatted messages"""
def __init__(self, game_state, json_logger=None, unknown_logger=None):
self.game_state = game_state
self.json_logger = json_logger
self.unknown_logger = unknown_logger
def parse_event(self, message):
"""
Parse JSON event and return formatted message string
Returns None if event should not be displayed
"""
try:
event = json.loads(message)
# Log all JSON if logger is configured
if self.json_logger:
self.json_logger.info('JSON Event received:')
self.json_logger.info(json.dumps(event, indent=2))
self.json_logger.info('---')
if 'TYPE' not in event or 'DATA' not in event:
logger.debug('JSON missing TYPE or DATA')
return None
event_type = event['TYPE']
data = event['DATA']
# Route to appropriate handler
handler_map = {
'PLAYER_SWITCHTEAM': self._handle_switchteam,
'PLAYER_DEATH': self._handle_death,
'PLAYER_KILL': self._handle_death, # Same handler
'PLAYER_MEDAL': self._handle_medal,
'MATCH_STARTED': self._handle_match_started,
'MATCH_REPORT': self._handle_match_report,
'PLAYER_STATS': self._handle_player_stats,
'PLAYER_CONNECT': lambda d: None,
'PLAYER_DISCONNECT': lambda d: None,
'ROUND_OVER': lambda d: None,
}
handler = handler_map.get(event_type)
if handler:
return handler(data)
else:
# Unknown event
logger.debug(f'Unknown event type: {event_type}')
if self.unknown_logger:
self.unknown_logger.info(f'Unknown event type: {event_type}')
self.unknown_logger.info(f'Full JSON: {json.dumps(event, indent=2)}')
return None
except json.JSONDecodeError as e:
logger.debug(f'JSON decode error: {e}')
return None
except (KeyError, TypeError) as e:
logger.debug(f'Error parsing event: {e}')
return None
def _handle_switchteam(self, data):
"""Handle PLAYER_SWITCHTEAM event"""
if 'KILLER' not in data:
return None
killer = data['KILLER']
name = killer.get('NAME', 'Unknown')
team = killer.get('TEAM', '')
old_team = killer.get('OLD_TEAM', '')
# Update player team
self.game_state.player_tracker.update_team(name, team)
self.game_state.player_tracker.add_player(name)
if team == old_team:
return None
warmup = " ^3(warmup)" if data.get('WARMUP', False) else ""
team_messages = {
'FREE': ' ^7joined the ^2Fight^7',
'SPECTATOR': ' ^7joined the ^3Spectators^7',
'RED': ' ^7joined the ^1Red ^7team',
'BLUE': ' ^7joined the ^4Blue ^7team'
}
old_team_messages = {
'FREE': 'the ^2Fight^7',
'SPECTATOR': 'the ^3Spectators^7',
'RED': '^7the ^1Red ^7team',
'BLUE': '^7the ^4Blue ^7team'
}
team_msg = team_messages.get(team, f' ^7joined team {team}^7')
old_team_msg = old_team_messages.get(old_team, f'team {old_team}')
team_prefix = get_team_prefix(name, self.game_state.player_tracker)
return f"{team_prefix}{name}{team_msg} from {old_team_msg}{warmup}\n"
def _handle_death(self, data):
"""Handle PLAYER_DEATH and PLAYER_KILL events"""
if 'VICTIM' not in data:
return None
victim = data['VICTIM']
victim_name = victim.get('NAME', 'Unknown')
# Check for duplicate
time_val = data.get('TIME', 0)
killer_name = data.get('KILLER', {}).get('NAME', '') if data.get('KILLER') else ''
if self.game_state.event_deduplicator.is_duplicate('PLAYER_DEATH', time_val, killer_name, victim_name):
return None
# Update victim team
if 'TEAM' in victim:
self.game_state.player_tracker.update_team(victim_name, victim['TEAM'])
self.game_state.player_tracker.add_player(victim_name)
victim_prefix = get_team_prefix(victim_name, self.game_state.player_tracker)
warmup = " ^3(warmup)" if data.get('WARMUP', False) else ""
# Environmental death (no killer)
if 'KILLER' not in data or not data['KILLER']:
mod = data.get('MOD', 'UNKNOWN')
msg_template = DEATH_MESSAGES.get(mod, "%s%s ^1DIED FROM %s^7")
if mod in DEATH_MESSAGES:
msg = msg_template % (victim_prefix, victim_name)
else:
msg = msg_template % (victim_prefix, victim_name, mod)
return f"{msg}{warmup}\n"
# Player killed by another player
killer = data['KILLER']
killer_name = killer.get('NAME', 'Unknown')
# Update killer team
if 'TEAM' in killer:
self.game_state.player_tracker.update_team(killer_name, killer['TEAM'])
self.game_state.player_tracker.add_player(killer_name)
killer_prefix = get_team_prefix(killer_name, self.game_state.player_tracker)
# Suicide
if killer_name == victim_name:
weapon = killer.get('WEAPON', 'OTHER_WEAPON')
if weapon != 'OTHER_WEAPON':
weapon_name = WEAPON_NAMES.get(weapon, weapon)
return f"{killer_prefix}{killer_name} ^7committed suicide with the ^7{weapon_name}{warmup}\n"
return None
# Regular kill
weapon = killer.get('WEAPON', 'UNKNOWN')
weapon_name = WEAPON_KILL_NAMES.get(weapon, f'the {weapon}')
return f"{killer_prefix}{killer_name} ^7fragged^7 {victim_prefix}{victim_name} ^7with {weapon_name}{warmup}\n"
def _handle_medal(self, data):
"""Handle PLAYER_MEDAL event"""
name = data.get('NAME', 'Unknown')
medal = data.get('MEDAL', 'UNKNOWN')
warmup = " ^3(warmup)^7" if data.get('WARMUP', False) else ""
team_prefix = get_team_prefix(name, self.game_state.player_tracker)
return f"{team_prefix}{name} ^7got a medal: ^6{medal}{warmup}\n"
def _handle_match_started(self, data):
"""Handle MATCH_STARTED event"""
if self.game_state.server_info.is_team_mode():
return None
players = []
for player in data.get('PLAYERS', []):
name = player.get('NAME', 'Unknown')
players.append(name)
if players:
formatted = " vs. ".join(players)
return f"Match has started - {formatted}\n"
return None
def _handle_match_report(self, data):
"""Handle MATCH_REPORT event"""
if not self.game_state.server_info.is_team_mode():
return None
red_score = int(data.get('TSCORE0', '0'))
blue_score = int(data.get('TSCORE1', '0'))
if red_score > blue_score:
return f"^1RED TEAM ^7WINS by a score of {red_score} to {blue_score}\n"
elif blue_score > red_score:
return f"^4BLUE TEAM ^7WINS by a score of {blue_score} to {red_score}\n"
else:
return f"^7The match is a TIE with a score of {red_score} to {blue_score}\n"
def _handle_player_stats(self, data):
"""Handle PLAYER_STATS event"""
name = data.get('NAME', 'Unknown')
team_prefix = get_team_prefix(name, self.game_state.player_tracker)
kills = int(data.get('KILLS', '0'))
deaths = int(data.get('DEATHS', '0'))
weapon_data = data.get('WEAPONS', {})
accuracies = calculate_weapon_accuracies(weapon_data)
if not accuracies:
return None
best_weapon = max(accuracies, key=accuracies.get)
best_accuracy = accuracies[best_weapon] * 100
weapon_stats = weapon_data.get(best_weapon, {})
best_weapon_kills = int(weapon_stats.get('K', 0))
weapon_name = WEAPON_NAMES.get(best_weapon, best_weapon)
return f"^7{team_prefix}{name} K/D: {kills}/{deaths} | Best Weapon: {weapon_name} - Acc: {best_accuracy:.2f}% - Kills: {best_weapon_kills}\n"

180
state.py Normal file
View File

@ -0,0 +1,180 @@
#!/usr/bin/env python3
"""
Game state management for QLPyCon
Tracks server info, players, and teams
"""
import re
import logging
from config import TEAM_MODES, TEAM_MAP, MAX_RECENT_EVENTS
logger = logging.getLogger('state')
class ServerInfo:
"""Tracks current server information"""
def __init__(self):
self.hostname = 'Unknown'
self.map = 'Unknown'
self.gametype = 'Unknown'
self.timelimit = '0'
self.fraglimit = '0'
self.roundlimit = '0'
self.capturelimit = '0'
self.maxclients = '0'
self.curclients = '0'
self.red_score = 0
self.blue_score = 0
self.players = []
self.last_update = 0
def is_team_mode(self):
"""Check if current gametype is a team mode"""
return self.gametype in TEAM_MODES
def update_from_cvar(self, cvar_name, value):
"""Update server info from a cvar response"""
# Normalize cvar name to lowercase for case-insensitive matching
cvar_lower = cvar_name.lower()
mapping = {
'qlx_serverbrandname': 'hostname',
'g_factorytitle': 'gametype',
'mapname': 'map',
'timelimit': 'timelimit',
'fraglimit': 'fraglimit',
'roundlimit': 'roundlimit',
'capturelimit': 'capturelimit',
'sv_maxclients': 'maxclients'
}
attr = mapping.get(cvar_lower)
if attr:
# Only strip color codes for non-hostname fields
if attr != 'hostname':
value = re.sub(r'\^\d', '', value)
setattr(self, attr, value)
logger.info(f'Updated {attr}: {value}')
return True
return False
class PlayerTracker:
"""Tracks player teams and information"""
def __init__(self, server_info):
self.server_info = server_info
self.player_teams = {}
def update_team(self, name, team):
"""Update player team. Team can be int or string"""
if not self.server_info.is_team_mode():
return
# Convert numeric team to string
if isinstance(team, int):
team = TEAM_MAP.get(team, 'FREE')
if team not in ['RED', 'BLUE', 'FREE', 'SPECTATOR']:
team = 'FREE'
# Store both original name and color-stripped version
self.player_teams[name] = team
clean_name = re.sub(r'\^\d', '', name)
if clean_name != name:
self.player_teams[clean_name] = team
logger.debug(f'Updated team for {name} (clean: {clean_name}): {team}')
def get_team(self, name):
"""Get player's team"""
return self.player_teams.get(name)
def add_player(self, name, score='0', ping='0'):
"""Add player to server's player list if not exists"""
if not any(p['name'] == name for p in self.server_info.players):
self.server_info.players.append({
'name': name,
'score': score,
'ping': ping
})
def get_players_by_team(self):
"""Get players organized by team"""
teams = {'RED': [], 'BLUE': [], 'SPECTATOR': [], 'FREE': []}
for player in self.server_info.players:
name = player['name']
team = self.player_teams.get(name, 'FREE')
if team not in teams:
team = 'FREE'
teams[team].append(name)
return teams
def remove_player(self, name):
"""Remove player from tracking"""
clean_name = re.sub(r'\^\d', '', name)
# Remove from player list (check both name and clean name)
self.server_info.players = [
p for p in self.server_info.players
if p['name'] != name and re.sub(r'\^\d', '', p['name']) != clean_name
]
# Remove from team tracking
if name in self.player_teams:
del self.player_teams[name]
if clean_name in self.player_teams and clean_name != name:
del self.player_teams[clean_name]
logger.debug(f'Removed player: {name} (clean: {clean_name})')
def rename_player(self, old_name, new_name):
"""Rename a player while maintaining their team"""
# Get current team
team = self.player_teams.get(old_name, 'SPECTATOR')
# Remove old entries
self.remove_player(old_name)
# Add with new name and team
if self.server_info.is_team_mode():
self.update_team(new_name, team)
self.add_player(new_name)
logger.debug(f'Renamed player: {old_name} -> {new_name}')
class EventDeduplicator:
"""Prevents duplicate kill/death events"""
def __init__(self):
self.recent_events = []
def is_duplicate(self, event_type, time_val, killer_name, victim_name):
"""Check if this kill/death event is a duplicate"""
if event_type not in ('PLAYER_DEATH', 'PLAYER_KILL'):
return False
signature = f"KILL:{time_val}:{killer_name}:{victim_name}"
if signature in self.recent_events:
logger.debug(f'Duplicate event: {signature}')
return True
# Add to recent events
self.recent_events.append(signature)
if len(self.recent_events) > MAX_RECENT_EVENTS:
self.recent_events.pop(0)
return False
class GameState:
"""Main game state container"""
def __init__(self):
self.server_info = ServerInfo()
self.player_tracker = PlayerTracker(self.server_info)
self.event_deduplicator = EventDeduplicator()
self.pending_background_commands = set()
self.status_buffer = []

258
ui.py Normal file
View File

@ -0,0 +1,258 @@
#!/usr/bin/env python3
"""
Curses-based UI for QLPyCon
Handles terminal display, windows, and color rendering
"""
import curses
import curses.textpad
import threading
import queue
import logging
from config import COLOR_PAIRS, INFO_WINDOW_HEIGHT, INFO_WINDOW_Y, OUTPUT_WINDOW_Y, INPUT_WINDOW_HEIGHT, TEAM_MODES
logger = logging.getLogger('ui')
class CursesHandler(logging.Handler):
"""Logging handler that outputs to curses window"""
def __init__(self, window):
logging.Handler.__init__(self)
self.window = window
def emit(self, record):
try:
msg = self.format(record)
fs = "%s\n"
try:
print_colored(self.window, fs % msg, 0)
self.window.refresh()
except UnicodeError:
print_colored(self.window, fs % msg.encode("UTF-8"), 0)
self.window.refresh()
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def print_colored(window, message, attributes=0):
"""
Print message with Quake color codes (^N)
^0 = black, ^1 = red, ^2 = green, ^3 = yellow, ^4 = blue, ^5 = cyan, ^6 = magenta, ^7 = white
"""
if not curses.has_colors:
window.addstr(message)
return
color = 0
parse_color = False
for ch in message:
val = ord(ch)
if parse_color:
if ord('0') <= val <= ord('7'):
color = val - ord('0')
if color == 7:
color = 0
else:
window.addch('^', curses.color_pair(color) | attributes)
window.addch(ch, curses.color_pair(color) | attributes)
parse_color = False
elif ch == '^':
parse_color = True
else:
window.addch(ch, curses.color_pair(color) | attributes)
window.refresh()
class UIManager:
"""Manages curses windows and display"""
def __init__(self, screen, host):
self.screen = screen
self.host = host
self.info_window = None
self.output_window = None
self.input_window = None
self.divider_window = None
self.input_queue = None
self._init_curses()
self._create_windows()
def _init_curses(self):
"""Initialize curses settings"""
curses.endwin()
curses.initscr()
self.screen.nodelay(1)
curses.start_color()
curses.use_default_colors()
curses.cbreak()
curses.setsyx(-1, -1)
self.screen.addstr(f"Quake Live PyCon: {self.host}")
self.screen.refresh()
# Initialize color pairs
for i in range(1, 7):
curses.init_pair(i, i, 0)
# Swap cyan and magenta (5 and 6)
curses.init_pair(5, 6, 0)
curses.init_pair(6, 5, 0)
def _create_windows(self):
"""Create all UI windows"""
maxy, maxx = self.screen.getmaxyx()
# Server info window (top)
self.info_window = curses.newwin(
INFO_WINDOW_HEIGHT,
maxx - 4,
INFO_WINDOW_Y,
2
)
self.info_window.scrollok(False)
self.info_window.idlok(False)
self.info_window.leaveok(True)
self.info_window.refresh()
# Output window (middle - main display)
self.output_window = curses.newwin(
maxy - 15,
maxx - 4,
OUTPUT_WINDOW_Y,
2
)
self.output_window.scrollok(True)
self.output_window.idlok(True)
self.output_window.leaveok(True)
self.output_window.refresh()
# Input window (bottom)
self.input_window = curses.newwin(
INPUT_WINDOW_HEIGHT,
maxx - 6,
maxy - 2,
4
)
self.screen.addstr(maxy - 2, 2, '$ ')
self.input_window.idlok(True)
self.input_window.leaveok(False)
self.input_window.refresh()
# Divider line
self.divider_window = curses.newwin(
1,
maxx - 4,
maxy - 3,
2
)
self.divider_window.hline(curses.ACS_HLINE, maxx - 4)
self.divider_window.refresh()
self.screen.refresh()
def setup_input_queue(self):
"""Setup threaded input queue"""
def wait_stdin(q, window):
while True:
line = curses.textpad.Textbox(window).edit()
if len(line) > 0:
q.put(line)
window.clear()
window.refresh()
self.input_queue = queue.Queue()
t = threading.Thread(target=wait_stdin, args=(self.input_queue, self.input_window))
t.daemon = True
t.start()
return self.input_queue
def setup_logging(self):
"""Setup logging handler for output window"""
handler = CursesHandler(self.output_window)
formatter = logging.Formatter('%(asctime)-8s|%(name)-12s|%(levelname)-6s|%(message)-s', '%H:%M:%S')
handler.setFormatter(formatter)
return handler
def print_message(self, message, attributes=0):
"""Print formatted message to output window"""
y, x = curses.getsyx()
print_colored(self.output_window, message, attributes)
curses.setsyx(y, x)
curses.doupdate()
def update_server_info(self, game_state):
"""Update server info window"""
self.info_window.clear()
max_y, max_x = self.info_window.getmaxyx()
server_info = game_state.server_info
# Line 1: Hostname
hostname = server_info.hostname
print_colored(self.info_window, f"^6═══ {hostname} ^6═══^7\n", 0)
# Line 2: Game info
gametype = server_info.gametype
mapname = server_info.map
timelimit = server_info.timelimit
fraglimit = server_info.fraglimit
roundlimit = server_info.roundlimit
caplimit = server_info.capturelimit
curclients = server_info.curclients
maxclients = server_info.maxclients
print_colored(self.info_window,
f"^3Type:^7 {gametype} ^3Map:^7 {mapname} ^3Players:^7 {curclients}/{maxclients} "
f"^3Limits (T/F/R/C):^7 {timelimit}/{fraglimit}/{roundlimit}/{caplimit}\n", 0)
# Line 3: Team headers and player lists
teams = game_state.player_tracker.get_players_by_team()
if server_info.gametype in TEAM_MODES:
print_colored(self.info_window, f"^1(RED) ^4(BLUE) ^3(SPEC)\n", 0)
red_players = teams['RED'][:4]
blue_players = teams['BLUE'][:4]
spec_players = teams['SPECTATOR'][:4]
for i in range(4):
red = red_players[i] if i < len(red_players) else ''
blue = blue_players[i] if i < len(blue_players) else ''
spec = spec_players[i] if i < len(spec_players) else ''
# Strip color codes for padding calculation
from formatter import strip_color_codes
red_clean = strip_color_codes(red)
blue_clean = strip_color_codes(blue)
spec_clean = strip_color_codes(spec)
# Calculate padding (24 chars per column)
red_pad = 24 - len(red_clean)
blue_pad = 24 - len(blue_clean)
line = f"{red}{' ' * red_pad}{blue}{' ' * blue_pad}{spec}\n"
print_colored(self.info_window, line, 0)
else:
print_colored(self.info_window, f"^3(FREE)\n", 0)
free_players = teams['FREE'][:4]
for player in free_players:
print_colored(self.info_window, f"{player}\n", 0)
# Fill remaining lines
for i in range(4 - len(free_players)):
self.info_window.addstr("\n")
# Blank lines to fill
self.info_window.addstr("\n\n")
# Separator
separator = "^6" + "" * (max_x - 1) + "^7"
print_colored(self.info_window, separator, 0)
self.info_window.refresh()