#!/usr/bin/env python3 """ Curses-based UI for QLPyCon Handles terminal display, windows, and color rendering """ import curses import curses.textpad import threading import queue import logging import time from config import COLOR_PAIRS, INFO_WINDOW_HEIGHT, INFO_WINDOW_Y, OUTPUT_WINDOW_Y, INPUT_WINDOW_HEIGHT, TEAM_MODES, MAX_COMMAND_HISTORY from cvars import autocomplete, COMMAND_SIGNATURES, get_signature_with_highlight, get_argument_suggestions, COMMAND_ARGUMENTS logger = logging.getLogger('ui') class CursesHandler(logging.Handler): """Logging handler that outputs to curses window""" def __init__(self, window): logging.Handler.__init__(self) self.window = window def emit(self, record): try: msg = self.format(record) fs = "%s\n" try: print_colored(self.window, fs % msg, 0) self.window.noutrefresh() curses.doupdate() except UnicodeError: print_colored(self.window, fs % msg.encode("UTF-8"), 0) self.window.noutrefresh() curses.doupdate() except (KeyboardInterrupt, SystemExit): raise except Exception: self.handleError(record) def print_colored(window, message, attributes=0): """ Print message with Quake color codes (^N) ^0 = reset, ^1 = red, ^2 = green, ^3 = yellow, ^4 = blue, ^5 = cyan, ^6 = magenta, ^7 = white, ^8 = bold, ^9 = underline """ if not curses.has_colors: window.addstr(message) return color = 0 bold = False underline = False parse_color = False for ch in message: val = ord(ch) if parse_color: if ch == '8': bold = True elif ch == '9': underline = True elif ch == '0': bold = False underline = False elif ch == '7': color = 0 elif ord('1') <= val <= ord('6'): color = val - ord('0') else: window.addch('^', curses.color_pair(color) | (curses.A_BOLD if bold else 0) | (curses.A_UNDERLINE if underline else 0) | attributes) window.addch(ch, curses.color_pair(color) | (curses.A_BOLD if bold else 0) | (curses.A_UNDERLINE if underline else 0) | attributes) parse_color = False elif ch == '^': parse_color = True else: window.addch(ch, curses.color_pair(color) | (curses.A_BOLD if bold else 0) | (curses.A_UNDERLINE if underline else 0) | attributes) def update_autocomplete_display(window, current_input, first_word, words, ends_with_space): """ Update autocomplete display based on current input state. Returns (suggestions, suggestion_index, original_word) tuple for Tab cycling. Handles three display modes: 1. Command autocomplete (typing partial command) 2. Signature display (command recognized, showing arguments) 3. Argument value suggestions (typing argument values) """ suggestions = [] suggestion_index = -1 original_word = "" # Check if this is a command with argument definitions if first_word in COMMAND_ARGUMENTS: # Determine if user is typing arguments (not just the command) if len(words) == 1 and not ends_with_space: # Just command, no space yet → show signature with first arg highlighted sig_parts = get_signature_with_highlight(first_word, 0) if sig_parts: x_pos = 0 for arg_text, is_current in sig_parts: try: if is_current: window.addstr(1, x_pos, arg_text, curses.A_REVERSE) else: window.addstr(1, x_pos, arg_text, curses.A_DIM) x_pos += len(arg_text) + 1 except curses.error: pass else: # User is typing arguments if ends_with_space: # Starting new argument (empty so far) arg_position = len(words) - 1 # -1 for command current_value = '' else: # Typing current argument arg_position = len(words) - 2 # -1 for command, -1 for 0-indexed current_value = words[-1] # Get argument suggestions arg_suggestions = get_argument_suggestions( first_word, arg_position, current_value, player_list=None # TODO: pass player list from game_state ) if arg_suggestions: # Show argument value suggestions with label (limit to 10 for performance) arg_type = COMMAND_ARGUMENTS[first_word][arg_position]['type'] display_suggestions = arg_suggestions[:10] more_indicator = f' (+{len(arg_suggestions)-10} more)' if len(arg_suggestions) > 10 else '' match_line = f'<{arg_type}>: {" ".join(display_suggestions)}{more_indicator}' try: window.addstr(1, 0, match_line, curses.A_DIM) except curses.error: pass suggestions = arg_suggestions # Store for Tab cycling suggestion_index = -1 original_word = current_value else: # No suggestions (freetext, player without list, etc.) → show signature sig_parts = get_signature_with_highlight(first_word, arg_position) if sig_parts: x_pos = 0 for arg_text, is_current in sig_parts: try: if is_current: window.addstr(1, x_pos, arg_text, curses.A_REVERSE) else: window.addstr(1, x_pos, arg_text, curses.A_DIM) x_pos += len(arg_text) + 1 except curses.error: pass elif first_word in COMMAND_SIGNATURES and COMMAND_SIGNATURES[first_word]: # Command with signature but no argument definitions sig_parts = get_signature_with_highlight(first_word, 0) if sig_parts: x_pos = 0 for arg_text, is_current in sig_parts: try: if is_current: window.addstr(1, x_pos, arg_text, curses.A_REVERSE) else: window.addstr(1, x_pos, arg_text, curses.A_DIM) x_pos += len(arg_text) + 1 except curses.error: pass else: # Not a recognized command → show command autocomplete current_word = words[-1] if len(current_word) >= 2: suggestions = autocomplete(current_word, max_results=5) suggestion_index = -1 original_word = current_word if suggestions: match_line = ' '.join(suggestions) try: window.addstr(1, 0, match_line, curses.A_DIM) except curses.error: pass return suggestions, suggestion_index, original_word class UIManager: """Manages curses windows and display""" def __init__(self, screen, host): self.screen = screen self.host = host self.info_window = None self.output_window = None self.input_window = None self.divider_window = None self.input_queue = None self.command_history = [] self.history_index = -1 self.cursor_pos = 0 # Track cursor position in input self._init_curses() self._create_windows() def _init_curses(self): """Initialize curses settings""" curses.endwin() curses.initscr() self.screen.nodelay(1) curses.start_color() curses.use_default_colors() curses.cbreak() curses.curs_set(1) # Show cursor in input window self.screen.addstr(f"Quake Live PyCon: {self.host}") self.screen.noutrefresh() # Initialize color pairs for i in range(1, 7): curses.init_pair(i, i, 0) # Swap cyan and magenta (5 and 6) curses.init_pair(5, 6, 0) curses.init_pair(6, 5, 0) def _create_windows(self): """Create all UI windows""" maxy, maxx = self.screen.getmaxyx() # Server info window (top) self.info_window = curses.newwin( INFO_WINDOW_HEIGHT, maxx - 4, INFO_WINDOW_Y, 2 ) self.info_window.scrollok(False) self.info_window.idlok(False) self.info_window.leaveok(True) self.info_window.noutrefresh() # Output window (middle - main display) self.output_window = curses.newwin( maxy - 17, maxx - 4, OUTPUT_WINDOW_Y, 2 ) self.output_window.scrollok(True) self.output_window.idlok(False) self.output_window.idcok(False) self.output_window.leaveok(True) self.output_window.noutrefresh() # Divider line self.divider_window = curses.newwin( 1, maxx - 4, maxy - 3, 2 ) self.divider_window.hline(curses.ACS_HLINE, maxx - 4) self.divider_window.scrollok(False) self.divider_window.idlok(False) self.divider_window.leaveok(True) self.divider_window.noutrefresh() # Input window (bottom) self.input_window = curses.newwin( INPUT_WINDOW_HEIGHT, maxx - 6, maxy - 2, 4 ) self.input_window.keypad(True) self.input_window.nodelay(False) self.screen.addstr(maxy - 2, 2, '$ ') self.input_window.idlok(True) self.input_window.idcok(True) self.input_window.leaveok(False) self.input_window.noutrefresh() self.screen.noutrefresh() curses.doupdate() def setup_input_queue(self): """Setup threaded input queue with command history and autocomplete""" def wait_stdin(q, window, manager): current_input = "" cursor_pos = 0 manager.cursor_pos = 0 # Keep manager in sync temp_history_index = -1 temp_input = "" # Temp storage when navigating history quit_confirm = False # Autocomplete state suggestions = [] suggestion_index = -1 original_word = "" # Store original word before cycling while True: try: key = window.getch() if key == -1: # No input continue # Tab key - cycle through suggestions if key == ord('\t') or key == 9: if suggestions: # Cycle to next suggestion suggestion_index = (suggestion_index + 1) % len(suggestions) # Replace or append suggestion words = current_input.split() if words: # If original_word is empty, we had trailing space - append new word # Otherwise, replace current word if original_word == '': words.append(suggestions[suggestion_index]) # Update original_word so next Tab replaces instead of appending original_word = suggestions[suggestion_index] else: words[-1] = suggestions[suggestion_index] current_input = ' '.join(words) cursor_pos = len(current_input) manager.cursor_pos = cursor_pos # Update display window.erase() window.addstr(0, 0, current_input) # Determine display format first_word = words[0].lower() selected_value = suggestions[suggestion_index] # Check if we're cycling argument values or commands if first_word in COMMAND_ARGUMENTS and len(words) > 1: # Cycling argument values - show with label ends_with_space = current_input.endswith(' ') if ends_with_space: arg_position = len(words) - 1 else: arg_position = len(words) - 2 # Bounds check to prevent index out of range if arg_position < len(COMMAND_ARGUMENTS[first_word]): arg_type = COMMAND_ARGUMENTS[first_word][arg_position]['type'] # Show only first 10 suggestions for performance display_suggestions = suggestions[:10] more_indicator = f' (+{len(suggestions)-10} more)' if len(suggestions) > 10 else '' display_line = f'<{arg_type}>: {" ".join(display_suggestions)}{more_indicator}' else: # Fallback if position out of bounds display_line = ' '.join(suggestions[:10]) else: # Cycling commands - show with signature if available display_suggestions = suggestions[:10] match_line = ' '.join(display_suggestions) if selected_value in COMMAND_SIGNATURES: signature = COMMAND_SIGNATURES[selected_value] if signature: display_line = f"{match_line} → {signature}" else: display_line = match_line else: display_line = match_line try: window.addstr(1, 0, display_line, curses.A_DIM) except curses.error: pass window.move(0, cursor_pos) window.noutrefresh() curses.doupdate() # Actually push the refresh to screen continue # Enter key if key in (curses.KEY_ENTER, 10, 13): if len(current_input) > 0: # Add to history manager.command_history.append(current_input) if len(manager.command_history) > MAX_COMMAND_HISTORY: manager.command_history.pop(0) q.put(current_input) current_input = "" cursor_pos = 0 manager.cursor_pos = cursor_pos temp_history_index = -1 temp_input = "" suggestions = [] suggestion_index = -1 original_word = "" window.erase() window.noutrefresh() # Arrow UP - previous command elif key == curses.KEY_UP: if len(manager.command_history) > 0: # Save current input when first entering history if temp_history_index == -1: temp_input = current_input temp_history_index = len(manager.command_history) if temp_history_index > 0: temp_history_index -= 1 current_input = manager.command_history[temp_history_index] cursor_pos = len(current_input) manager.cursor_pos = cursor_pos suggestions = [] suggestion_index = -1 original_word = "" window.erase() window.addstr(0, 0, current_input) window.noutrefresh() # Arrow DOWN - next command elif key == curses.KEY_DOWN: if temp_history_index != -1: temp_history_index += 1 if temp_history_index >= len(manager.command_history): # Restore temp input current_input = temp_input temp_history_index = -1 temp_input = "" else: current_input = manager.command_history[temp_history_index] cursor_pos = len(current_input) manager.cursor_pos = cursor_pos suggestions = [] suggestion_index = -1 original_word = "" window.erase() window.addstr(0, 0, current_input) window.noutrefresh() # Arrow LEFT - move cursor left elif key == curses.KEY_LEFT: if cursor_pos > 0: cursor_pos -= 1 manager.cursor_pos = cursor_pos window.move(0, cursor_pos) window.noutrefresh() # Arrow RIGHT - move cursor right elif key == curses.KEY_RIGHT: if cursor_pos < len(current_input): cursor_pos += 1 manager.cursor_pos = cursor_pos window.move(0, cursor_pos) window.noutrefresh() # Backspace elif key in (curses.KEY_BACKSPACE, 127, 8): if cursor_pos > 0: current_input = current_input[:cursor_pos-1] + current_input[cursor_pos:] cursor_pos -= 1 manager.cursor_pos = cursor_pos temp_history_index = -1 # Exit history mode window.erase() window.addstr(0, 0, current_input) # Parse input and update autocomplete display words = current_input.split() ends_with_space = current_input.endswith(' ') if words: first_word = words[0].lower() suggestions, suggestion_index, original_word = update_autocomplete_display( window, current_input, first_word, words, ends_with_space ) window.move(0, cursor_pos) window.noutrefresh() curses.doupdate() # Immediate screen update # Regular character elif 32 <= key <= 126: char = chr(key) current_input = current_input[:cursor_pos] + char + current_input[cursor_pos:] cursor_pos += 1 manager.cursor_pos = cursor_pos temp_history_index = -1 # Exit history mode window.erase() window.addstr(0, 0, current_input) # Parse input and update autocomplete display words = current_input.split() ends_with_space = current_input.endswith(' ') if words: first_word = words[0].lower() suggestions, suggestion_index, original_word = update_autocomplete_display( window, current_input, first_word, words, ends_with_space ) window.move(0, cursor_pos) window.noutrefresh() curses.doupdate() # Immediate screen update except Exception as e: logger.error(f'Input error: {e}') # Log but continue - input thread should stay alive window.move(0, cursor_pos) curses.doupdate() self.input_queue = queue.Queue() t = threading.Thread(target=wait_stdin, args=(self.input_queue, self.input_window, self)) t.daemon = True t.start() return self.input_queue def setup_logging(self): """Setup logging handler for output window""" handler = CursesHandler(self.output_window) formatter = logging.Formatter('%(asctime)-8s|%(name)-12s|%(levelname)-6s|%(message)-s', '%H:%M:%S') handler.setFormatter(formatter) return handler def print_message(self, message, attributes=0): """Print formatted message to output window""" print_colored(self.output_window, message, attributes) self.output_window.noutrefresh() # Restore cursor to input window at current position self.input_window.move(0, self.cursor_pos) self.input_window.noutrefresh() curses.doupdate() def update_server_info(self, game_state): """Update server info window""" self.info_window.erase() max_y, max_x = self.info_window.getmaxyx() server_info = game_state.server_info # Line 1: Hostname with Timer and Warmup Indicator hostname = server_info.hostname timer_display = "" if server_info.match_time > 0 and not server_info.warmup: # Calculate live time: add elapsed seconds since last server update current_time = server_info.match_time if server_info.match_time_last_sync > 0: elapsed = int(time.time() - server_info.match_time_last_sync) current_time += elapsed mins = current_time // 60 secs = current_time % 60 timer_display = f"^3^0Time:^8^7 {mins}:{secs:02d}^0" else: timer_display = "^3^0Time:^8^7 0:00^0" warmup_display = "^3^0Warmup:^8 ^2YES^0" if server_info.warmup else "^3^0Warmup: ^8^1NO^0" print_colored(self.info_window, f"^3Name:^8 {hostname} {warmup_display} {timer_display}\n", 0) # Line 2: Game info gametype = server_info.gametype mapname = server_info.map timelimit = server_info.timelimit fraglimit = server_info.fraglimit roundlimit = server_info.roundlimit caplimit = server_info.capturelimit curclients = len(server_info.players) maxclients = server_info.maxclients # Context-sensitive limit display based on gametype if gametype == 'Capture the Flag': limit_display = f"^3^0| Capturelimit:^7^8 {caplimit}" elif gametype == 'Clan Arena': limit_display = f"^3^0| Roundlimit:^7^8 {roundlimit}" elif gametype == 'Duel': limit_display = f"^3^0| Timelimit:^7^8 {timelimit}" elif gametype == 'Race': limit_display = f"^3^0| Timelimit:^7^8 {timelimit}" else: limit_display = f"^3^0| Timelimit:^7^8 {timelimit} ^0^3| Fraglimit:^7^8 {fraglimit}" print_colored(self.info_window, f"^3^0Type:^7^8 {gametype} ^0^3| Map:^7^8 {mapname} ^0^3| Players:^7^8 {curclients}/{maxclients} " f"{limit_display}^0\n", 0) # Blank lines to fill self.info_window.addstr("\n") # Line 3: Team headers and player lists teams = game_state.player_tracker.get_players_by_team() if server_info.gametype in TEAM_MODES: if server_info.gametype == 'Clan Arena': red_score = f"{server_info.red_rounds:>3} " blue_score = f"{server_info.blue_rounds:>3} " else: red_total = 0 blue_total = 0 for player_name, player_data in server_info.players.items(): team = game_state.player_tracker.get_team(player_name) score = int(player_data.get('score', 0)) if team == 'RED': red_total += score elif team == 'BLUE': blue_total += score red_score = f"{red_total:>3} " blue_score = f"{blue_total:>3} " print_colored(self.info_window, f"^8^7{red_score}^9^1RED TEAM^0 ^7^8{blue_score}^9^4BLUE TEAM\n", 0) # Sort players by score within each team red_players_with_scores = [] blue_players_with_scores = [] spec_players = [] for player_name in teams['RED']: score = int(server_info.players.get(player_name, {}).get('score', 0)) red_players_with_scores.append((player_name, score)) for player_name in teams['BLUE']: score = int(server_info.players.get(player_name, {}).get('score', 0)) blue_players_with_scores.append((player_name, score)) # Sort by score descending red_players_with_scores.sort(key=lambda x: x[1], reverse=True) blue_players_with_scores.sort(key=lambda x: x[1], reverse=True) red_players = [name for name, score in red_players_with_scores[:4]] blue_players = [name for name, score in blue_players_with_scores[:4]] spec_players = teams['SPECTATOR'][:4] for i in range(4): red_name = red_players[i] if i < len(red_players) else '' blue_name = blue_players[i] if i < len(blue_players) else '' # Get scores for team players red_score = server_info.players.get(red_name, {}).get('score', '0') if red_name else '' blue_score = server_info.players.get(blue_name, {}).get('score', '0') if blue_name else '' # Check if players are dead red_dead = red_name in server_info.dead_players blue_dead = blue_name in server_info.dead_players # Format with strikethrough for dead players (using dim text) red = f"{red_score:>3} {'^8^1X^7^0 ' if red_dead else ''}{red_name}" if red_name else '' blue = f"{blue_score:>3} {'^8^1X^7^0 ' if blue_dead else ''}{blue_name}" if blue_name else '' from formatter import strip_color_codes red_clean = strip_color_codes(red) blue_clean = strip_color_codes(blue) red_pad = 24 - len(red_clean) line = f"^8{red}^0{' ' * red_pad}^8{blue}^0\n" print_colored(self.info_window, line, 0) else: print_colored(self.info_window, f" ^8^9^5FREE\n", 0) # Sort FREE players by score (highest first) free_players = teams['FREE'] free_players_with_scores = [] for player_name in free_players: score = int(server_info.players.get(player_name, {}).get('score', 0)) free_players_with_scores.append((player_name, score)) # Sort by score descending free_players_with_scores.sort(key=lambda x: x[1], reverse=True) sorted_free_players = [name for name, score in free_players_with_scores] spec_players = teams['SPECTATOR'][:4] free_col1 = sorted_free_players[:4] free_col2 = sorted_free_players[4:8] for i in range(4): col1_name = free_col1[i] if i < len(free_col1) else '' col2_name = free_col2[i] if i < len(free_col2) else '' # Get scores for FREE players col1_score = server_info.players.get(col1_name, {}).get('score', '0') if col1_name else '' col2_score = server_info.players.get(col2_name, {}).get('score', '0') if col2_name else '' # Check if players are dead col1_dead = col1_name in server_info.dead_players col2_dead = col2_name in server_info.dead_players # Format: " 9 PlayerName" with right-aligned score and dead marker col1 = f"{col1_score:>3} {'^8^1X^7^0 ' if col1_dead else ''}{col1_name}" if col1_name else '' col2 = f"{col2_score:>3} {'^8^1X^7^0 ' if col2_dead else ''}{col2_name}" if col2_name else '' from formatter import strip_color_codes col1_clean = strip_color_codes(col1) col2_clean = strip_color_codes(col2) col1_pad = 24 - len(col1_clean) line = f"^8{col1}^0{' ' * col1_pad}^8{col2}^0\n" print_colored(self.info_window, line, 0) # Blank lines to fill self.info_window.addstr("\n") # List spectators on one line spec_list = " ".join(spec_players) line = f"^8^3Spectators:^7 {spec_list}\n" print_colored(self.info_window, line, 0) # Blank lines to fill self.info_window.addstr("\n") # Separator separator = "^7" + "═" * (max_x - 1) + "^7" print_colored(self.info_window, separator, 0) self.info_window.noutrefresh() # Restore cursor to input window at current position self.input_window.move(0, self.cursor_pos) self.input_window.noutrefresh() curses.doupdate()