168 lines
5.4 KiB
Python
168 lines
5.4 KiB
Python
#!/usr/bin/env python3
|
|
"""
ZMQ network layer for QLPyCon
Handles RCON and stats stream connections
"""
|
|
|
|
import zmq
|
|
import struct
|
|
import logging
|
|
import time
|
|
|
|
logger = logging.getLogger('network')
|
|
|
|
|
|
def read_socket_event(msg):
    """Decode the first frame of a ZMQ socket monitor message.

    The first two bytes are unpacked as a little-endian unsigned 16-bit
    event id and the remaining bytes as a little-endian unsigned 32-bit
    event value.

    Args:
        msg: Raw bytes of the monitor event frame.

    Returns:
        A tuple ``(event_id, event_name, event_value)``. ``event_name`` is
        the symbolic constant name for known events, otherwise the numeric
        id rendered as a string.
    """
    (event_id,) = struct.unpack('<H', msg[:2])

    # Names of the zmq event constants this module knows how to label.
    known_events = (
        'EVENT_ACCEPTED',
        'EVENT_ACCEPT_FAILED',
        'EVENT_BIND_FAILED',
        'EVENT_CLOSED',
        'EVENT_CLOSE_FAILED',
        'EVENT_CONNECTED',
        'EVENT_CONNECT_DELAYED',
        'EVENT_CONNECT_RETRIED',
        'EVENT_DISCONNECTED',
        'EVENT_LISTENING',
        'EVENT_MONITOR_STOPPED',
    )
    # Resolve each name to its numeric constant to build the lookup table.
    labels = {getattr(zmq, constant): constant for constant in known_events}
    event_name = labels.get(event_id, str(event_id))

    (event_value,) = struct.unpack('<I', msg[2:])
    return (event_id, event_name, event_value)
|
|
|
|
|
|
def check_monitor(monitor):
    """Read one pending event from a socket monitor without blocking.

    Args:
        monitor: Monitor socket to read from; it must provide
            ``recv(flags)``.

    Returns:
        ``None`` when no event is queued, otherwise a tuple
        ``(event_id, event_value)`` for the event that was read.
    """
    try:
        header_frame = monitor.recv(zmq.NOBLOCK)
    except zmq.Again:
        # Nothing is waiting on the monitor socket right now.
        return None

    decoded = read_socket_event(header_frame)
    event_id, event_name, event_value = decoded

    # A second frame follows the event header; it is only used for logging.
    event_endpoint = monitor.recv(zmq.NOBLOCK)
    logger.debug(f'Monitor: {event_name} {event_value} endpoint {event_endpoint}')

    return (event_id, event_value)
|
|
|
|
|
|
class RconConnection:
    """RCON connection to Quake Live server.

    The constructor only stores settings; :meth:`connect` creates the ZMQ
    context, a DEALER socket and a monitor socket for connection events.
    :meth:`close` releases all three and may be called more than once.
    """

    def __init__(self, host, password, identity):
        """Store connection settings without opening any socket.

        Args:
            host: Endpoint string passed unchanged to ``socket.connect``.
            password: Password for PLAIN authentication; a falsy value
                skips the authentication setup.
            identity: Socket identity; UTF-8 encoded when connecting.
        """
        self.host = host
        self.password = password
        self.identity = identity
        self.context = None
        self.socket = None
        self.monitor = None

    def connect(self):
        """Initialize connection.

        Creates the context, the DEALER socket and its monitor socket,
        applies authentication and identity options, then starts the
        connection to ``self.host``.
        """
        # Release any earlier connection first so reconnecting does not
        # leak the previous context and sockets.
        if self.socket is not None or self.context is not None:
            self.close()

        logger.info('Initializing ZMQ context...')
        self.context = zmq.Context()

        logger.info('Creating DEALER socket...')
        self.socket = self.context.socket(zmq.DEALER)

        logger.info('Setting up socket monitor...')
        self.monitor = self.socket.get_monitor_socket(zmq.EVENT_ALL)

        if self.password:
            logger.info('Setting password for access')
            self.socket.plain_username = b'rcon'
            self.socket.plain_password = self.password.encode('utf-8')
            self.socket.zap_domain = b'rcon'

        logger.info(f'Setting socket identity: {self.identity}')
        self.socket.setsockopt(zmq.IDENTITY, self.identity.encode('utf-8'))

        self.socket.connect(self.host)
        logger.info('Connection initiated, waiting for events...')

    def send_command(self, command):
        """Send RCON command.

        Args:
            command: Command as ``str`` (UTF-8 encoded before sending) or
                as an object the socket can send directly, e.g. ``bytes``.
        """
        if isinstance(command, str):
            command = command.encode('utf-8')
        self.socket.send(command)
        logger.info(f'Sent command: {command}')

    def poll(self, timeout):
        """Poll for messages.

        Args:
            timeout: Timeout value forwarded to ``socket.poll``.

        Returns:
            Whatever ``socket.poll`` returns.
        """
        return self.socket.poll(timeout)

    def recv_message(self):
        """Receive a message (non-blocking).

        Returns:
            The message decoded as UTF-8 (invalid bytes replaced), or
            ``None`` when nothing is available or the connection has not
            been opened / was already closed.
        """
        if self.socket is None:
            return None
        try:
            return self.socket.recv(zmq.NOBLOCK).decode('utf-8', errors='replace')
        except zmq.error.Again:
            return None

    def check_monitor(self):
        """Check monitor for events.

        Returns:
            ``(event_id, event_value)`` for one pending event, or ``None``
            when there is no event or no monitor socket exists.
        """
        if self.monitor is None:
            return None
        return check_monitor(self.monitor)

    def close(self):
        """Close connection.

        Closes the DEALER socket and the monitor socket, then terminates
        the context. Safe to call repeatedly or before :meth:`connect`.
        """
        sock, monitor, context = self.socket, self.monitor, self.context
        # Clear the attributes up front so the object reads as
        # disconnected even if tearing down a socket raises.
        self.socket = None
        self.monitor = None
        self.context = None

        # The monitor socket lives in the same context; it must be closed
        # too, otherwise context.term() blocks waiting for it.
        for zsock in (sock, monitor):
            if zsock is not None and not zsock.closed:
                zsock.setsockopt(zmq.LINGER, 0)  # Don't wait for unsent messages
                zsock.close()

        if context is not None:
            context.term()
|
|
|
|
|
|
class StatsConnection:
    """Stats stream connection (ZMQ SUB socket).

    The constructor only stores settings; :meth:`connect` creates the
    context and SUB socket and subscribes to all messages. :meth:`close`
    releases them, resets the ``connected`` flag and may be called more
    than once.
    """

    def __init__(self, host, port, password):
        """Store connection settings without opening any socket.

        Args:
            host: Host part of the ``tcp://host:port`` endpoint.
            port: Port part of the ``tcp://host:port`` endpoint.
            password: Password for PLAIN authentication; empty or
                whitespace-only values skip the authentication setup.
        """
        self.host = host
        self.port = port
        self.password = password
        self.context = None
        self.socket = None
        self.connected = False

    def connect(self):
        """Connect to stats stream.

        The ``connected`` flag is set after a fixed short delay; it is not
        derived from an actual connection event.
        """
        # Release any earlier connection first so reconnecting does not
        # leak the previous context and socket.
        if self.socket is not None or self.context is not None:
            self.close()

        stats_host = f'tcp://{self.host}:{self.port}'
        logger.info(f'Connecting to stats stream: {stats_host}')

        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.SUB)
        logger.debug('Stats socket created (SUB type)')

        if self.password and self.password.strip():
            logger.debug('Setting PLAIN authentication')
            self.socket.setsockopt(zmq.PLAIN_USERNAME, b'stats')
            self.socket.setsockopt(zmq.PLAIN_PASSWORD, self.password.encode('utf-8'))
            self.socket.setsockopt_string(zmq.ZAP_DOMAIN, 'stats')

        logger.debug(f'Connecting to {stats_host}')
        self.socket.connect(stats_host)

        logger.debug('Setting ZMQ_SUBSCRIBE to empty (all messages)')
        self.socket.setsockopt(zmq.SUBSCRIBE, b'')

        # NOTE(review): presumably gives the connection and subscription
        # time to settle before reporting "connected" - confirm.
        time.sleep(0.5)
        self.connected = True
        logger.info('Stats stream connected')

    def recv_message(self):
        """Receive stats message (non-blocking).

        Returns:
            The message decoded as UTF-8 (invalid bytes replaced), or
            ``None`` when not connected or nothing is available.
        """
        if not self.connected or self.socket is None:
            return None

        try:
            msg = self.socket.recv(zmq.NOBLOCK)
            return msg.decode('utf-8', errors='replace')
        except zmq.error.Again:
            return None

    def close(self):
        """Close connection.

        Closes the socket, terminates the context and marks the stream as
        disconnected. Safe to call repeatedly or before :meth:`connect`.
        """
        sock, context = self.socket, self.context
        # Mark as disconnected first so recv_message() stops touching the
        # socket even if the teardown below raises.
        self.connected = False
        self.socket = None
        self.context = None

        if sock is not None and not sock.closed:
            sock.setsockopt(zmq.LINGER, 0)  # Don't wait for unsent messages
            sock.close()

        if context is not None:
            context.term()
|