This commit is contained in:
xbl
2026-01-09 13:17:30 +01:00
parent 8d8a729490
commit 7d5210214a
12 changed files with 1790 additions and 620 deletions

240
AUTOCOMPLETE.md Normal file
View File

@ -0,0 +1,240 @@
# Autocomplete Feature
## Overview
QLPyCon includes intelligent autocomplete for console variables (cvars) and commands with fuzzy matching support.
## Features
- **Real-time Suggestions**: Fuzzy matches appear below the input line as you type
- **Tab Cycling**: Press Tab to cycle through suggestions
- **Smart Argument Highlighting**: Current argument position highlighted in reverse video
- **Argument Value Suggestions**: Intelligent suggestions for command arguments (bot names, maps, gametypes, etc.)
- **Command Signatures**: Automatic display when typing commands with arguments
- **Fuzzy Matching**: Finds matches even with partial or misspelled input
- **Smart Scoring**: Best matches appear first (exact > prefix > substring > fuzzy)
## Usage
Type a partial command (2+ characters) and fuzzy matches appear below the prompt:
```
$ sv_m
sv_maxclients sv_minrate sv_maxrate sv_timeout sv_floodprotect
```
Press **Tab** to cycle through the suggestions:
```
$ sv_m [Tab] → sv_maxclients
$ sv_maxclients [Tab] → sv_minrate
$ sv_minrate [Tab] → sv_maxrate
$ sv_maxrate [Tab] → sv_timeout
$ sv_timeout [Tab] → sv_floodprotect
$ sv_floodprotect [Tab] → sv_maxclients (cycles back)
```
**Command signatures with argument highlighting:**
When you type a command that has arguments, the signature appears automatically with the **current argument highlighted**:
```
$ addbot
[<botname>] [skill 1-5] [team] [msec delay] [altname]
↑ highlighted (currently typing this)
$ addbot sarge
<botname> [[skill 1-5]] [team] [msec delay] [altname]
↑ highlighted (now typing skill level)
$ addbot sarge 5
<botname> [skill 1-5] [[team]] [msec delay] [altname]
↑ highlighted (now typing team)
$ kick
[<player>]
↑ highlighted
$ g_gametype
[<0=FFA 1=Duel 2=TDM 3=CA 4=CTF...>]
↑ highlighted
```
The highlighted argument (shown with `[[ ]]` above, displayed in reverse video) shows you **exactly what to type next**.
Matches update in real-time as you type or delete characters.
**Argument value suggestions:**
After typing a command with a space, the system suggests valid values for each argument with fuzzy matching:
```
$ addbot
Anarki Angel Biker Bitterman Bones
↑ Shows bot names
$ addbot sar
Sarge
↑ Fuzzy matches 'sar' → 'Sarge'
$ addbot Sarge
1 2 3 4 5
↑ Shows skill levels (1-5)
$ addbot Sarge 4
red blue free spectator any
↑ Shows team values
$ map
aerowalk almostlost arenagate asylum battleforged
↑ Shows map names
$ map blood
bloodrun
↑ Fuzzy matches 'blood' → 'bloodrun'
$ g_gametype
0 1 2 3 4 FFA Duel TDM CA CTF
↑ Shows numeric and string gametype values
$ callvote
map kick shuffle teamsize
↑ Shows vote types
```
The system knows valid values for **25+ commands** including:
- **32 bot names**: Sarge, Ranger, Visor, Xaero, Anarki, etc.
- **40+ maps**: bloodrun, campgrounds, toxicity, aerowalk, etc.
- **12 game types**: FFA, Duel, TDM, CA, CTF, etc. (numeric and string forms)
- **Vote types**: map, kick, shuffle, teamsize, g_gametype, etc.
- **Team values**: red, blue, free, spectator, any
- **Skill levels**: 1-5 for bots
- **Boolean values**: 0, 1, true, false, enabled, disabled
- **Common settings**: timelimits, fraglimits, sv_fps values, etc.
Arguments with freetext (like player names or custom messages) fall back to showing the signature with highlighting.
## Supported Commands
**76 cvars and commands included:**
### Commands with Signatures
The following commands show usage help when selected:
**Bot commands:**
- `addbot` - <botname> [skill 1-5] [team] [msec delay] [altname]
- `removebot` - <botname>
**Player management:**
- `kick` - <player>
- `kickban` - <player>
- `ban` - <player>
- `tempban` - <player> <seconds>
- `tell` - <player> <message>
**Game settings:**
- `g_gametype` - <0=FFA 1=Duel 2=TDM 3=CA 4=CTF...>
- `timelimit` - <minutes>
- `fraglimit` - <frags>
- `capturelimit` - <captures>
**Map & voting:**
- `map` - <mapname>
- `callvote` - <vote type> [args...]
- `say` - <message>
### Additional Cvars
**Server Configuration:**
- sv_hostname, sv_maxclients, sv_fps, sv_pure, etc.
**Network & ZMQ:**
- net_port, zmq_rcon_enable, zmq_stats_enable, etc.
**QLX (minqlx):**
- qlx_serverBrandName, qlx_owner, qlx_redditAuth
## Technical Details
### Fuzzy Matching Algorithm
Scoring system (higher = better match):
- **1000**: Exact match
- **500+**: Prefix match (prioritizes shorter results)
- **100-**: Substring match (earlier = better)
- **50**: Contains all characters in order
### Examples:
```python
autocomplete('sv_')
['sv_fps', 'sv_pure', 'sv_maxrate', 'sv_minrate', 'sv_timeout']
autocomplete('time')
['timelimit', 'sv_timeout']
autocomplete('stat')
['status', 'zmq_stats_enable', 'zmq_stats_ip', 'zmq_stats_password', 'zmq_stats_port']
```
## Customization
### Add Your Own Cvars
Edit `cvars.py` and add to the appropriate list:
```python
# Custom cvars
CUSTOM_CVARS = [
'my_custom_cvar',
'another_cvar',
]
# Add to ALL_CVARS
ALL_CVARS = (
SERVER_CVARS +
GAME_CVARS +
# ...
CUSTOM_CVARS
)
```
### Adjust Behavior
In `ui.py`, modify:
```python
# Minimum characters before showing suggestions
if len(current_word) >= 2: # Change to 1 or 3
# Maximum suggestions displayed
suggestions = autocomplete(current_word, max_results=5) # Change limit
```
## Keyboard Shortcuts
| Key | Action |
|-----|--------|
| **Tab** | Cycle through autocomplete suggestions |
| **↑** | Previous command in history |
| **↓** | Next command in history |
| **←/→** | Move cursor |
| **Backspace** | Delete (updates suggestions) |
| **Enter** | Send command |
## Notes
- Autocomplete starts after typing 2+ characters
- Suggestions appear on the line below the prompt
- Up to 5 matches shown at once (best matches first)
- Suggestions update in real-time as you type
- History navigation (↑/↓) works normally
## Testing Autocomplete
```bash
# Run cvars module directly to test matching
python3 cvars.py
# Output shows test queries and results
```

146
README.md
View File

@ -1,119 +1,81 @@
# QLPyCon - Quake Live Python Console
A modular, refactored terminal-based client for monitoring and remote controlling Quake Live servers via ZMQ.
Terminal-based client for monitoring and controlling Quake Live servers via ZMQ.
### Features
## Features
- **Real-time game monitoring** - Watch kills, deaths, team switches, medals, and more
- **Server info display** - Shows hostname, map, gametype, limits, and player count
- **Team-aware chat** - Color-coded messages with team prefixes
- **Powerup tracking** - Formatted pickup and carrier kill messages
- **JSON event logging** - Capture all game events to file for analysis
- **Colorized output** - Quake color code support (^0-^7)
- Real-time game monitoring (kills, deaths, medals, team switches)
- Server info display (map, gametype, scores, player list)
- Team-aware colorized chat with location tracking
- Powerup pickup and carrier kill notifications
- JSON event capture for analysis
- Quake color code support (^0-^7)
- **Autocomplete for cvars/commands** with fuzzy matching
- **Intelligent argument suggestions** for 25+ commands (bot names, maps, gametypes)
### Module Structure
```
qlpycon/
├── __init__.py # Package initialization
├── main.py # Entry point and main loop
├── config.py # Constants and configuration
├── state.py # Game state management (ServerInfo, PlayerTracker, etc)
├── network.py # ZMQ connections (RCON and stats stream)
├── parser.py # JSON event parsing
├── formatter.py # Message formatting and colorization
└── ui.py # Curses interface (windows, display, input)
```
### Installation
## Installation
```bash
# Requirements
pip install pyzmq
# Run directly
python -m qlpycon.main --host tcp://127.0.0.1:27961 --password YOUR_PASSWORD
python main.py --host tcp://SERVER_IP:PORT --password RCON_PASSWORD
```
### Usage
## Usage
```bash
# Basic connection
# Basic
python main.py --host tcp://SERVER_IP:PORT --password RCON_PASSWORD
# Verbose mode (show all communications)
python main.py --host tcp://SERVER_IP:PORT --password PASS -v
# Verbose logging
python main.py --host tcp://SERVER_IP:PORT --password RCON_PASSWORD -v
# Debug mode (detailed logging)
python main.py --host tcp://SERVER_IP:PORT --password PASS -vv
# Capture all JSON events to file
python main.py --host tcp://SERVER_IP:PORT --password PASS --json events.log
# Custom unknown events log
python main.py --unknown-log my_unknown.log
# Capture JSON events
python main.py --host tcp://SERVER_IP:PORT --password RCON_PASSWORD --json events.log
```
### Command Line Options
**Options:**
- `--host URI` - ZMQ RCON endpoint (default: tcp://127.0.0.1:27961)
- `--password PASS` - RCON password (required)
- `-v` / `-vv` - Verbose (INFO) or debug (DEBUG) logging
- `--json FILE` - Log all events as JSON
- `--unknown-log FILE` - Log unparsed events (default: unknown_events.log)
- `--host` - ZMQ URI (default: tcp://127.0.0.1:27961)
- `--password` - RCON password
- `--identity` - Socket identity (random UUID by default)
- `-v, --verbose` - Increase verbosity (use -v for INFO, -vv for DEBUG)
- `--json FILE` - Log all JSON events to FILE
- `--unknown-log FILE` - Log unknown JSON events (default: unknown_events.log)
**Configuration File (Optional):**
Create `~/.qlpycon.conf` or `./qlpycon.conf`:
```ini
[connection]
host = tcp://SERVER_IP:PORT
password = your_password
### Architecture Overview
[logging]
level = INFO
```
#### State Management (`state.py`)
- **ServerInfo** - Tracks server configuration and metadata
- **PlayerTracker** - Manages player teams and information
- **EventDeduplicator** - Prevents duplicate kill/death events
- **GameState** - Main container for all state
**Input Features:**
- **Smart autocomplete** - Type commands and see arguments highlighted in real-time
- **Argument value suggestions** - Intelligent suggestions for bot names, maps, gametypes, teams, etc.
- **Tab** - Cycle through autocomplete suggestions
- **↑/↓** - Command history navigation
- **Argument highlighting** - Current argument position shown in reverse video
- **Command signatures** - Automatic display (e.g., `addbot <botname> [skill 1-5] [team]`)
#### Network Layer (`network.py`)
- **RconConnection** - Handles DEALER socket for RCON commands
- **StatsConnection** - Handles SUB socket for game event stream
See [AUTOCOMPLETE.md](AUTOCOMPLETE.md) for details.
#### Event Parsing (`parser.py`)
- **EventParser** - Parses JSON game events into formatted messages
- Modular handlers for each event type (deaths, medals, team switches, etc)
## Architecture
#### Message Formatting (`formatter.py`)
- Color code handling (Quake's ^N system)
- Team prefix generation
- Timestamp logic
- Chat message formatting
```
main.py - Main loop, argument parsing, signal handling
config.py - Constants (weapons, teams, colors, limits)
state.py - Game state (ServerInfo, PlayerTracker, EventDeduplicator)
network.py - ZMQ connections (RCON DEALER, Stats SUB sockets)
parser.py - JSON event parsing (deaths, medals, switches, stats)
formatter.py - Message formatting, color codes, team prefixes
ui.py - Curses interface (3-panel: info, output, input)
```
#### UI Layer (`ui.py`)
- **UIManager** - Manages all curses windows
- Three-panel layout: server info, output, input
- Threaded input queue for non-blocking commands
- Color rendering with curses
**Supported Events:**
PLAYER_SWITCHTEAM, PLAYER_DEATH/KILL, PLAYER_MEDAL, PLAYER_STATS, MATCH_STARTED/REPORT, PLAYER_CONNECT/DISCONNECT, ROUND_OVER
### Event Types Supported
- `PLAYER_SWITCHTEAM` - Team changes
- `PLAYER_DEATH` / `PLAYER_KILL` - Frag events (deduplicated)
- `PLAYER_MEDAL` - Medal awards
- `PLAYER_STATS` - End-game statistics with weapon accuracy
- `MATCH_STARTED` - Match initialization
- `MATCH_REPORT` - Final scores
- `PLAYER_CONNECT` / `PLAYER_DISCONNECT` - Connection events
- `ROUND_OVER` - Round completion
### Color Codes
Quake Live uses `^N` color codes where N is 0-7:
- `^0` - Black
- `^1` - Red
- `^2` - Green
- `^3` - Yellow
- `^4` - Blue
- `^5` - Cyan
- `^6` - Magenta
- `^7` - White (default)
### License
## License
WTFPL

View File

@ -9,6 +9,11 @@ VERSION = "0.8.1"
DEFAULT_HOST = 'tcp://127.0.0.1:27961'
POLL_TIMEOUT = 100
# Timing constants
QUIT_CONFIRM_TIMEOUT = 3.0 # Seconds to confirm quit (Ctrl-C twice)
RESPAWN_DELAY = 3.0 # Seconds before players respawn after death
STATS_CONNECTION_DELAY = 0.5 # Initial stats connection delay
# UI dimensions
INFO_WINDOW_HEIGHT = 12
INFO_WINDOW_Y = 2
@ -18,10 +23,13 @@ INPUT_WINDOW_HEIGHT = 2
# Event deduplication
MAX_RECENT_EVENTS = 10
# UI settings
MAX_COMMAND_HISTORY = 10 # Number of commands to remember
# Team game modes
TEAM_MODES = [
"Team Deathmatch",
"Clan Arena",
"Clan Arena",
"Capture The Flag",
"One Flag CTF",
"Overload",

559
cvars.py Normal file
View File

@ -0,0 +1,559 @@
#!/usr/bin/env python3
"""
Quake Live console variables (cvars) database
Common cvars for autocomplete and fuzzy search
"""
# Server configuration
SERVER_CVARS = [
'sv_hostname',
'sv_maxclients',
'sv_privateclients',
'sv_fps',
'sv_timeout',
'sv_minrate',
'sv_maxrate',
'sv_floodprotect',
'sv_pure',
'sv_allowdownload',
]
# Game rules
GAME_CVARS = [
'g_gametype',
'g_factoryTitle',
'g_motd',
'timelimit',
'fraglimit',
'capturelimit',
'roundlimit',
'scorelimit',
'g_allowKill',
'g_allowSpecVote',
'g_friendlyFire',
'g_teamAutoJoin',
'g_teamForceBalance',
'g_warmupDelay',
'g_inactivity',
'g_quadHog',
'g_training',
'g_instagib',
]
# Map and rotation
MAP_CVARS = [
'mapname',
'nextmap',
'g_nextmap',
]
# Voting
VOTE_CVARS = [
'g_voteDelay',
'g_voteLimit',
'g_allowVote',
]
# Items and weapons
ITEM_CVARS = [
'dmflags',
'weapon_reload_rg',
'weapon_reload_sg',
'weapon_reload_gl',
'weapon_reload_rl',
'weapon_reload_lg',
'weapon_reload_pg',
'weapon_reload_hmg',
]
# Network
NETWORK_CVARS = [
'net_port',
'net_ip',
'net_strict',
]
# ZMQ
ZMQ_CVARS = [
'zmq_rcon_enable',
'zmq_rcon_ip',
'zmq_rcon_port',
'zmq_rcon_password',
'zmq_stats_enable',
'zmq_stats_ip',
'zmq_stats_port',
'zmq_stats_password',
]
# QLX (minqlx) specific
QLX_CVARS = [
'qlx_serverBrandName',
'qlx_owner',
'qlx_redditAuth',
]
# Bot cvars
BOT_CVARS = [
'bot_enable',
'bot_nochat',
'bot_minplayers',
]
# Common commands (not cvars but useful for autocomplete)
COMMANDS = [
'status',
'map',
'map_restart',
'kick',
'kickban',
'ban',
'unban',
'tempban',
'tell',
'say',
'callvote',
'vote',
'rcon',
'addbot',
'removebot',
'killserver',
'quit',
'team',
]
# Bot names for addbot command (complete list)
BOT_NAMES = [
'Anarki', 'Angel', 'Biker', 'Bitterman', 'Bones', 'Cadaver',
'Crash', 'Daemia', 'Doom', 'Gorre', 'Grunt', 'Hossman',
'Hunter', 'Keel', 'Klesk', 'Lucy', 'Major', 'Mynx', 'Orbb',
'Patriot', 'Phobos', 'Ranger', 'Razor', 'Sarge', 'Slash',
'Sorlag', 'Stripe', 'TankJr', 'Uriel', 'Visor', 'Wrack', 'Xaero'
]
# Skill levels for bots
BOT_SKILL_LEVELS = ['1', '2', '3', '4', '5']
# Team values
TEAM_VALUES = ['red', 'blue', 'free', 'spectator', 'r', 'b', 'f', 's']
# Popular competitive Quake Live maps
MAP_NAMES = [
'aerowalk', 'almostlost', 'arenagate', 'asylum', 'battleforged',
'bloodrun', 'brimstoneabbey', 'campgrounds', 'cannedheat',
'citycrossings', 'cure', 'deepinside', 'dismemberment',
'elder', 'eviscerated', 'falloutbunker', 'finnegans',
'furiousheights', 'gospelcrossings', 'grimdungeons',
'hearth', 'hektik', 'lostworld', 'monsoon', 'reflux',
'repent', 'shiningforces', 'sinister', 'spacectf',
'spidercrossings', 'stonekeep', 'terminus', 'tornado',
'toxicity', 'trinity', 'verticalvengeance', 'warehouses', 'whisper'
]
# Game type names (string values)
GAMETYPE_NAMES = [
'ffa', 'duel', 'race', 'tdm', 'ca', 'ctf', 'oneflag',
'har', 'ft', 'dom', 'ad', 'rr'
]
# Game type numbers (legacy numeric values)
GAMETYPE_NUMBERS = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11']
# Vote types for callvote command
VOTE_TYPES = [
'map', 'map_restart', 'nextmap', 'gametype', 'kick',
'timelimit', 'fraglimit', 'shuffle', 'teamsize',
'cointoss', 'random', 'loadouts', 'ammo', 'timers'
]
# Vote values
VOTE_VALUES = ['yes', 'no', '1', '2']
# Boolean values (0/1)
BOOLEAN_VALUES = ['0', '1']
# sv_fps valid values
SV_FPS_VALUES = ['20', '30', '40', '60', '125']
# sv_maxclients common values
SV_MAXCLIENTS_VALUES = ['8', '12', '16', '24', '32']
# Common time limits (minutes)
TIMELIMIT_VALUES = ['10', '15', '20', '30']
# Common frag limits
FRAGLIMIT_VALUES = ['30', '50', '75', '100']
# Common capture limits
CAPTURELIMIT_VALUES = ['5', '8', '10']
# Common round limits
ROUNDLIMIT_VALUES = ['3', '5', '7', '10']
# Common teamsize values
TEAMSIZE_VALUES = ['2', '3', '4', '5', '6']
# Factory types for map command
FACTORY_TYPES = ['ffa', 'duel', 'tdm', 'ca', 'ctf', 'iffa', 'ictf', 'ift']
# Command signatures (usage help)
COMMAND_SIGNATURES = {
# Bot commands
'addbot': '<botname:Sarge|Ranger|Visor|etc> [skill 1-5] [team] [msec delay] [altname]',
'removebot': '<botname|altname>',
# Player management
'kick': '<player>',
'kickban': '<player>',
'ban': '<player>',
'unban': '<player>',
'tempban': '<player> <seconds>',
'tell': '<player> <message>',
# Chat & voting
'say': '<message>',
'callvote': '<vote type> [args...]',
'vote': '<yes|no>',
# Map commands
'map': '<mapname>',
'map_restart': '',
# Server
'rcon': '<command>',
'killserver': '',
'quit': '',
'status': '',
# Game cvars with common values
'g_gametype': '<0=FFA 1=Duel 2=TDM 3=CA 4=CTF 5=OCTF 6=Harv 7=FT 8=Dom>',
'timelimit': '<minutes>',
'fraglimit': '<frags>',
'capturelimit': '<captures>',
'roundlimit': '<rounds>',
'scorelimit': '<score>',
# Server settings
'sv_maxclients': '<1-64>',
'sv_hostname': '<name>',
'sv_fps': '<20|30|40|60|125>',
# Network
'net_port': '<port number>',
# ZMQ
'zmq_rcon_enable': '<0|1>',
'zmq_rcon_port': '<port>',
'zmq_rcon_password': '<password>',
'zmq_stats_enable': '<0|1>',
'zmq_stats_port': '<port>',
'zmq_stats_password': '<password>',
}
# Combine all cvars
ALL_CVARS = (
SERVER_CVARS +
GAME_CVARS +
MAP_CVARS +
VOTE_CVARS +
ITEM_CVARS +
NETWORK_CVARS +
ZMQ_CVARS +
QLX_CVARS +
BOT_CVARS +
COMMANDS
)
# Sort for binary search
ALL_CVARS.sort()
# Argument value mappings - maps argument type to list of valid values
ARGUMENT_VALUES = {
'botname': BOT_NAMES,
'skill': BOT_SKILL_LEVELS,
'team': TEAM_VALUES,
'mapname': MAP_NAMES,
'gametype': GAMETYPE_NAMES + GAMETYPE_NUMBERS,
'gametype_name': GAMETYPE_NAMES,
'vote_type': VOTE_TYPES,
'vote_value': VOTE_VALUES,
'boolean': BOOLEAN_VALUES,
'sv_fps': SV_FPS_VALUES,
'sv_maxclients': SV_MAXCLIENTS_VALUES,
'timelimit': TIMELIMIT_VALUES,
'fraglimit': FRAGLIMIT_VALUES,
'capturelimit': CAPTURELIMIT_VALUES,
'roundlimit': ROUNDLIMIT_VALUES,
'teamsize': TEAMSIZE_VALUES,
'factory': FACTORY_TYPES,
}
# Command argument definitions - maps command to list of argument types
# Each argument is a dict with: type (value list key), required (bool)
COMMAND_ARGUMENTS = {
'addbot': [
{'type': 'botname', 'required': True},
{'type': 'skill', 'required': False},
{'type': 'team', 'required': False},
{'type': 'msec delay number', 'required': False}, # msec delay
{'type': 'freetext', 'required': False}, # altname
],
'removebot': [
{'type': 'botname', 'required': True},
],
'kick': [
{'type': 'player', 'required': True},
],
'kickban': [
{'type': 'player', 'required': True},
],
'ban': [
{'type': 'player', 'required': True},
],
'tempban': [
{'type': 'player', 'required': True},
{'type': 'number', 'required': True}, # seconds
],
'tell': [
{'type': 'player', 'required': True},
{'type': 'freetext', 'required': True}, # message
],
'map': [
{'type': 'mapname', 'required': True},
{'type': 'factory', 'required': False},
],
'callvote': [
{'type': 'vote_type', 'required': True},
{'type': 'dynamic', 'required': False}, # Depends on vote type
],
'vote': [
{'type': 'vote_value', 'required': True},
],
'team': [
{'type': 'team', 'required': True},
],
'g_gametype': [
{'type': 'gametype', 'required': True},
],
'timelimit': [
{'type': 'timelimit', 'required': True},
],
'fraglimit': [
{'type': 'fraglimit', 'required': True},
],
'capturelimit': [
{'type': 'capturelimit', 'required': True},
],
'roundlimit': [
{'type': 'roundlimit', 'required': True},
],
'sv_fps': [
{'type': 'sv_fps', 'required': True},
],
'sv_maxclients': [
{'type': 'sv_maxclients', 'required': True},
],
'sv_hostname': [
{'type': 'freetext', 'required': True},
],
'sv_pure': [
{'type': 'boolean', 'required': True},
],
'zmq_rcon_enable': [
{'type': 'boolean', 'required': True},
],
'zmq_rcon_port': [
{'type': 'number', 'required': True},
],
'zmq_rcon_password': [
{'type': 'freetext', 'required': True},
],
'zmq_stats_enable': [
{'type': 'boolean', 'required': True},
],
'zmq_stats_port': [
{'type': 'number', 'required': True},
],
'zmq_stats_password': [
{'type': 'freetext', 'required': True},
],
}
def get_argument_suggestions(command, arg_position, current_value, player_list=None):
"""
Get autocomplete suggestions for a command argument
Args:
command: Command name (e.g., 'addbot')
arg_position: Argument index (0 = first argument after command)
current_value: What user has typed so far for this argument
player_list: List of player names (for dynamic 'player' type)
Returns:
List of suggestion strings matching current_value
"""
# Get command's argument definitions
if command not in COMMAND_ARGUMENTS:
return []
arg_defs = COMMAND_ARGUMENTS[command]
# Check if arg_position is valid
if arg_position >= len(arg_defs):
return []
arg_def = arg_defs[arg_position]
arg_type = arg_def['type']
# Handle special types
if arg_type == 'player':
# Dynamic: get from player_list parameter
if player_list:
return fuzzy_match(current_value, player_list, max_results=5)
return []
elif arg_type == 'dynamic':
# Special case for callvote second argument
# Would need first argument to determine suggestions
# For now, return empty (could be enhanced later)
return []
elif arg_type == 'number':
# Show common numeric values
return ['50', '100', '150', '200', '250']
elif arg_type == 'freetext':
# No suggestions for free text
return []
elif arg_type in ARGUMENT_VALUES:
# Static list from ARGUMENT_VALUES
values = ARGUMENT_VALUES[arg_type]
return fuzzy_match(current_value, values, max_results=5)
return []
def fuzzy_match(query, candidates, max_results=5):
"""
Fuzzy match query against candidates
Returns list of (match, score) tuples sorted by score
Scoring:
- Exact match: 1000
- Prefix match: 500 + remaining chars
- Substring match: 100
- Contains all chars in order: 50
- Levenshtein-like: based on edit distance
"""
if not query:
# Return first max_results candidates when query is empty
return candidates[:max_results]
query_lower = query.lower()
matches = []
for candidate in candidates:
candidate_lower = candidate.lower()
score = 0
# Exact match
if query_lower == candidate_lower:
score = 1000
# Prefix match (best after exact)
elif candidate_lower.startswith(query_lower):
score = 500 + (100 - len(candidate)) # Prefer shorter matches
# Substring match
elif query_lower in candidate_lower:
# Score higher if match is earlier in string
pos = candidate_lower.index(query_lower)
score = 100 - pos
# Contains all characters in order (fuzzy)
else:
query_idx = 0
for char in candidate_lower:
if query_idx < len(query_lower) and char == query_lower[query_idx]:
query_idx += 1
if query_idx == len(query_lower): # All chars found
score = 50
if score > 0:
matches.append((candidate, score))
# Sort by score (highest first), then alphabetically
matches.sort(key=lambda x: (-x[1], x[0]))
return [match for match, score in matches[:max_results]]
def autocomplete(partial, max_results=5):
"""
Autocomplete a partial cvar/command
Returns list of suggestions
"""
return fuzzy_match(partial, ALL_CVARS, max_results)
def parse_signature(signature):
"""
Parse command signature into individual arguments
Returns list of argument strings
Example:
'<botname> [skill 1-5] [team]' -> ['<botname>', '[skill 1-5]', '[team]']
"""
import re
# Match <arg> or [arg] patterns, including content with spaces
pattern = r'(<[^>]+>|\[[^\]]+\])'
args = re.findall(pattern, signature)
return args
def get_signature_with_highlight(command, arg_position):
"""
Get command signature with current argument highlighted
Args:
command: Command name (e.g., 'addbot')
arg_position: Current argument index (0-based, 0 = first arg after command)
Returns:
List of (text, is_highlighted) tuples
"""
if command not in COMMAND_SIGNATURES:
return []
signature = COMMAND_SIGNATURES[command]
if not signature:
return []
args = parse_signature(signature)
if not args:
return [(signature, False)]
# Build list of (arg_text, is_current) tuples
result = []
for i, arg in enumerate(args):
is_current = (i == arg_position)
result.append((arg, is_current))
return result
if __name__ == '__main__':
# Test autocomplete
print("Testing autocomplete:")
print(f"Total cvars/commands: {len(ALL_CVARS)}")
print()
test_queries = ['sv_', 'time', 'zmq', 'qlx', 'map', 'stat', 'g_team']
for query in test_queries:
results = autocomplete(query)
print(f"'{query}' -> {results}")

View File

@ -18,11 +18,11 @@ def get_team_prefix(player_name, player_tracker):
"""Get color-coded team prefix for a player"""
if not player_tracker.server_info.is_team_mode():
return ''
team = player_tracker.get_team(player_name)
if not team:
return ''
return TEAM_COLORS.get(team, '')
@ -34,33 +34,33 @@ def should_add_timestamp(message):
# But allow "zmq RCON" lines (command echoes)
if 'zmq RCON' not in message:
return False
# Skip very short messages or fragments
stripped = message.strip()
if len(stripped) <= 2:
return False
# Skip messages with leading spaces (status fragments)
if message.startswith(' ') and len(stripped) < 50:
return False
# Skip pure numbers
if stripped.isdigit():
return False
# Skip short single words
if len(stripped) < 20 and ' ' not in stripped:
return False
# Skip IP addresses (xxx.xxx.xxx.xxx or xxx.xxx.xxx.xxx:port)
allowed_chars = set('0123456789.:')
if stripped.count('.') == 3 and all(c in allowed_chars for c in stripped):
return False
# Skip messages starting with ***
if message.startswith('***'):
return False
return True
@ -74,13 +74,13 @@ def format_message(message, add_timestamp=True):
# Clean up message
message = message.replace("\\n", "")
message = message.replace(chr(25), "")
# Handle broadcast messages
attributes = 0
if message[:10] == "broadcast:":
message = message[11:]
attributes = 1 # Bold
# Handle print messages
if message[:7] == "print \"":
message = message[7:-2] + "\n"
@ -89,7 +89,7 @@ def format_message(message, add_timestamp=True):
if add_timestamp and should_add_timestamp(message):
timestamp = time.strftime('%H:%M:%S')
message = f"^3[^7{timestamp}^3]^7 {message}"
return message, attributes
@ -100,7 +100,7 @@ def format_chat_message(message, player_tracker):
"""
# Strip special character
clean_msg = message.replace(chr(25), '')
# Team chat with location: (PlayerName) (Location): message
# Location can have nested parens like (Lower Floor (Near Yellow Armour))
if clean_msg.strip().startswith('(') and ')' in clean_msg:
@ -108,10 +108,10 @@ def format_chat_message(message, player_tracker):
player_match = re.match(r'^(\([^)]+\))', clean_msg)
if not player_match:
return message
player_part = player_match.group(1)
rest = clean_msg[len(player_part):].lstrip()
# Check for location (another parenthetical)
if rest.startswith('('):
# Count parens to handle nesting
@ -125,12 +125,12 @@ def format_chat_message(message, player_tracker):
if paren_count == 0:
location_end = i + 1
break
# Check if location ends with colon
if location_end > 0 and location_end < len(rest) and rest[location_end] == ':':
location_part = rest[:location_end]
message_part = rest[location_end + 1:]
# Get team prefix
name_match = re.match(r'\(([^)]+)\)', player_part)
if name_match:
@ -138,30 +138,30 @@ def format_chat_message(message, player_tracker):
team_prefix = get_team_prefix(player_name, player_tracker)
location_clean = strip_color_codes(location_part)
return f"^8^5[TEAMSAY]^7^0 {team_prefix}^8{player_part}^0^3{location_clean}^7:^5{message_part}"
# Team chat without location: (PlayerName): message
colon_match = re.match(r'^(\([^)]+\)):(\s*.*)', clean_msg)
if colon_match:
player_part = colon_match.group(1)
message_part = colon_match.group(2)
name_match = re.match(r'\(([^)]+)\)', player_part)
if name_match:
player_name = strip_color_codes(name_match.group(1).strip())
team_prefix = get_team_prefix(player_name, player_tracker)
return f"^8^5[TEAMSAY]^7^0 {team_prefix}^8{player_part}^7^0:^5{message_part}\n"
# Regular chat: PlayerName: message
parts = clean_msg.split(':', 1)
if len(parts) == 2:
player_name = strip_color_codes(parts[0].strip())
team_prefix = get_team_prefix(player_name, player_tracker)
# Preserve original color-coded name
original_parts = message.replace(chr(25), '').split(':', 1)
if len(original_parts) == 2:
return f"^8^2[SAY]^7^0 {team_prefix}^8{original_parts[0]}^0^7:^2{original_parts[1]}"
return message
def format_powerup_message(message, player_tracker):

555
main.py
View File

@ -13,13 +13,26 @@ import curses
import zmq
import signal
import sys
import threading
from config import VERSION, DEFAULT_HOST, POLL_TIMEOUT
from config import VERSION, DEFAULT_HOST, POLL_TIMEOUT, QUIT_CONFIRM_TIMEOUT, RESPAWN_DELAY, MAX_COMMAND_HISTORY
from state import GameState
from network import RconConnection, StatsConnection
from parser import EventParser
from formatter import format_message, format_chat_message, format_powerup_message
from formatter import format_message, format_chat_message, format_powerup_message, strip_color_codes
from ui import UIManager
from qlpycon_config import ConfigLoader
# Pre-compiled regex patterns
CVAR_RESPONSE_PATTERN = re.compile(r'"([^"]+)"\s+is:"([^"]*)"')
PORT_PATTERN = re.compile(r'(\d+)')
BROADCAST_PATTERN = re.compile(r'^broadcast:\s*print\s*"(.+?)(?:\\n)?"\s*$')
TIMESTAMP_PATTERN = re.compile(r'\^\d\[\^\d[0-9:]+\^\d\]\^\d\s*|\[[0-9:]+\]\s*')
CONNECT_PATTERN = re.compile(r'^(.+?)\s+connected')
DISCONNECT_PATTERN = re.compile(r'^(.+?)\s+disconnected')
KICK_PATTERN = re.compile(r'^(.+?)\s+was kicked')
INACTIVITY_PATTERN = re.compile(r'^(.+?)\s+Dropped due to inactivity')
RENAME_PATTERN = re.compile(r'^(.+?)\s+renamed to\s+(.+?)$')
# Configure logging
logger = logging.getLogger('main')
@ -31,24 +44,25 @@ all_json_logger.setLevel(logging.DEBUG)
unknown_json_logger = logging.getLogger('unknown_json')
unknown_json_logger.setLevel(logging.DEBUG)
# Global flag for quit confirmation
# Global flag for quit confirmation (thread-safe)
quit_confirm_time = None
quit_confirm_lock = threading.Lock()
def signal_handler(sig, frame):
"""Handle Ctrl+C with confirmation"""
global quit_confirm_time
import time
current_time = time.time()
if quit_confirm_time is None or (current_time - quit_confirm_time) > 3:
# First Ctrl-C or timeout expired
logger.warning("^1^8Press Ctrl-C again within 3 seconds to quit^0")
quit_confirm_time = current_time
else:
# Second Ctrl-C within 3 seconds
logger.warning("^1^8Quittin'^0")
sys.exit(0)
with quit_confirm_lock:
if quit_confirm_time is None or (current_time - quit_confirm_time) > QUIT_CONFIRM_TIMEOUT:
# First Ctrl-C or timeout expired
logger.warning(f"^1^8Press Ctrl-C again within {QUIT_CONFIRM_TIMEOUT:.0f} seconds to quit^0")
quit_confirm_time = current_time
else:
# Second Ctrl-C within timeout
logger.warning("^1^8Quittin'^0")
sys.exit(0)
def parse_cvar_response(message, game_state, ui):
"""
@ -61,17 +75,17 @@ def parse_cvar_response(message, game_state, ui):
'mapname', 'timelimit', 'fraglimit', 'capturelimit', 'sv_maxclients']
if any(f': {cmd}' in message for cmd in suppress_cmds):
return True
# Parse cvar responses (format: "cvar_name" is:"value" default:...)
cvar_match = re.search(r'"([^"]+)"\s+is:"([^"]*)"', message)
cvar_match = CVAR_RESPONSE_PATTERN.search(message)
if cvar_match:
cvar_name = cvar_match.group(1)
value = cvar_match.group(2)
if game_state.server_info.update_from_cvar(cvar_name, value):
ui.update_server_info(game_state)
return True
return False
@ -82,28 +96,88 @@ def handle_stats_connection(message, rcon, ui, game_state):
"""
stats_port = None
stats_password = None
# Extract stats port
if 'net_port' in message and ' is:' in message and '"net_port"' in message:
match = re.search(r' is:"([^"]+)"', message)
match = CVAR_RESPONSE_PATTERN.search(message)
if match:
port_str = match.group(1).strip()
digit_match = re.search(r'(\d+)', port_str)
port_str = match.group(2).strip()
digit_match = PORT_PATTERN.search(port_str)
if digit_match:
stats_port = digit_match.group(1)
logger.info(f'Got stats port: {stats_port}')
# Extract stats password
if 'zmq_stats_password' in message and ' is:' in message and '"zmq_stats_password"' in message:
match = re.search(r' is:"([^"]+)"', message)
match = CVAR_RESPONSE_PATTERN.search(message)
if match:
password_str = match.group(1)
password_str = re.sub(r'\^\d', '', password_str) # Strip color codes
password_str = match.group(2)
password_str = strip_color_codes(password_str)
stats_password = password_str.strip()
logger.info(f'Got stats password: {stats_password}')
return stats_port, stats_password
def handle_user_input(input_queue, rcon, ui):
"""Process user command input"""
while not input_queue.empty():
command = input_queue.get()
logger.info(f'Sending command: {repr(command.strip())}')
# Display command with timestamp
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n")
rcon.send_command(command)
def handle_stats_stream(stats_conn, stats_check_counter, event_parser, ui, game_state):
"""Poll and process stats stream events"""
if not stats_conn or not stats_conn.connected:
return stats_check_counter
stats_check_counter += 1
if stats_check_counter % 100 == 0:
logger.debug(f'Stats polling active (check #{stats_check_counter // 100})')
stats_msg = stats_conn.recv_message()
if stats_msg:
logger.info(f'Stats event received ({len(stats_msg)} bytes)')
# Parse game event
parsed = event_parser.parse_event(stats_msg)
if parsed:
# Format with timestamp before displaying
formatted_msg, attributes = format_message(parsed)
ui.print_message(formatted_msg)
ui.update_server_info(game_state)
return stats_check_counter
def handle_player_respawns(game_state, ui):
"""Check and revive dead players after respawn delay"""
if game_state.server_info.gametype == 'Clan Arena':
# CA: revive all players after round end
if game_state.server_info.round_end_time:
if time.time() - game_state.server_info.round_end_time >= RESPAWN_DELAY:
game_state.server_info.dead_players.clear()
game_state.server_info.round_end_time = None
ui.update_server_info(game_state)
else:
# Other modes: revive individual players after death
current_time = time.time()
players_to_revive = [
name for name, death_time in game_state.server_info.dead_players.items()
if current_time - death_time >= RESPAWN_DELAY
]
if players_to_revive:
for name in players_to_revive:
del game_state.server_info.dead_players[name]
ui.update_server_info(game_state)
def parse_player_events(message, game_state, ui):
"""
Parse connect, disconnect, kick, and rename messages
@ -113,13 +187,12 @@ def parse_player_events(message, game_state, ui):
msg = message
# Strip broadcast: print "..." wrapper with regex
broadcast_match = re.match(r'^broadcast:\s*print\s*"(.+?)(?:\\n)?"\s*$', msg)
broadcast_match = BROADCAST_PATTERN.match(msg)
if broadcast_match:
msg = broadcast_match.group(1)
# Strip timestamp: [HH:MM:SS] or ^3[^7HH:MM:SS^3]^7
msg = re.sub(r'\^\d\[\^\d[0-9:]+\^\d\]\^\d\s*', '', msg)
msg = re.sub(r'\[[0-9:]+\]\s*', '', msg)
msg = TIMESTAMP_PATTERN.sub('', msg)
msg = msg.strip()
if not msg:
@ -128,81 +201,77 @@ def parse_player_events(message, game_state, ui):
logger.debug(f'parse_player_events: {repr(msg)}')
# Strip color codes for matching
from formatter import strip_color_codes
clean_msg = strip_color_codes(msg)
# Match connects: "NAME connected" or "NAME connected with Steam ID"
connect_match = re.match(r'^(.+?)\s+connected', clean_msg)
connect_match = CONNECT_PATTERN.match(clean_msg)
if connect_match:
player_name_match = re.match(r'^(.+?)\s+connected', msg)
player_name_match = CONNECT_PATTERN.match(msg)
player_name = player_name_match.group(1).strip() if player_name_match else connect_match.group(1).strip()
player_name = re.sub(r'\^\d+$', '', player_name)
player_name = strip_color_codes(player_name)
logger.info(f'CONNECT: {repr(player_name)}')
game_state.player_tracker.update_team(player_name, 'SPECTATOR')
game_state.player_tracker.add_player(player_name)
ui.update_server_info(game_state)
# Only print if this is NOT the Steam ID line
if 'Steam ID' not in message:
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3]^7 ^8{player_name} ^9^2connected\n")
return True
# Regular disconnect
disconnect_match = re.match(r'^(.+?)\s+disconnected', clean_msg)
disconnect_match = DISCONNECT_PATTERN.match(clean_msg)
if disconnect_match:
original_match = re.match(r'^(.+?)\s+disconnected', msg)
original_match = DISCONNECT_PATTERN.match(msg)
player_name = original_match.group(1).strip() if original_match else disconnect_match.group(1).strip()
player_name = re.sub(r'\^\d+$', '', player_name)
player_name = re.sub(r'^\^\d+', '', player_name)
player_name = strip_color_codes(player_name)
logger.info(f'DISCONNECT: {repr(player_name)}')
game_state.player_tracker.remove_player(player_name)
ui.update_server_info(game_state)
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3]^7 ^8{player_name} ^9^1disconnected\n")
return True
# Kick
kick_match = re.match(r'^(.+?)\s+was kicked', clean_msg)
kick_match = KICK_PATTERN.match(clean_msg)
if kick_match:
original_match = re.match(r'^(.+?)\s+was kicked', msg)
original_match = KICK_PATTERN.match(msg)
player_name = original_match.group(1).strip() if original_match else kick_match.group(1).strip()
player_name = re.sub(r'\^\d+$', '', player_name)
player_name = re.sub(r'^\^\d+', '', player_name)
player_name = strip_color_codes(player_name)
logger.info(f'KICK: {repr(player_name)}')
game_state.player_tracker.remove_player(player_name)
ui.update_server_info(game_state)
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3]^7 ^8{player_name} ^1was kicked\n")
return True
# Inactivity
inactivity_match = re.match(r'^(.+?)\s+Dropped due to inactivity', clean_msg)
inactivity_match = INACTIVITY_PATTERN.match(clean_msg)
if inactivity_match:
original_match = re.match(r'^(.+?)\s+Dropped due to inactivity', msg)
original_match = INACTIVITY_PATTERN.match(msg)
player_name = original_match.group(1).strip() if original_match else inactivity_match.group(1).strip()
player_name = re.sub(r'\^\d+$', '', player_name)
player_name = re.sub(r'^\^\d+', '', player_name)
player_name = strip_color_codes(player_name)
logger.info(f'INACTIVITY DROP: {repr(player_name)}')
game_state.player_tracker.remove_player(player_name)
ui.update_server_info(game_state)
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3]^7 ^8{player_name}^0 ^3dropped due to inactivity^7\n")
return True
# Match renames: "OldName renamed to NewName"
rename_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', clean_msg)
rename_match = RENAME_PATTERN.match(clean_msg)
if rename_match:
# Extract from original message
original_match = re.match(r'^(.+?)\s+renamed to\s+(.+?)$', msg)
original_match = RENAME_PATTERN.match(msg)
if original_match:
old_name = original_match.group(1).strip()
new_name = original_match.group(2).strip()
@ -210,10 +279,9 @@ def parse_player_events(message, game_state, ui):
old_name = rename_match.group(1).strip()
new_name = rename_match.group(2).strip()
# Remove trailing color codes from both names
old_name = re.sub(r'\^\d+$', '', old_name)
new_name = re.sub(r'^\^\d+', '', new_name) # Remove leading ^7 from new name
new_name = re.sub(r'\^\d+$', '', new_name) # Remove trailing too
# Remove color codes from both names
old_name = strip_color_codes(old_name)
new_name = strip_color_codes(new_name)
old_name = old_name.rstrip('\n\r') # Remove trailing newline
new_name = new_name.rstrip('\n\r') # Remove trailing newline
@ -234,17 +302,27 @@ def main_loop(screen):
# Setup signal handler for Ctrl+C with confirmation
signal.signal(signal.SIGINT, signal_handler)
# Parse arguments
# Load configuration file (optional)
config = ConfigLoader()
config.load()
# Parse arguments (command line overrides config file)
parser = argparse.ArgumentParser(description='Verbose QuakeLive server statistics')
parser.add_argument('--host', default=DEFAULT_HOST, help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}')
parser.add_argument('--password', required=False, help='RCON password')
parser.add_argument('--identity', default=uuid.uuid1().hex, help='Socket identity (random UUID by default)')
parser.add_argument('-v', '--verbose', action='count', default=0, help='Increase verbosity (-v INFO, -vv DEBUG)')
parser.add_argument('--unknown-log', default='unknown_events.log', help='File to log unknown JSON events')
parser.add_argument('-j', '--json', dest='json_log', default=None, help='File to log all JSON events')
parser.add_argument('--host', default=config.get_host() or DEFAULT_HOST,
help=f'ZMQ URI to connect to. Defaults to {DEFAULT_HOST}')
parser.add_argument('--password', default=config.get_password(), required=False,
help='RCON password')
parser.add_argument('--identity', default=uuid.uuid1().hex,
help='Socket identity (random UUID by default)')
parser.add_argument('-v', '--verbose', action='count', default=0,
help='Increase verbosity (-v INFO, -vv DEBUG)')
parser.add_argument('--unknown-log', default='unknown_events.log',
help='File to log unknown JSON events')
parser.add_argument('-j', '--json', dest='json_log', default=None,
help='File to log all JSON events')
args = parser.parse_args()
# Set logging level
if args.verbose == 0:
logger.setLevel(logging.WARNING)
@ -252,41 +330,41 @@ def main_loop(screen):
logger.setLevel(logging.INFO)
else:
logger.setLevel(logging.DEBUG)
# Setup file logging for unknown events
unknown_handler = logging.FileHandler(args.unknown_log, mode='a')
unknown_formatter = logging.Formatter('%(asctime)s - %(message)s', '%Y-%m-%d %H:%M:%S')
unknown_handler.setFormatter(unknown_formatter)
unknown_json_logger.addHandler(unknown_handler)
unknown_json_logger.propagate = False
# Initialize components
ui = UIManager(screen, args.host)
game_state = GameState()
# Setup logging to output window
log_handler = ui.setup_logging()
logger.addHandler(log_handler)
# Setup input queue
input_queue = ui.setup_input_queue()
# Display startup messages
ui.print_message(f"*** QL pyCon Version {VERSION} starting ***\n")
ui.print_message(f"zmq python bindings {zmq.__version__}, libzmq version {zmq.zmq_version()}\n")
# Initialize network connections
rcon = RconConnection(args.host, args.password, args.identity)
rcon.connect()
stats_conn = None
stats_port = None
stats_password = None
stats_check_counter = 0
# Shutdown flag
shutdown = False
# Setup JSON logging if requested
json_logger = None
if args.json_log:
@ -296,145 +374,66 @@ def main_loop(screen):
all_json_logger.addHandler(json_handler)
all_json_logger.propagate = False
json_logger = all_json_logger
# Create event parser
event_parser = EventParser(game_state, json_logger, unknown_json_logger)
# Main event loop
while not shutdown:
# Poll RCON socket
event = rcon.poll(POLL_TIMEOUT)
# Check monitor for connection events
monitor_event = rcon.check_monitor()
if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED:
ui.print_message("Connected to server\n")
rcon.send_command(b'register')
logger.info('Registration message sent')
ui.print_message("Requesting connection info...\n")
rcon.send_command(b'zmq_stats_password')
rcon.send_command(b'net_port')
# Handle user input
while not input_queue.empty():
command = input_queue.get()
logger.info(f'Sending command: {repr(command.strip())}')
# Display command with timestamp
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^5[^7{timestamp}^5] >>> {command.strip()}^7\n")
rcon.send_command(command)
# Poll stats stream if connected
if stats_conn and stats_conn.connected:
stats_check_counter += 1
if stats_check_counter % 100 == 0:
logger.debug(f'Stats polling active (check #{stats_check_counter // 100})')
stats_msg = stats_conn.recv_message()
if stats_msg:
logger.info(f'Stats event received ({len(stats_msg)} bytes)')
# Parse game event
parsed = event_parser.parse_event(stats_msg)
if parsed:
# Format with timestamp before displaying
formatted_msg, attributes = format_message(parsed)
ui.print_message(formatted_msg)
ui.update_server_info(game_state)
# Check if we need to revive players
if game_state.server_info.gametype == 'Clan Arena':
# CA: revive all players 3s after round end
if game_state.server_info.round_end_time:
if time.time() - game_state.server_info.round_end_time >= 3.0:
game_state.server_info.dead_players.clear()
game_state.server_info.round_end_time = None
ui.update_server_info(game_state)
else:
# Other modes: revive individual players 3s after death
current_time = time.time()
players_to_revive = [
name for name, death_time in game_state.server_info.dead_players.items()
if current_time - death_time >= 3.0
]
if players_to_revive:
for name in players_to_revive:
del game_state.server_info.dead_players[name]
ui.update_server_info(game_state)
# Process RCON messages
if event > 0:
logger.debug('Socket has data available')
msg_count = 0
while True:
message = rcon.recv_message()
if message is None:
if msg_count > 0:
logger.debug(f'Read {msg_count} message(s)')
break
msg_count += 1
if len(message) == 0:
logger.debug('Received empty message (keepalive)')
continue
logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}')
# Check for player connect/disconnect/rename events
if parse_player_events(message, game_state, ui):
continue
if '------- Game Initialization -------' in message or 'Game Initialization' in message:
logger.info('Game initialization detected - refreshing server info')
# Main event loop with resource cleanup
try:
while not shutdown:
# Poll RCON socket
event = rcon.poll(POLL_TIMEOUT)
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3] ^8^3Game initialized - Refreshing server info^0^7\n")
# Check monitor for connection events
monitor_event = rcon.check_monitor()
if monitor_event and monitor_event[0] == zmq.EVENT_CONNECTED:
ui.print_message("Connected to server\n")
rcon.send_command(b'register')
logger.info('Registration message sent')
rcon.send_command(b'qlx_serverBrandName')
rcon.send_command(b'g_factoryTitle')
rcon.send_command(b'mapname')
rcon.send_command(b'timelimit')
rcon.send_command(b'fraglimit')
rcon.send_command(b'roundlimit')
rcon.send_command(b'capturelimit')
rcon.send_command(b'sv_maxclients')
ui.print_message("Requesting connection info...\n")
rcon.send_command(b'zmq_stats_password')
rcon.send_command(b'net_port')
# Clear player list since map changed
game_state.server_info.players = []
game_state.player_tracker.player_teams = {}
ui.update_server_info(game_state)
# Handle user input
handle_user_input(input_queue, rcon, ui)
# Poll stats stream if connected
stats_check_counter = handle_stats_stream(stats_conn, stats_check_counter, event_parser, ui, game_state)
# Check if we need to revive players
handle_player_respawns(game_state, ui)
# Process RCON messages
if event > 0:
logger.debug('Socket has data available')
msg_count = 0
while True:
message = rcon.recv_message()
if message is None:
if msg_count > 0:
logger.debug(f'Read {msg_count} message(s)')
break
msg_count += 1
if len(message) == 0:
logger.debug('Received empty message (keepalive)')
continue
logger.debug(f'Received message ({len(message)} bytes): {repr(message[:100])}')
# Check for player connect/disconnect/rename events
if parse_player_events(message, game_state, ui):
continue
if '------- Game Initialization -------' in message or 'Game Initialization' in message:
logger.info('Game initialization detected - refreshing server info')
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^3[^7{timestamp}^3] ^8^3Game initialized - Refreshing server info^0^7\n")
# Try to parse as cvar response
if parse_cvar_response(message, game_state, ui):
logger.debug('Suppressed cvar response')
continue
# Check for stats connection info
port, password = handle_stats_connection(message, rcon, ui, game_state)
if port:
stats_port = port
if password:
stats_password = password
# Connect to stats if we have both credentials
if stats_port and stats_password and stats_conn is None:
try:
ui.print_message("Connecting to stats stream...\n")
host_ip = args.host.split('//')[1].split(':')[0]
stats_conn = StatsConnection(host_ip, stats_port, stats_password)
stats_conn.connect()
ui.print_message("Stats stream connected - ready for game events\n")
# Request initial server info
logger.info('Sending initial server info queries')
rcon.send_command(b'qlx_serverBrandName')
rcon.send_command(b'g_factoryTitle')
rcon.send_command(b'mapname')
@ -443,47 +442,97 @@ def main_loop(screen):
rcon.send_command(b'roundlimit')
rcon.send_command(b'capturelimit')
rcon.send_command(b'sv_maxclients')
if args.json_log:
ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n")
except Exception as e:
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n")
logger.error(f'Stats connection failed: {e}')
# Try to parse as game event
parsed_event = event_parser.parse_event(message)
if parsed_event:
ui.print_message(parsed_event)
continue
# Check if it looks like JSON but wasn't parsed
stripped = message.strip()
if stripped and stripped[0] in ('{', '['):
logger.debug('Unparsed JSON event')
continue
# Try powerup message formatting
powerup_msg = format_powerup_message(message, game_state.player_tracker)
if powerup_msg:
ui.print_message(powerup_msg)
continue
# Filter bot debug messages in default mode
is_bot_debug = (' entered ' in message and
any(x in message for x in [' seek ', ' battle ', ' chase', ' fight']))
if is_bot_debug and args.verbose == 0:
logger.debug(f'Filtered bot debug: {message[:50]}')
continue
# Check if it's a chat message
if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')):
message = format_chat_message(message, game_state.player_tracker)
# Format and display message
formatted_msg, attributes = format_message(message)
ui.print_message(formatted_msg)
# Clear player dict since map changed
game_state.server_info.players = {}
game_state.player_tracker.player_teams = {}
ui.update_server_info(game_state)
# Try to parse as cvar response
if parse_cvar_response(message, game_state, ui):
logger.debug('Suppressed cvar response')
continue
# Check for stats connection info
port, password = handle_stats_connection(message, rcon, ui, game_state)
if port:
stats_port = port
if password:
stats_password = password
# Connect to stats if we have both credentials
if stats_port and stats_password and stats_conn is None:
try:
ui.print_message("Connecting to stats stream...\n")
host_ip = args.host.split('//')[1].split(':')[0]
stats_conn = StatsConnection(host_ip, stats_port, stats_password)
stats_conn.connect()
ui.print_message("Stats stream connected - ready for game events\n")
# Request initial server info
logger.info('Sending initial server info queries')
rcon.send_command(b'qlx_serverBrandName')
rcon.send_command(b'g_factoryTitle')
rcon.send_command(b'mapname')
rcon.send_command(b'timelimit')
rcon.send_command(b'fraglimit')
rcon.send_command(b'roundlimit')
rcon.send_command(b'capturelimit')
rcon.send_command(b'sv_maxclients')
if args.json_log:
ui.print_message(f"*** JSON capture enabled: {args.json_log} ***\n")
except Exception as e:
timestamp = time.strftime('%H:%M:%S')
ui.print_message(f"^1[^7{timestamp}^1] Error: Stats connection failed: {e}^7\n")
logger.error(f'Stats connection failed: {e}')
# Try to parse as game event
parsed_event = event_parser.parse_event(message)
if parsed_event:
ui.print_message(parsed_event)
continue
# Check if it looks like JSON but wasn't parsed
stripped = message.strip()
if stripped and stripped[0] in ('{', '['):
logger.debug('Unparsed JSON event')
continue
# Try powerup message formatting
powerup_msg = format_powerup_message(message, game_state.player_tracker)
if powerup_msg:
ui.print_message(powerup_msg)
continue
# Filter bot debug messages in default mode
is_bot_debug = (' entered ' in message and
any(x in message for x in [' seek ', ' battle ', ' chase', ' fight']))
if is_bot_debug and args.verbose == 0:
logger.debug(f'Filtered bot debug: {message[:50]}')
continue
# Check if it's a chat message
if ':' in message and not message.startswith(('print', 'broadcast', 'zmq')):
message = format_chat_message(message, game_state.player_tracker)
# Format and display message
formatted_msg, attributes = format_message(message)
ui.print_message(formatted_msg)
finally:
# Clean up resources
logger.info("Shutting down...")
if rcon:
logger.debug("Closing RCON connection")
rcon.close()
if stats_conn:
logger.debug("Closing stats connection")
stats_conn.close()
logger.info("Shutdown complete")
if __name__ == '__main__':
curses.wrapper(main_loop)

View File

@ -39,7 +39,7 @@ def check_monitor(monitor):
event_monitor = monitor.recv(zmq.NOBLOCK)
except zmq.Again:
return None
event_id, event_name, event_value = read_socket_event(event_monitor)
event_endpoint = monitor.recv(zmq.NOBLOCK)
logger.debug(f'Monitor: {event_name} {event_value} endpoint {event_endpoint}')
@ -48,7 +48,7 @@ def check_monitor(monitor):
class RconConnection:
"""RCON connection to Quake Live server"""
def __init__(self, host, password, identity):
self.host = host
self.password = password
@ -56,52 +56,52 @@ class RconConnection:
self.context = None
self.socket = None
self.monitor = None
def connect(self):
"""Initialize connection"""
logger.info('Initializing ZMQ context...')
self.context = zmq.Context()
logger.info('Creating DEALER socket...')
self.socket = self.context.socket(zmq.DEALER)
logger.info('Setting up socket monitor...')
self.monitor = self.socket.get_monitor_socket(zmq.EVENT_ALL)
if self.password:
logger.info('Setting password for access')
self.socket.plain_username = b'rcon'
self.socket.plain_password = self.password.encode('utf-8')
self.socket.zap_domain = b'rcon'
logger.info(f'Setting socket identity: {self.identity}')
self.socket.setsockopt(zmq.IDENTITY, self.identity.encode('utf-8'))
self.socket.connect(self.host)
logger.info('Connection initiated, waiting for events...')
def send_command(self, command):
"""Send RCON command"""
if isinstance(command, str):
command = command.encode('utf-8')
self.socket.send(command)
logger.info(f'Sent command: {command}')
def poll(self, timeout):
"""Poll for messages"""
return self.socket.poll(timeout)
def recv_message(self):
"""Receive a message (non-blocking)"""
try:
return self.socket.recv(zmq.NOBLOCK).decode('utf-8', errors='replace')
except zmq.error.Again:
return None
def check_monitor(self):
"""Check monitor for events"""
return check_monitor(self.monitor)
def close(self):
"""Close connection"""
if self.socket:
@ -113,7 +113,7 @@ class RconConnection:
class StatsConnection:
"""Stats stream connection (ZMQ SUB socket)"""
def __init__(self, host, port, password):
self.host = host
self.port = port
@ -121,43 +121,43 @@ class StatsConnection:
self.context = None
self.socket = None
self.connected = False
def connect(self):
"""Connect to stats stream"""
stats_host = f'tcp://{self.host}:{self.port}'
logger.info(f'Connecting to stats stream: {stats_host}')
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
logger.debug('Stats socket created (SUB type)')
if self.password and self.password.strip():
logger.debug('Setting PLAIN authentication')
self.socket.setsockopt(zmq.PLAIN_USERNAME, b'stats')
self.socket.setsockopt(zmq.PLAIN_PASSWORD, self.password.encode('utf-8'))
self.socket.setsockopt_string(zmq.ZAP_DOMAIN, 'stats')
logger.debug(f'Connecting to {stats_host}')
self.socket.connect(stats_host)
logger.debug('Setting ZMQ_SUBSCRIBE to empty (all messages)')
self.socket.setsockopt(zmq.SUBSCRIBE, b'')
time.sleep(0.5)
self.connected = True
logger.info('Stats stream connected')
def recv_message(self):
"""Receive stats message (non-blocking)"""
if not self.connected:
return None
try:
msg = self.socket.recv(zmq.NOBLOCK)
return msg.decode('utf-8', errors='replace')
except zmq.error.Again:
return None
def close(self):
"""Close connection"""
if self.socket:

112
parser.py
View File

@ -25,12 +25,12 @@ def calculate_weapon_accuracies(weapon_data):
class EventParser:
"""Parses JSON game events into formatted messages"""
def __init__(self, game_state, json_logger=None, unknown_logger=None):
self.game_state = game_state
self.json_logger = json_logger
self.unknown_logger = unknown_logger
def parse_event(self, message):
"""
Parse JSON event and return formatted message string
@ -38,23 +38,23 @@ class EventParser:
"""
try:
event = json.loads(message)
# Log all JSON if logger is configured
if self.json_logger:
self.json_logger.info('JSON Event received:')
self.json_logger.info(json.dumps(event, indent=2))
self.json_logger.info('---')
if 'TYPE' not in event or 'DATA' not in event:
logger.debug('JSON missing TYPE or DATA')
return None
event_type = event['TYPE']
data = event['DATA']
if 'WARMUP' in data:
self.game_state.server_info.warmup = data['WARMUP']
# Route to appropriate handler
handler_map = {
'PLAYER_SWITCHTEAM': self._handle_switchteam,
@ -68,7 +68,7 @@ class EventParser:
'PLAYER_DISCONNECT': lambda d: None,
'ROUND_OVER': self._handle_round_over,
}
handler = handler_map.get(event_type)
if handler:
return handler(data)
@ -79,14 +79,14 @@ class EventParser:
self.unknown_logger.info(f'Unknown event type: {event_type}')
self.unknown_logger.info(f'Full JSON: {json.dumps(event, indent=2)}')
return None
except json.JSONDecodeError as e:
logger.debug(f'JSON decode error: {e}')
return None
except (KeyError, TypeError) as e:
logger.debug(f'Error parsing event: {e}')
return None
def _handle_switchteam(self, data):
"""Handle PLAYER_SWITCHTEAM event"""
@ -96,41 +96,41 @@ class EventParser:
if 'KILLER' not in data:
return None
killer = data['KILLER']
name = killer.get('NAME', 'Unknown')
team = killer.get('TEAM', '')
old_team = killer.get('OLD_TEAM', '')
# Update player team
self.game_state.player_tracker.update_team(name, team)
self.game_state.player_tracker.add_player(name)
if team == old_team:
return None
warmup = " ^8^3(Warmup)^0" if data.get('WARMUP', False) else ""
team_messages = {
'FREE': ' ^7joined the ^8fight^0',
'SPECTATOR': ' ^7joined the ^3Spectators^7',
'RED': ' ^7joined the ^1RED Team^7',
'BLUE': ' ^7joined the ^4BLUE Team^7'
}
old_team_messages = {
'FREE': 'the ^8fight^0',
'SPECTATOR': 'the ^3Spectators^7',
'RED': '^7the ^1RED Team^7',
'BLUE': '^7the ^4BLUE Team^7'
}
team_msg = team_messages.get(team, f' ^7joined team {team}^7')
old_team_msg = old_team_messages.get(old_team, f'team {old_team}')
team_prefix = get_team_prefix(name, self.game_state.player_tracker)
return f"^8^5[SWITCH]^7 {team_prefix}^8{name}^0{team_msg} from {old_team_msg}{warmup}\n"
def _handle_death(self, data):
"""Handle PLAYER_DEATH and PLAYER_KILL events"""
@ -140,17 +140,17 @@ class EventParser:
if 'VICTIM' not in data:
return None
victim = data['VICTIM']
victim_name = victim.get('NAME', 'Unknown')
# Check for duplicate
time_val = data.get('TIME', 0)
killer_name = data.get('KILLER', {}).get('NAME', '') if data.get('KILLER') else ''
if self.game_state.event_deduplicator.is_duplicate('PLAYER_DEATH', time_val, killer_name, victim_name):
return None
# Update victim team
if 'TEAM' in victim:
self.game_state.player_tracker.update_team(victim_name, victim['TEAM'])
@ -160,46 +160,46 @@ class EventParser:
if not data.get('WARMUP', False):
import time
self.game_state.server_info.dead_players[victim_name] = time.time()
victim_prefix = get_team_prefix(victim_name, self.game_state.player_tracker)
warmup = " ^8^3(Warmup)^0" if data.get('WARMUP', False) else ""
score_prefix = ""
# Environmental death (no killer)
if 'KILLER' not in data or not data['KILLER']:
# -1 for environmental death
if not data.get('WARMUP', False):
self.game_state.player_tracker.update_score(victim_name, -1)
score_prefix = "^8^1[-1]^7^0 "
mod = data.get('MOD', 'UNKNOWN')
msg_template = DEATH_MESSAGES.get(mod, "%s^8%s^0 ^1DIED FROM %s^7")
if mod in DEATH_MESSAGES:
msg = msg_template % (victim_prefix, victim_name)
else:
msg = msg_template % (victim_prefix, victim_name, mod)
return f"{score_prefix}{msg}{warmup}\n"
# Player killed by another player
killer = data['KILLER']
killer_name = killer.get('NAME', 'Unknown')
# Update killer team
if 'TEAM' in killer:
self.game_state.player_tracker.update_team(killer_name, killer['TEAM'])
self.game_state.player_tracker.add_player(killer_name)
killer_prefix = get_team_prefix(killer_name, self.game_state.player_tracker)
# Suicide
if killer_name == victim_name:
# -1 for suicide
if not data.get('WARMUP', False):
self.game_state.player_tracker.update_score(victim_name, -1)
score_prefix = "^8^1[-1]^7^0 "
weapon = killer.get('WEAPON', 'OTHER_WEAPON')
if weapon == 'ROCKET':
return f"{score_prefix}{killer_prefix}^8{killer_name}^0 ^7blew herself up.{warmup}\n"
@ -211,7 +211,7 @@ class EventParser:
weapon_name = WEAPON_NAMES.get(weapon, weapon)
return f"{score_prefix}{killer_prefix}^8{killer_name}^0 ^7committed suicide with the ^7{weapon_name}{warmup}\n"
return None
# Regular kill: +1 for killer
if not data.get('WARMUP', False):
self.game_state.player_tracker.update_score(killer_name, 1)
@ -219,13 +219,13 @@ class EventParser:
else:
score_prefix = ""
weapon = killer.get('WEAPON', 'UNKNOWN')
weapon_name = WEAPON_KILL_NAMES.get(weapon, f'the {weapon}')
hp_left = killer.get('HEALTH', '0 HP')
hp_left_colored = ""
if hp_left <= 0: # from the grave
hp_left = int(killer.get('HEALTH', 0))
hp_left_colored = ""
if hp_left <= 0: # from the grave
hp_left_colored = f"^8^5From the Grave^0"
elif hp_left < 25: # red
hp_left_colored = f"^8^1{hp_left}^0 ^7HP"
@ -235,7 +235,7 @@ class EventParser:
hp_left_colored = f"^8^7{hp_left}^0 ^7HP"
else: # green
hp_left_colored = f"^8^2{hp_left}^0 ^7HP"
return f"{score_prefix}{killer_prefix}^8{killer_name}^0 ^7fragged^7 {victim_prefix}^8{victim_name}^0 ^7with {weapon_name}^0 ^7({hp_left_colored}^7){warmup}\n"
def _handle_round_over(self, data):
@ -247,19 +247,19 @@ class EventParser:
team_won = data.get('TEAM_WON')
round_num = data.get('ROUND', 0)
if team_won == 'RED':
self.game_state.server_info.red_rounds += 1
logger.info(f"Round {round_num}: RED wins (RED: {self.game_state.server_info.red_rounds}, BLUE: {self.game_state.server_info.blue_rounds})")
elif team_won == 'BLUE':
self.game_state.server_info.blue_rounds += 1
logger.info(f"Round {round_num}: BLUE wins (RED: {self.game_state.server_info.red_rounds}, BLUE: {self.game_state.server_info.blue_rounds})")
import time
self.game_state.server_info.round_end_time = time.time()
return None # Don't display in chat
def _handle_medal(self, data):
"""Handle PLAYER_MEDAL event"""
@ -271,7 +271,7 @@ class EventParser:
medal = data.get('MEDAL', 'UNKNOWN')
warmup = " ^8^3(Warmup)^7^0" if data.get('WARMUP', False) else ""
medal_prefix = "^8^6[MEDAL]^7^0 "
team_prefix = get_team_prefix(name, self.game_state.player_tracker)
# RED Medals (^1)
if medal in ["FIRSTFRAG", "HUMILIATION", "REVENGE"]:
@ -293,7 +293,7 @@ class EventParser:
return f"{medal_prefix}{team_prefix}^8{name}^0 ^7got ^8^6{medal}^0{warmup}\n"
else:
return f"{medal_prefix}{team_prefix}^8{name}^0 ^7got ^8^7{medal}^0{warmup}\n"
def _handle_match_started(self, data):
"""Handle MATCH_STARTED event"""
@ -303,18 +303,18 @@ class EventParser:
if self.game_state.server_info.is_team_mode():
return f"^8^2[GAME ON]^0 ^7Match has started - ^1^8RED ^0^7vs. ^4^8BLUE\n"
players = []
for player in data.get('PLAYERS', []):
name = player.get('NAME', 'Unknown')
players.append(name)
if players:
formatted = "^0 vs. ^8".join(players)
return f"^8^2[GAME ON]^0 ^7Match has started - ^8^7{formatted}\n"
return None
def _handle_match_report(self, data):
"""Handle MATCH_REPORT event"""
@ -324,37 +324,37 @@ class EventParser:
if not self.game_state.server_info.is_team_mode():
return None
red_score = int(data.get('TSCORE0', '0'))
blue_score = int(data.get('TSCORE1', '0'))
report_prefix = "^8^1[GAME OVER]"
if red_score > blue_score:
return f"{report_prefix} ^7The ^1RED TEAM ^7WINS^0 by a score of ^8^1{red_score} ^0^7to ^8^4{blue_score}\n"
elif blue_score > red_score:
return f"{report_prefix} ^7The ^4BLUE TEAM ^7WINS^0 by a score of ^8^4{blue_score} ^0^7to ^8^1{red_score}\n"
else:
return f"{report_prefix} ^7The match is a TIE^0 with a score of ^8^1{red_score} ^0^7to ^8^4{blue_score}\n"
def _handle_player_stats(self, data):
"""Handle PLAYER_STATS event"""
name = data.get('NAME', 'Unknown')
team_prefix = get_team_prefix(name, self.game_state.player_tracker)
kills = int(data.get('KILLS', '0'))
deaths = int(data.get('DEATHS', '0'))
weapon_data = data.get('WEAPONS', {})
accuracies = calculate_weapon_accuracies(weapon_data)
if not accuracies:
return None
best_weapon = max(accuracies, key=accuracies.get)
best_accuracy = accuracies[best_weapon] * 100
weapon_stats = weapon_data.get(best_weapon, {})
best_weapon_kills = int(weapon_stats.get('K', 0))
weapon_name = WEAPON_NAMES.get(best_weapon, best_weapon)
return f"^8^5[PLAYER STATS]^7^0 {team_prefix}^8^7{name}^0^7 K/D: {kills}/{deaths} | Best Weapon: {weapon_name} - Acc: {best_accuracy:.2f}% - Kills: {best_weapon_kills}\n"

View File

@ -1,10 +1,20 @@
#/usr/bin/env bash
#!/usr/bin/env bash
#
# Helper script to connect to different Quake Live servers
# Set QLPYCON_PASSWORD environment variable or create ~/.qlpycon.conf
workdir="/home/xbl/gits/qlpycon"
password="$(cat $workdir/rconpw.txt)"
workdir="/home/xbl/gitz/qlpycon"
password="${QLPYCON_PASSWORD:-}"
serverip="10.13.12.93"
# Check if password is set
if [ -z "$password" ]; then
echo "Error: QLPYCON_PASSWORD environment variable not set"
echo "Set it with: export QLPYCON_PASSWORD='your_password'"
echo "Or create ~/.qlpycon.conf with [connection] section"
exit 1
fi
cd "$workdir"
source venv/bin/activate

147
qlpycon_config.py Normal file
View File

@ -0,0 +1,147 @@
#!/usr/bin/env python3
"""
Configuration file handler for QLPyCon
Supports loading from ~/.qlpycon.conf or ./qlpycon.conf
"""
import os
import configparser
import logging
logger = logging.getLogger('config_loader')
class ConfigLoader:
"""Load configuration from INI file"""
def __init__(self):
self.config = configparser.ConfigParser()
self.config_loaded = False
def load(self):
"""
Try to load config from (in order):
1. ./qlpycon.conf (current directory)
2. ~/.qlpycon.conf (home directory)
"""
config_paths = [
'qlpycon.conf',
os.path.expanduser('~/.qlpycon.conf')
]
for path in config_paths:
if os.path.exists(path):
try:
self.config.read(path)
self.config_loaded = True
logger.info(f'Loaded configuration from: {path}')
return True
except Exception as e:
logger.warning(f'Failed to load config from {path}: {e}')
logger.debug('No configuration file found, using defaults')
return False
def get(self, section, key, fallback=None):
"""Get a configuration value"""
if not self.config_loaded:
return fallback
try:
return self.config.get(section, key, fallback=fallback)
except (configparser.NoSectionError, configparser.NoOptionError):
return fallback
def get_int(self, section, key, fallback=0):
"""Get an integer configuration value"""
value = self.get(section, key)
if value is None:
return fallback
try:
return int(value)
except ValueError:
logger.warning(f'Invalid integer value for [{section}] {key}: {value}')
return fallback
def get_bool(self, section, key, fallback=False):
"""Get a boolean configuration value"""
value = self.get(section, key)
if value is None:
return fallback
return value.lower() in ('true', 'yes', '1', 'on')
def get_host(self):
"""Get connection host"""
return self.get('connection', 'host')
def get_password(self):
"""Get connection password (supports ${ENV_VAR} syntax)"""
password = self.get('connection', 'password')
if not password:
return None
# Support environment variable substitution: ${VAR_NAME}
if password.startswith('${') and password.endswith('}'):
env_var = password[2:-1]
return os.environ.get(env_var)
return password
def get_log_level(self):
"""Get logging level"""
level_str = self.get('logging', 'level', 'INFO')
levels = {
'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL
}
return levels.get(level_str.upper(), logging.INFO)
def create_example_config():
"""Create an example configuration file"""
config_content = """# QLPyCon Configuration File
# Place this file as ~/.qlpycon.conf or ./qlpycon.conf
[connection]
# Server connection settings
host = tcp://10.13.12.93:28969
# Use ${ENV_VAR} to read from environment
password = ${QLPYCON_PASSWORD}
[logging]
# Log level: DEBUG, INFO, WARNING, ERROR, CRITICAL
level = INFO
[ui]
# Max command history entries
max_history = 10
# Color scheme (future feature)
color_scheme = quake
[behavior]
# Quit confirmation timeout (seconds)
quit_timeout = 3.0
# Player respawn delay (seconds)
respawn_delay = 3.0
"""
example_path = os.path.expanduser('~/.qlpycon.conf.example')
try:
with open(example_path, 'w') as f:
f.write(config_content)
print(f'Created example config: {example_path}')
print(f'Copy to ~/.qlpycon.conf and edit as needed')
return True
except Exception as e:
print(f'Failed to create example config: {e}')
return False
if __name__ == '__main__':
# Create example config when run directly
create_example_config()

149
state.py
View File

@ -13,7 +13,7 @@ logger = logging.getLogger('state')
class ServerInfo:
"""Tracks current server information"""
def __init__(self):
self.hostname = 'Unknown'
self.map = 'Unknown'
@ -28,13 +28,13 @@ class ServerInfo:
self.red_rounds = 0
self.blue_score = 0
self.blue_rounds = 0
self.players = []
self.players = {} # Changed to dict: {name: {'score': str, 'ping': str}}
self.last_update = 0
self.warmup = False
self.dead_players = {}
self.round_end_time = None
self.match_time = 0
def is_team_mode(self):
"""Check if current gametype is a team mode"""
return self.gametype in TEAM_MODES
@ -44,12 +44,12 @@ class ServerInfo:
self.red_rounds = 0
self.blue_rounds = 0
self.dead_players.clear()
def update_from_cvar(self, cvar_name, value):
"""Update server info from a cvar response"""
# Normalize cvar name to lowercase for case-insensitive matching
cvar_lower = cvar_name.lower()
mapping = {
'qlx_serverbrandname': 'hostname',
'g_factorytitle': 'gametype',
@ -60,7 +60,7 @@ class ServerInfo:
'capturelimit': 'capturelimit',
'sv_maxclients': 'maxclients'
}
attr = mapping.get(cvar_lower)
if attr:
# Only strip color codes for non-hostname fields
@ -74,53 +74,46 @@ class ServerInfo:
class PlayerTracker:
"""Tracks player teams and information"""
def __init__(self, server_info):
self.server_info = server_info
self.player_teams = {}
def update_team(self, name, team):
"""Update player team. Team can be int or string"""
# Convert numeric team to string
if isinstance(team, int):
team = TEAM_MAP.get(team, 'FREE')
if team not in ['RED', 'BLUE', 'FREE', 'SPECTATOR']:
team = 'FREE'
# Store both original name and color-stripped version
self.player_teams[name] = team
clean_name = re.sub(r'\^\d', '', name)
if clean_name != name:
self.player_teams[clean_name] = team
logger.debug(f'Updated team for {name} (clean: {clean_name}): {team}')
def get_team(self, name):
"""Get player's team"""
return self.player_teams.get(name)
def add_player(self, name, score='0', ping='0'):
"""Add player to server's player list if not exists"""
clean_name = re.sub(r'\^\d', '', name)
# Check if player already exists (by either name or clean name)
for existing in self.server_info.players:
existing_clean = re.sub(r'\^\d', '', existing['name'])
if existing['name'] == name or existing_clean == clean_name:
return # Already exists
self.server_info.players.append({
'name': name,
'score': score,
'ping': ping
})
"""Add player to server's player dict if not exists"""
# Use original name with color codes as key
if name not in self.server_info.players:
self.server_info.players[name] = {
'score': score,
'ping': ping
}
logger.debug(f'Added player: {name}')
def get_players_by_team(self):
"""Get players organized by team"""
teams = {'RED': [], 'BLUE': [], 'SPECTATOR': [], 'FREE': []}
for player in self.server_info.players:
name = player['name']
for name in self.server_info.players.keys():
team = self.player_teams.get(name, 'FREE')
if team not in teams:
team = 'FREE'
@ -130,91 +123,105 @@ class PlayerTracker:
def remove_player(self, name):
"""Remove player from tracking"""
clean_name = re.sub(r'\^\d', '', name)
# Count before removal
before_count = len(self.server_info.players)
# Remove from player list - check both original and clean names
self.server_info.players = [
p for p in self.server_info.players
if p['name'] != name and re.sub(r'\^\d', '', p['name']) != clean_name
]
# Log if anything was actually removed
after_count = len(self.server_info.players)
if before_count != after_count:
logger.info(f'Removed player: {name} (clean: {clean_name}) - {before_count} -> {after_count}')
# Try to remove by exact name first
removed = self.server_info.players.pop(name, None)
# If not found, try to find by clean name
if not removed:
for player_name in list(self.server_info.players.keys()):
if re.sub(r'\^\d', '', player_name) == clean_name:
removed = self.server_info.players.pop(player_name)
logger.info(f'Removed player: {player_name} (matched clean name: {clean_name})')
break
else:
logger.info(f'Removed player: {name}')
if not removed:
logger.warning(f'Player not found for removal: {name} (clean: {clean_name})')
# Remove from team tracking - both versions
# Remove from team tracking
self.player_teams.pop(name, None)
self.player_teams.pop(clean_name, None)
def rename_player(self, old_name, new_name):
"""Rename a player while maintaining their team"""
"""Rename a player while maintaining their team and score"""
old_clean = re.sub(r'\^\d', '', old_name)
# Get current team (try both names)
team = self.player_teams.get(old_name) or self.player_teams.get(old_clean, 'SPECTATOR')
# Find and update player in server list
for player in self.server_info.players:
if player['name'] == old_name or re.sub(r'\^\d', '', player['name']) == old_clean:
player['name'] = new_name
break
# Find player data by old name
player_data = self.server_info.players.pop(old_name, None)
# If not found by exact name, try clean name
if not player_data:
for player_name in list(self.server_info.players.keys()):
if re.sub(r'\^\d', '', player_name) == old_clean:
player_data = self.server_info.players.pop(player_name)
break
# Add player with new name
if player_data:
self.server_info.players[new_name] = player_data
# Remove old team entries
self.player_teams.pop(old_name, None)
self.player_teams.pop(old_clean, None)
# Add new team entries with color codes preserved
self.update_team(new_name, team)
logger.debug(f'Renamed player: {old_name} -> {new_name} (team: {team})')
def update_score(self, name, delta):
"""Update player's score by delta (+1 for kill, -1 for death/suicide)"""
# Try exact name first (O(1) lookup)
if name in self.server_info.players:
current_score = int(self.server_info.players[name].get('score', 0))
self.server_info.players[name]['score'] = str(current_score + delta)
logger.debug(f"Score update: {name} {delta:+d} -> {self.server_info.players[name]['score']}")
return
# Fallback: search by clean name (rare case)
clean_name = re.sub(r'\^\d', '', name)
for player in self.server_info.players:
player_clean = re.sub(r'\^\d', '', player['name'])
if player['name'] == name or player_clean == clean_name:
current_score = int(player.get('score', 0))
player['score'] = str(current_score + delta)
logger.debug(f"Score update: {name} {delta:+d} -> {player['score']}")
for player_name, player_data in self.server_info.players.items():
if re.sub(r'\^\d', '', player_name) == clean_name:
current_score = int(player_data.get('score', 0))
player_data['score'] = str(current_score + delta)
logger.debug(f"Score update: {player_name} {delta:+d} -> {player_data['score']}")
return
logger.warning(f"Could not update score for {name} - player not found")
class EventDeduplicator:
"""Prevents duplicate kill/death events"""
def __init__(self):
self.recent_events = []
def is_duplicate(self, event_type, time_val, killer_name, victim_name):
"""Check if this kill/death event is a duplicate"""
if event_type not in ('PLAYER_DEATH', 'PLAYER_KILL'):
return False
signature = f"KILL:{time_val}:{killer_name}:{victim_name}"
if signature in self.recent_events:
logger.debug(f'Duplicate event: {signature}')
return True
# Add to recent events
self.recent_events.append(signature)
if len(self.recent_events) > MAX_RECENT_EVENTS:
self.recent_events.pop(0)
return False
class GameState:
"""Main game state container"""
def __init__(self):
self.server_info = ServerInfo()
self.player_tracker = PlayerTracker(self.server_info)

386
ui.py
View File

@ -9,18 +9,19 @@ import curses.textpad
import threading
import queue
import logging
from config import COLOR_PAIRS, INFO_WINDOW_HEIGHT, INFO_WINDOW_Y, OUTPUT_WINDOW_Y, INPUT_WINDOW_HEIGHT, TEAM_MODES
from config import COLOR_PAIRS, INFO_WINDOW_HEIGHT, INFO_WINDOW_Y, OUTPUT_WINDOW_Y, INPUT_WINDOW_HEIGHT, TEAM_MODES, MAX_COMMAND_HISTORY
from cvars import autocomplete, COMMAND_SIGNATURES, get_signature_with_highlight, get_argument_suggestions, COMMAND_ARGUMENTS
logger = logging.getLogger('ui')
class CursesHandler(logging.Handler):
"""Logging handler that outputs to curses window"""
def __init__(self, window):
logging.Handler.__init__(self)
self.window = window
def emit(self, record):
try:
msg = self.format(record)
@ -35,7 +36,7 @@ class CursesHandler(logging.Handler):
curses.doupdate()
except (KeyboardInterrupt, SystemExit):
raise
except:
except Exception:
self.handleError(record)
def print_colored(window, message, attributes=0):
@ -46,12 +47,12 @@ def print_colored(window, message, attributes=0):
if not curses.has_colors:
window.addstr(message)
return
color = 0
bold = False
underline = False
parse_color = False
for ch in message:
val = ord(ch)
if parse_color:
@ -74,10 +75,120 @@ def print_colored(window, message, attributes=0):
parse_color = True
else:
window.addch(ch, curses.color_pair(color) | (curses.A_BOLD if bold else 0) | (curses.A_UNDERLINE if underline else 0) | attributes)
def update_autocomplete_display(window, current_input, first_word, words, ends_with_space):
"""
Update autocomplete display based on current input state.
Returns (suggestions, suggestion_index, original_word) tuple for Tab cycling.
Handles three display modes:
1. Command autocomplete (typing partial command)
2. Signature display (command recognized, showing arguments)
3. Argument value suggestions (typing argument values)
"""
suggestions = []
suggestion_index = -1
original_word = ""
# Check if this is a command with argument definitions
if first_word in COMMAND_ARGUMENTS:
# Determine if user is typing arguments (not just the command)
if len(words) == 1 and not ends_with_space:
# Just command, no space yet → show signature with first arg highlighted
sig_parts = get_signature_with_highlight(first_word, 0)
if sig_parts:
x_pos = 0
for arg_text, is_current in sig_parts:
try:
if is_current:
window.addstr(1, x_pos, arg_text, curses.A_REVERSE)
else:
window.addstr(1, x_pos, arg_text, curses.A_DIM)
x_pos += len(arg_text) + 1
except:
pass
else:
# User is typing arguments
if ends_with_space:
# Starting new argument (empty so far)
arg_position = len(words) - 1 # -1 for command
current_value = ''
else:
# Typing current argument
arg_position = len(words) - 2 # -1 for command, -1 for 0-indexed
current_value = words[-1]
# Get argument suggestions
arg_suggestions = get_argument_suggestions(
first_word,
arg_position,
current_value,
player_list=None # TODO: pass player list from game_state
)
if arg_suggestions:
# Show argument value suggestions with label (limit to 10 for performance)
arg_type = COMMAND_ARGUMENTS[first_word][arg_position]['type']
display_suggestions = arg_suggestions[:10]
more_indicator = f' (+{len(arg_suggestions)-10} more)' if len(arg_suggestions) > 10 else ''
match_line = f'<{arg_type}>: {" ".join(display_suggestions)}{more_indicator}'
try:
window.addstr(1, 0, match_line, curses.A_DIM)
except:
pass
suggestions = arg_suggestions # Store for Tab cycling
suggestion_index = -1
original_word = current_value
else:
# No suggestions (freetext, player without list, etc.) → show signature
sig_parts = get_signature_with_highlight(first_word, arg_position)
if sig_parts:
x_pos = 0
for arg_text, is_current in sig_parts:
try:
if is_current:
window.addstr(1, x_pos, arg_text, curses.A_REVERSE)
else:
window.addstr(1, x_pos, arg_text, curses.A_DIM)
x_pos += len(arg_text) + 1
except:
pass
elif first_word in COMMAND_SIGNATURES and COMMAND_SIGNATURES[first_word]:
# Command with signature but no argument definitions
sig_parts = get_signature_with_highlight(first_word, 0)
if sig_parts:
x_pos = 0
for arg_text, is_current in sig_parts:
try:
if is_current:
window.addstr(1, x_pos, arg_text, curses.A_REVERSE)
else:
window.addstr(1, x_pos, arg_text, curses.A_DIM)
x_pos += len(arg_text) + 1
except:
pass
else:
# Not a recognized command → show command autocomplete
current_word = words[-1]
if len(current_word) >= 2:
suggestions = autocomplete(current_word, max_results=5)
suggestion_index = -1
original_word = current_word
if suggestions:
match_line = ' '.join(suggestions)
try:
window.addstr(1, 0, match_line, curses.A_DIM)
except:
pass
return suggestions, suggestion_index, original_word
class UIManager:
"""Manages curses windows and display"""
def __init__(self, screen, host):
self.screen = screen
self.host = host
@ -88,7 +199,7 @@ class UIManager:
self.input_queue = None
self.command_history = []
self.history_index = -1
self._init_curses()
self._create_windows()
@ -100,23 +211,23 @@ class UIManager:
curses.start_color()
curses.use_default_colors()
curses.cbreak()
curses.curs_set(0)
curses.curs_set(1) # Show cursor in input window
self.screen.addstr(f"Quake Live PyCon: {self.host}")
self.screen.noutrefresh()
# Initialize color pairs
for i in range(1, 7):
curses.init_pair(i, i, 0)
# Swap cyan and magenta (5 and 6)
curses.init_pair(5, 6, 0)
curses.init_pair(6, 5, 0)
def _create_windows(self):
"""Create all UI windows"""
maxy, maxx = self.screen.getmaxyx()
# Server info window (top)
self.info_window = curses.newwin(
INFO_WINDOW_HEIGHT,
@ -128,7 +239,7 @@ class UIManager:
self.info_window.idlok(False)
self.info_window.leaveok(True)
self.info_window.noutrefresh()
# Output window (middle - main display)
self.output_window = curses.newwin(
maxy - 17,
@ -154,7 +265,7 @@ class UIManager:
self.divider_window.idlok(False)
self.divider_window.leaveok(True)
self.divider_window.noutrefresh()
# Input window (bottom)
self.input_window = curses.newwin(
INPUT_WINDOW_HEIGHT,
@ -169,12 +280,12 @@ class UIManager:
self.input_window.idcok(True)
self.input_window.leaveok(False)
self.input_window.noutrefresh()
self.screen.noutrefresh()
curses.doupdate()
def setup_input_queue(self):
"""Setup threaded input queue with command history"""
"""Setup threaded input queue with command history and autocomplete"""
def wait_stdin(q, window, manager):
current_input = ""
cursor_pos = 0
@ -182,6 +293,11 @@ class UIManager:
temp_input = "" # Temp storage when navigating history
quit_confirm = False
# Autocomplete state
suggestions = []
suggestion_index = -1
original_word = "" # Store original word before cycling
while True:
try:
key = window.getch()
@ -189,12 +305,82 @@ class UIManager:
if key == -1: # No input
continue
# Tab key - cycle through suggestions
if key == ord('\t') or key == 9:
if suggestions:
# Cycle to next suggestion
suggestion_index = (suggestion_index + 1) % len(suggestions)
# Replace or append suggestion
words = current_input.split()
if words:
# If original_word is empty, we had trailing space - append new word
# Otherwise, replace current word
if original_word == '':
words.append(suggestions[suggestion_index])
# Update original_word so next Tab replaces instead of appending
original_word = suggestions[suggestion_index]
else:
words[-1] = suggestions[suggestion_index]
current_input = ' '.join(words)
cursor_pos = len(current_input)
# Update display
window.erase()
window.addstr(0, 0, current_input)
# Determine display format
first_word = words[0].lower()
selected_value = suggestions[suggestion_index]
# Check if we're cycling argument values or commands
if first_word in COMMAND_ARGUMENTS and len(words) > 1:
# Cycling argument values - show with label
ends_with_space = current_input.endswith(' ')
if ends_with_space:
arg_position = len(words) - 1
else:
arg_position = len(words) - 2
# Bounds check to prevent index out of range
if arg_position < len(COMMAND_ARGUMENTS[first_word]):
arg_type = COMMAND_ARGUMENTS[first_word][arg_position]['type']
# Show only first 10 suggestions for performance
display_suggestions = suggestions[:10]
more_indicator = f' (+{len(suggestions)-10} more)' if len(suggestions) > 10 else ''
display_line = f'<{arg_type}>: {" ".join(display_suggestions)}{more_indicator}'
else:
# Fallback if position out of bounds
display_line = ' '.join(suggestions[:10])
else:
# Cycling commands - show with signature if available
display_suggestions = suggestions[:10]
match_line = ' '.join(display_suggestions)
if selected_value in COMMAND_SIGNATURES:
signature = COMMAND_SIGNATURES[selected_value]
if signature:
display_line = f"{match_line}{signature}"
else:
display_line = match_line
else:
display_line = match_line
try:
window.addstr(1, 0, display_line, curses.A_DIM)
except:
pass
window.move(0, cursor_pos)
window.noutrefresh()
curses.doupdate() # Actually push the refresh to screen
continue
# Enter key
if key in (curses.KEY_ENTER, 10, 13):
if len(current_input) > 0:
# Add to history
manager.command_history.append(current_input)
if len(manager.command_history) > 10:
if len(manager.command_history) > MAX_COMMAND_HISTORY:
manager.command_history.pop(0)
q.put(current_input)
@ -202,6 +388,9 @@ class UIManager:
cursor_pos = 0
temp_history_index = -1
temp_input = ""
suggestions = []
suggestion_index = -1
original_word = ""
window.erase()
window.noutrefresh()
@ -217,6 +406,9 @@ class UIManager:
temp_history_index -= 1
current_input = manager.command_history[temp_history_index]
cursor_pos = len(current_input)
suggestions = []
suggestion_index = -1
original_word = ""
window.erase()
window.addstr(0, 0, current_input)
window.noutrefresh()
@ -234,6 +426,9 @@ class UIManager:
current_input = manager.command_history[temp_history_index]
cursor_pos = len(current_input)
suggestions = []
suggestion_index = -1
original_word = ""
window.erase()
window.addstr(0, 0, current_input)
window.noutrefresh()
@ -244,7 +439,7 @@ class UIManager:
cursor_pos -= 1
window.move(0, cursor_pos)
window.noutrefresh()
# Arrow RIGHT - move cursor right
elif key == curses.KEY_RIGHT:
if cursor_pos < len(current_input):
@ -258,10 +453,23 @@ class UIManager:
current_input = current_input[:cursor_pos-1] + current_input[cursor_pos:]
cursor_pos -= 1
temp_history_index = -1 # Exit history mode
window.erase()
window.addstr(0, 0, current_input)
# Parse input and update autocomplete display
words = current_input.split()
ends_with_space = current_input.endswith(' ')
if words:
first_word = words[0].lower()
suggestions, suggestion_index, original_word = update_autocomplete_display(
window, current_input, first_word, words, ends_with_space
)
window.move(0, cursor_pos)
window.noutrefresh()
curses.doupdate() # Immediate screen update
# Regular character
elif 32 <= key <= 126:
@ -269,14 +477,28 @@ class UIManager:
current_input = current_input[:cursor_pos] + char + current_input[cursor_pos:]
cursor_pos += 1
temp_history_index = -1 # Exit history mode
window.erase()
window.addstr(0, 0, current_input)
# Parse input and update autocomplete display
words = current_input.split()
ends_with_space = current_input.endswith(' ')
if words:
first_word = words[0].lower()
suggestions, suggestion_index, original_word = update_autocomplete_display(
window, current_input, first_word, words, ends_with_space
)
window.move(0, cursor_pos)
window.noutrefresh()
curses.doupdate() # Immediate screen update
except Exception as e:
import logging
logging.getLogger('ui').error(f'Input error: {e}')
# Log but continue - input thread should stay alive
window.move(0, cursor_pos)
curses.doupdate()
@ -287,14 +509,14 @@ class UIManager:
t.start()
return self.input_queue
def setup_logging(self):
"""Setup logging handler for output window"""
handler = CursesHandler(self.output_window)
formatter = logging.Formatter('%(asctime)-8s|%(name)-12s|%(levelname)-6s|%(message)-s', '%H:%M:%S')
handler.setFormatter(formatter)
return handler
def print_message(self, message, attributes=0):
"""Print formatted message to output window"""
print_colored(self.output_window, message, attributes)
@ -305,16 +527,15 @@ class UIManager:
def update_server_info(self, game_state):
"""Update server info window"""
self.info_window.erase()
max_y, max_x = self.info_window.getmaxyx()
server_info = game_state.server_info
# Line 1: Hostname with Timer and Warmup Indicator
hostname = server_info.hostname
timer_display = ""
if not server_info.warmup:
#if server_info.match_time > 0 and not server_info.warmup:
if server_info.match_time > 0 and not server_info.warmup:
mins = server_info.match_time // 60
secs = server_info.match_time % 60
timer_display = f"^3^0Time:^8^7 {mins}:{secs:02d}^0"
@ -324,7 +545,7 @@ class UIManager:
warmup_display = "^3^0Warmup:^8 ^2YES^0" if server_info.warmup else "^3^0Warmup: ^8^1NO^0"
print_colored(self.info_window, f"^3Name:^8 {hostname} {warmup_display} {timer_display}\n", 0)
# Line 2: Game info
gametype = server_info.gametype
mapname = server_info.map
@ -346,17 +567,17 @@ class UIManager:
limit_display = f"^3^0| Timelimit:^7^8 {timelimit}"
else:
limit_display = f"^3^0| Timelimit:^7^8 {timelimit} ^0^3| Fraglimit:^7^8 {fraglimit}"
print_colored(self.info_window,
f"^3^0Type:^7^8 {gametype} ^0^3| Map:^7^8 {mapname} ^0^3| Players:^7^8 {curclients}/{maxclients} "
f"{limit_display}^0\n", 0)
# Blank lines to fill
self.info_window.addstr("\n")
# Line 3: Team headers and player lists
teams = game_state.player_tracker.get_players_by_team()
if server_info.gametype in TEAM_MODES:
if server_info.gametype == 'Clan Arena':
red_score = f"{server_info.red_rounds:>3} "
@ -365,82 +586,63 @@ class UIManager:
else:
red_total = 0
blue_total = 0
for player in server_info.players:
player_name = player['name']
for player_name, player_data in server_info.players.items():
team = game_state.player_tracker.get_team(player_name)
score = int(player.get('score', 0))
score = int(player_data.get('score', 0))
if team == 'RED':
red_total += score
elif team == 'BLUE':
blue_total += score
red_score = f"{red_total:>3} "
blue_score = f"{blue_total:>3} "
print_colored(self.info_window, f"^8^7{red_score}^9^1RED TEAM^0 ^7^8{blue_score}^9^4BLUE TEAM\n", 0)
# Sort players by score within each team
red_players_with_scores = []
blue_players_with_scores = []
spec_players = []
for player_name in teams['RED']:
score = 0
for player in server_info.players:
if player['name'] == player_name:
score = int(player.get('score', 0))
break
score = int(server_info.players.get(player_name, {}).get('score', 0))
red_players_with_scores.append((player_name, score))
for player_name in teams['BLUE']:
score = 0
for player in server_info.players:
if player['name'] == player_name:
score = int(player.get('score', 0))
break
score = int(server_info.players.get(player_name, {}).get('score', 0))
blue_players_with_scores.append((player_name, score))
# Sort by score descending
red_players_with_scores.sort(key=lambda x: x[1], reverse=True)
blue_players_with_scores.sort(key=lambda x: x[1], reverse=True)
red_players = [name for name, score in red_players_with_scores[:4]]
blue_players = [name for name, score in blue_players_with_scores[:4]]
spec_players = teams['SPECTATOR'][:4]
for i in range(4):
red_name = red_players[i] if i < len(red_players) else ''
blue_name = blue_players[i] if i < len(blue_players) else ''
# Get scores for team players
red_score = ''
blue_score = ''
if red_name:
for player in server_info.players:
if player['name'] == red_name:
red_score = player.get('score', '0')
break
if blue_name:
for player in server_info.players:
if player['name'] == blue_name:
blue_score = player.get('score', '0')
break
red_score = server_info.players.get(red_name, {}).get('score', '0') if red_name else ''
blue_score = server_info.players.get(blue_name, {}).get('score', '0') if blue_name else ''
# Check if players are dead
red_dead = red_name in server_info.dead_players
blue_dead = blue_name in server_info.dead_players
# Format with strikethrough for dead players (using dim text)
red = f"{red_score:>3} {'^8^1X^7^0 ' if red_dead else ''}{red_name}" if red_name else ''
blue = f"{blue_score:>3} {'^8^1X^7^0 ' if blue_dead else ''}{blue_name}" if blue_name else ''
from formatter import strip_color_codes
red_clean = strip_color_codes(red)
blue_clean = strip_color_codes(blue)
red_pad = 24 - len(red_clean)
line = f"^8{red}^0{' ' * red_pad}^8{blue}^0\n"
print_colored(self.info_window, line, 0)
else:
@ -449,43 +651,29 @@ class UIManager:
free_players = teams['FREE']
free_players_with_scores = []
for player_name in free_players:
score = 0
for player in server_info.players:
if player['name'] == player_name:
score = int(player.get('score', 0))
break
score = int(server_info.players.get(player_name, {}).get('score', 0))
free_players_with_scores.append((player_name, score))
# Sort by score descending
free_players_with_scores.sort(key=lambda x: x[1], reverse=True)
sorted_free_players = [name for name, score in free_players_with_scores]
spec_players = teams['SPECTATOR'][:4]
free_col1 = sorted_free_players[:4]
free_col2 = sorted_free_players[4:8]
for i in range(4):
col1_name = free_col1[i] if i < len(free_col1) else ''
col2_name = free_col2[i] if i < len(free_col2) else ''
# Get scores for FREE players
col1_score = ''
col2_score = ''
if col1_name:
for player in server_info.players:
if player['name'] == col1_name:
col1_score = player.get('score', '0')
break
if col2_name:
for player in server_info.players:
if player['name'] == col2_name:
col2_score = player.get('score', '0')
break
col1_score = server_info.players.get(col1_name, {}).get('score', '0') if col1_name else ''
col2_score = server_info.players.get(col2_name, {}).get('score', '0') if col2_name else ''
# Check if players are dead
col1_dead = col1_name in server_info.dead_players
col2_dead = col2_name in server_info.dead_players
# Format: " 9 PlayerName" with right-aligned score and dead marker
col1 = f"{col1_score:>3} {'^8^1X^7^0 ' if col1_dead else ''}{col1_name}" if col1_name else ''
col2 = f"{col2_score:>3} {'^8^1X^7^0 ' if col2_dead else ''}{col2_name}" if col2_name else ''
@ -493,12 +681,12 @@ class UIManager:
from formatter import strip_color_codes
col1_clean = strip_color_codes(col1)
col2_clean = strip_color_codes(col2)
col1_pad = 24 - len(col1_clean)
line = f"^8{col1}^0{' ' * col1_pad}^8{col2}^0\n"
print_colored(self.info_window, line, 0)
# Blank lines to fill
self.info_window.addstr("\n")
@ -506,14 +694,14 @@ class UIManager:
spec_list = " ".join(spec_players)
line = f"^8^3Spectators:^7 {spec_list}\n"
print_colored(self.info_window, line, 0)
# Blank lines to fill
self.info_window.addstr("\n")
# Separator
separator = "^7" + "" * (max_x - 1) + "^7"
print_colored(self.info_window, separator, 0)
self.info_window.noutrefresh()
curses.doupdate()