import struct from enum import IntEnum, Enum, auto from dataclasses import dataclass from typing import Callable from pipe_operator import PipeOperator PIPE_NAME: str = "ed0e3f1f-d214-4880-9562-640bce15e72e" class ProtocolCode(IntEnum): HANDSHAKE_REQUEST = 0x61 # Trainer -> Presenter HANDSHAKE_RESPONSE = 0x62 # Presenter -> Trainer DATA_READY = 0x01 # Presenter -> Trainer DATA_RECEIVED = 0x02 # Trainer -> Presenter ACTIVELY_STOP = 0x21 # Presenter -> Trainer STOP_REQUEST = 0x71 # Trainer -> Presenter STOP_RESPONSE = 0x72 # Trainer -> Presenter class PixelKind(IntEnum): GRAY_FLOAT32 = 0x01 # Grayscale represented by one float32 GRAY_U8 = 0x02 # Grayscale represented by one u8 RGB_FLOAT32 = 0x03 # RGB represented by three float32 RGB_U8 = 0x04 # RGB represented by three u8 @dataclass class HandshakePayload: headless: bool pixel_kind: PixelKind width: int height: int engine_name: str engine_device: int deliver_name: str deliver_device: int object_loader_name: str object_loader_file: str anime_loader_name: str anime_loader_file: str class ServerStatus(Enum): Ready = auto() Running = auto() Stop = auto() CODE_PACKER: struct.Struct = struct.Struct("=B") U8_PACKER: struct.Struct = struct.Struct("=B") U32_PACKER: struct.Struct = struct.Struct("=I") class CmdServer: """ Command server implementation for the Trainer side according to the protocol. """ __pipe_operator: PipeOperator __status: ServerStatus def __init__(self): self.__pipe_operator = PipeOperator(PIPE_NAME) self.__status = ServerStatus.Ready def __del__(self): """Cleanup resources when object is destroyed.""" self.__pipe_operator.close() def wait_handshake(self, payload: HandshakePayload) -> None: """ Wait for handshake from Presenter, send request first and wait for response with data properties. Returns a tuple of (pixel_kind, width, height) from the Presenter. """ if self.__status != ServerStatus.Ready: raise RuntimeError("unexpected server status") # Send handshake request to Presenter self.__pipe_operator.write_pod(CODE_PACKER, ProtocolCode.HANDSHAKE_REQUEST) # And the payload data self.__pipe_operator.write_pod(U8_PACKER, 1 if payload.headless else 0) self.__pipe_operator.write_pod(U8_PACKER, payload.pixel_kind) self.__pipe_operator.write_pod(U32_PACKER, payload.width) self.__pipe_operator.write_pod(U32_PACKER, payload.height) self.__pipe_operator.write_bsstring(payload.engine_name) self.__pipe_operator.write_pod(U32_PACKER, payload.engine_device) self.__pipe_operator.write_bsstring(payload.deliver_name) self.__pipe_operator.write_pod(U32_PACKER, payload.deliver_device) self.__pipe_operator.write_bsstring(payload.object_loader_name) self.__pipe_operator.write_bsstring(payload.object_loader_file) self.__pipe_operator.write_bsstring(payload.anime_loader_name) self.__pipe_operator.write_bsstring(payload.anime_loader_file) # Wait for handshake response from Presenter (code 0x62) (code,) = self.__pipe_operator.read_pod(CODE_PACKER) if ProtocolCode(code) != ProtocolCode.HANDSHAKE_RESPONSE: raise RuntimeError("expect handshake response code, but got another") # Set status and return self.__status = ServerStatus.Running return def tick(self, data_receiver: Callable[[], None], request_stop: bool) -> bool: """ Tick function called every frame to wait for data ready from Presenter and send response. Returns True if a stop code was received (meaning the process should stop), False otherwise. """ if self.__status != ServerStatus.Running: raise RuntimeError("unexpected server status") # If there is stop requested from us, # we order Presenter exit and enter next step. if request_stop: self.__pipe_operator.write_pod(CODE_PACKER, ProtocolCode.STOP_REQUEST) while True: # Wait for code from Presenter (code,) = self.__pipe_operator.read_pod(CODE_PACKER) # Analyse code match ProtocolCode(code): case ProtocolCode.DATA_READY: # Receive data data_receiver() # Send data received symbol self.__pipe_operator.write_pod( CODE_PACKER, ProtocolCode.DATA_RECEIVED ) return False case ProtocolCode.ACTIVELY_STOP: # Presenter requested stop. # Agree with it, send code and wait response self.__pipe_operator.write_pod( CODE_PACKER, ProtocolCode.STOP_REQUEST ) case ProtocolCode.STOP_RESPONSE: # Set self status and return self.__status = ServerStatus.Stop return True case _: raise RuntimeError("unexpected protocol code when running")