Source code for abacura.mud.options.msdp

"""MSDP telnet option processor"""
import re
import time
from typing import Any

from textual import log

from abacura.mud.options import IAC, SE, SB, TelnetOption
from abacura.plugins.events import AbacuraMessage

# Single-byte MSDP framing markers, used both when parsing server data and
# when building requests.
VAR = b"\x01"           # introduces a variable name
VAL = b"\x02"           # introduces a value
TABLE_OPEN = b"\x03"    # start of a table
TABLE_CLOSE = b"\x04"   # end of a table
ARRAY_OPEN = b"\x05"    # start of an array
ARRAY_CLOSE = b"\x06"   # end of an array

# TODO: Move ansi_escape somewhere else, we'll need it for triggers
# Matches ESC followed by either a single character in the ranges @-Z / \-_
# or a "[ ... final byte" control sequence; used to strip colour codes from
# decoded MSDP values.
ansi_escape = re.compile(
    r'\x1b(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])'
)

class MSDPMessage(AbacuraMessage):
    """MSDP event message.

    Extends AbacuraMessage with two extra attributes describing an MSDP
    variable report.
    """

    def __init__(self, name: str, value: Any, oldvalue: Any = "", subtype: str = ""):
        """Build the message.

        Args:
            name: forwarded to ``AbacuraMessage.__init__``.
            value: forwarded to ``AbacuraMessage.__init__``; may be any
                parsed MSDP value.
            oldvalue: caller-supplied previous value, stored unchanged on
                the instance; defaults to the empty string.
            subtype: caller-supplied tag stored unchanged on the instance.
        """
        super().__init__(name, value)
        self.oldvalue = oldvalue
        self.subtype = subtype
# TODO all these need to use the regular socket to trap send instead of calling writer directly
class MSDP(TelnetOption):
    """Handle MSDP TelnetOptions.

    Parses MSDP subnegotiation payloads, caches the latest parsed value of
    every reported variable in ``self.values`` and publishes each report
    through ``session.dispatcher``.
    """

    def __init__(self, handler, writer, session):
        """Store the collaborators and reset the cached state.

        Args:
            handler: callable invoked with a text message when a payload
                cannot be handled.
            writer: callable invoked as ``writer(data, raw=True)`` to send
                bytes to the server.
            session: object whose ``dispatcher(event_name, message)`` is
                used to publish MSDP value events.
        """
        self.code = 69
        self.hexcode = b'\x45'
        self.handler = handler
        self.writer = writer
        self.session = session
        # Latest parsed value per MSDP variable name.
        self.values = {}
        # Set once the automatic "report everything" request has been made.
        self.initialized: bool = False

    def msdpvar(self, buf) -> tuple[bytes, bytes]:
        """Handle MSDP VAR sequences.

        Drops the first byte of ``buf`` (the VAR marker) and splits the rest
        at the first VAL marker.

        Returns:
            ``(name, remainder)`` where ``remainder`` is everything after
            the VAL marker (empty when no VAL marker is present).
        """
        buf = buf[1:]
        name, _, remainder = buf.partition(VAL)
        return name, remainder

    def msdpval(self, buf) -> tuple[bytes, bytes]:
        """Handle MSDP VAL sequences.

        Returns:
            ``(value, remainder)``: the bytes before the first IAC marker
            and the bytes after it.
        """
        val, _, remainder = buf.partition(IAC)
        return val, remainder

    # TODO generic handler for MSDP array and tables needed
    def msdparray(self, buf):
        """NotImplemented: generic MSDP array parser"""

    def msdptable(self, buf):
        """NotImplemented: generic MSDP table parser"""

    @staticmethod
    def _parse_pairs(items) -> dict:
        """Decode ``name VAL value`` byte chunks into a ``str -> str`` dict.

        ANSI escape sequences are stripped from the values, which are kept
        as strings (no numeric conversion is performed).  Chunks without a
        VAL marker, such as the empty chunk produced by an empty table, are
        skipped instead of raising ``IndexError``.
        """
        pairs = {}
        for item in items:
            fields = item.split(VAL)
            if len(fields) < 2:
                continue
            pairs[fields[0].decode("UTF-8")] = ansi_escape.sub('', fields[1].decode("UTF-8"))
        return pairs

    # TODO move this to abacura-kallisti once we have config options for MSDP parser
    def parse_reportable_variables(self, buf) -> list:
        """Kallisti-specific parser for REPORTABLE_VARIABLES.

        Drops the first and last byte of ``buf`` (presumably the array
        open/close markers - confirm against the server), splits on VAL and
        returns the decoded chunks that are longer than one byte.
        """
        buf = buf[1:-1]
        return [x.decode("UTF-8") for x in buf.split(VAL) if len(x) > 1]

    def parse_group(self, buf) -> list:
        """Kallisti-specific parser for GROUP.

        Returns a list with one ``str -> str`` dict per group member, or an
        empty list when ``buf`` is empty.
        """
        # Empty group
        if buf == b'':
            return []

        # Trim three leading and two trailing framing bytes (presumably
        # ARRAY_OPEN VAL TABLE_OPEN ... TABLE_CLOSE ARRAY_CLOSE - confirm),
        # then split on the boundary between consecutive member tables.
        buf = buf[3:-2]
        elements = buf.split(TABLE_CLOSE + VAL + TABLE_OPEN)
        log(elements)

        element_list = []
        for element in elements:
            log(f"Parsing {element}")
            # Each member starts with a VAR marker, so the chunk before the
            # first VAR is always empty and is dropped.
            element_list.append(self._parse_pairs(element.split(VAR)[1:]))
        log(element_list)
        return element_list

    def parse_exits(self, buf) -> dict:
        """Kallisti-specific parser for ROOM_EXITS.

        Also used for AFFECTS.  Returns a ``str -> str`` dict, empty when
        ``buf`` is empty or the table holds no entries.
        """
        if buf == b'':
            return {}
        # Trim two leading bytes and one trailing byte (presumably
        # TABLE_OPEN VAR ... TABLE_CLOSE - confirm) before splitting on VAR.
        buf = buf[2:-1]
        return self._parse_pairs(buf.split(VAR))

    def request_all_values(self) -> None:
        """Automatically request all possible MSDP values.

        Sends one ``VAR "REPORT" VAL name VAL name ...`` subnegotiation
        covering every name in ``self.values["REPORTABLE_VARIABLES"]``.
        Nothing is sent when that list is missing or empty.
        """
        names = self.values.get("REPORTABLE_VARIABLES", [])
        if not names:
            return
        # Exactly one VAL marker precedes each variable name.
        requested = b''.join(VAL + name.encode("UTF-8") for name in names)
        buf = IAC + SB + self.hexcode + VAR + b"REPORT" + requested + IAC + SE
        self.writer(buf, raw=True)

    def will(self):
        """Accept the option and ask for the list of reportable variables."""
        # IAC DO MSDP
        self.writer(b"\xff\xfd\x45", raw=True)
        response = [IAC, SB, self.hexcode, VAR, b"LIST", VAL, b"REPORTABLE_VARIABLES", IAC, SE]
        self.writer(b''.join(response), raw=True)

    def sb(self, sb):
        """Parse one MSDP subnegotiation payload and publish the new value.

        The first byte of ``sb`` is discarded (presumably the option code -
        confirm against the caller).  If what follows starts with a VAR
        marker, the variable is parsed, cached in ``self.values`` and
        dispatched twice: as ``msdp_value`` and as ``msdp_value_<NAME>``.
        Anything else is reported through ``self.handler``.
        """
        log.debug("MSDP SB parsing")
        sb = sb[1:]
        first = sb[0:1]

        if first != VAR:
            # TODO this is a candidate for some kind of protocol.log
            self.handler(f"MSDP: Don't know how to handle {sb}")
            return

        varname, sb = self.msdpvar(sb)
        var = varname.decode("UTF-8")
        value, sb = self.msdpval(sb)

        # Previously cached value; "" (the MSDPMessage default) when this
        # variable has not been reported before.
        oldvalue = self.values.get(var, "")

        if var == "REPORTABLE_VARIABLES":
            self.values[var] = self.parse_reportable_variables(value)
            if not self.initialized:
                self.request_all_values()
                self.initialized = True
        elif var == "GROUP":
            self.values[var] = self.parse_group(value)
        elif var in ("ROOM_EXITS", "AFFECTS"):
            self.values[var] = self.parse_exits(value)
        else:
            self.values[var] = ansi_escape.sub('', value.decode("UTF-8"))

        # Two dispatchers here, first is "all", then "name specific"
        msg = MSDPMessage(var, self.values[var], oldvalue=oldvalue, subtype="value_change")
        self.session.dispatcher("msdp_value", msg)
        self.session.dispatcher(f"msdp_value_{var}", msg)