import struct from enum import IntEnum, Enum, auto from dataclasses import dataclass from typing import Callable from pipe_operator import PipeOperator PIPE_NAME: str = "ed0e3f1f-d214-4880-9562-640bce15e72e" class ProtocolCode(IntEnum): HANDSHAKE_REQUEST = 0x61 # Trainer -> Presenter HANDSHAKE_RESPONSE = 0x62 # Presenter -> Trainer DATA_READY = 0x01 # Presenter -> Trainer DATA_RECEIVED = 0x02 # Trainer -> Presenter ACTIVELY_STOP = 0x21 # Presenter -> Trainer STOP_REQUEST = 0x71 # Trainer -> Presenter STOP_RESPONSE = 0x72 # Trainer -> Presenter class PixelKind(IntEnum): GRAY_FLOAT32 = 0x01 # Grayscale represented by one float32 GRAY_U8 = 0x02 # Grayscale represented by one u8 RGB_FLOAT32 = 0x03 # RGB represented by three float32 RGB_U8 = 0x04 # RGB represented by three u8 @dataclass class HandshakePayload: pixel_kind: PixelKind width: int height: int class ServerStatus(Enum): Ready = auto() Running = auto() Stop = auto() CODE_PACKER: struct.Struct = struct.Struct("=B") HANDSHAKE_REQUEST_PACKER: struct.Struct = struct.Struct("=BII") class CmdServer: """ Command server implementation for the Trainer side according to the protocol. """ __pipe_operator: PipeOperator __status: ServerStatus def __init__(self): self.__pipe_operator = PipeOperator(PIPE_NAME) self.__status = ServerStatus.Ready def __del__(self): """Cleanup resources when object is destroyed.""" self.__pipe_operator.close() def wait_handshake(self, payload: HandshakePayload) -> None: """ Wait for handshake from Presenter, send request first and wait for response with data properties. Returns a tuple of (pixel_kind, width, height) from the Presenter. """ if self.__status != ServerStatus.Ready: raise RuntimeError("unexpected server status") # Send handshake request to Presenter self.__pipe_operator.write(CODE_PACKER.pack(ProtocolCode.HANDSHAKE_REQUEST)) # And the payload data self.__pipe_operator.write( HANDSHAKE_REQUEST_PACKER.pack( payload.pixel_kind, payload.width, payload.height ) ) # Wait for handshake response from Presenter (code 0x62) code_bytes = self.__pipe_operator.read(CODE_PACKER.size) (code,) = CODE_PACKER.unpack(code_bytes) if ProtocolCode(code) != ProtocolCode.HANDSHAKE_RESPONSE: raise RuntimeError("expect handshake response code, but got another") # Set status and return self.__status = ServerStatus.Running return def tick(self, data_receiver: Callable[[], None], request_stop: bool) -> bool: """ Tick function called every frame to wait for data ready from Presenter and send response. Returns True if a stop code was received (meaning the process should stop), False otherwise. """ if self.__status != ServerStatus.Running: raise RuntimeError("unexpected server status") # If there is stop requested from us, # we order Presenter exit and enter next step. if request_stop: self.__pipe_operator.write(CODE_PACKER.pack(ProtocolCode.STOP_REQUEST)) while True: # Wait for code from Presenter code_bytes = self.__pipe_operator.read(CODE_PACKER.size) (code,) = CODE_PACKER.unpack(code_bytes) # Analyse code match ProtocolCode(code): case ProtocolCode.DATA_READY: # Receive data data_receiver() # Send data received symbol self.__pipe_operator.write( CODE_PACKER.pack(ProtocolCode.DATA_RECEIVED) ) return False case ProtocolCode.ACTIVELY_STOP: # Presenter requested stop. # Agree with it, send code and wait response self.__pipe_operator.write( CODE_PACKER.pack(ProtocolCode.STOP_REQUEST) ) case ProtocolCode.STOP_RESPONSE: # Set self status and return self.__status = ServerStatus.Stop return True case _: raise RuntimeError("unexpected protocol code when running")