import struct from enum import IntEnum, Enum, auto from dataclasses import dataclass from typing import Tuple, Optional from pipe_operator import PipeOperator PIPE_NAME: str = "ed0e3f1f-d214-4880-9562-640bce15e72e" class ProtocolCode(IntEnum): HANDSHAKE_REQUEST = 0x61 # Trainer -> Presenter HANDSHAKE_RESPONSE = 0x62 # Presenter -> Trainer DATA_READY = 0x01 # Presenter -> Trainer DATA_RECEIVED = 0x02 # Trainer -> Presenter REQUEST_STOP = 0x71 # Presenter -> Trainer (request stop) STOP = 0x71 # Trainer -> Presenter (confirm stop) class PixelKind(IntEnum): GRAY_FLOAT32 = 0x01 # Grayscale represented by one float32 GRAY_U8 = 0x02 # Grayscale represented by one u8 RGB_FLOAT32 = 0x03 # RGB represented by three float32 RGB_U8 = 0x04 # RGB represented by three u8 @dataclass class ImageProperties: pixel_kind: PixelKind width: int height: int class ServerStatus(Enum): Ready = auto() Running = auto() Stop = auto() CODE_PACKER: struct.Struct = struct.Struct("=B") HANDSHAKE_RESPONSE_PACKER: struct.Struct = struct.Struct("=BII") class CommandServer: """ Command server implementation for the Trainer side according to the protocol. """ pipe_operator: PipeOperator statue: ServerStatus def __init__(self): self.pipe_operator = PipeOperator(PIPE_NAME) self.handshaked = ServerStatus.Ready def __del__(self): """Cleanup resources when object is destroyed.""" self.pipe_operator.close() def wait_handshake(self) -> ImageProperties: """ Wait for handshake from Presenter, send request first and wait for response with data properties. Returns a tuple of (pixel_kind, width, height) from the Presenter. """ if self.statue != ServerStatus.Ready: raise RuntimeError("unexpected server status") # Send handshake request to Presenter (code 0x61) self.pipe_operator.write(CODE_PACKER.pack(ProtocolCode.HANDSHAKE_REQUEST)) # Wait for handshake response from Presenter (code 0x62) code_bytes = self.pipe_operator.read(CODE_PACKER.size) (code, ) = CODE_PACKER.unpack(code_bytes) if ProtocolCode(code) != ProtocolCode.HANDSHAKE_RESPONSE: raise RuntimeError("expect handshake response code, but got another") # Read data properties from Presenter (pixel_kind, width, height) handshake_response_payload = self.pipe_operator.read(HANDSHAKE_RESPONSE_PACKER.size) (raw_pixel_kind, width, height) = HANDSHAKE_RESPONSE_PACKER.unpack(handshake_response_payload) # Set status and return self.statue = ServerStatus.Running return ImageProperties(PixelKind(raw_pixel_kind), width, height) def tick(self, request_stop: bool) -> bool: """ Tick function called every frame to wait for data ready from Presenter and send response. Returns True if a stop code was received (meaning the process should stop), False otherwise. """ if self.statue != ServerStatus.Running: raise RuntimeError("unexpected server status") # If there is stop requested, we post it first and return if request_stop: self.pipe_operator.write(CODE_PACKER.pack(ProtocolCode.STOP)) self.statue = ServerStatus.Stop return True # Wait for code from Presenter code_bytes = self.pipe_operator.read(CODE_PACKER.size) (code, ) = CODE_PACKER.unpack(code_bytes) # Analyse code match ProtocolCode(code): case ProtocolCode.DATA_READY: # Receive data print('Data received') # Send data received symbol self.pipe_operator.write(CODE_PACKER.pack(ProtocolCode.DATA_RECEIVED)) case ProtocolCode.REQUEST_STOP: # Presenter requested stop. # Agree with it, send code and return. self.pipe_operator.write(CODE_PACKER.pack(ProtocolCode.STOP)) self.statue = ServerStatus.Stop return True case _: raise RuntimeError("unexpected protocol code when running") return False