finish trainer protocol update
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
import os
|
||||
import sys
|
||||
from typing import Optional, Any
|
||||
import struct
|
||||
from typing import Optional, Any, ClassVar
|
||||
|
||||
IS_WINDOWS: bool = sys.platform == "win32"
|
||||
IS_LITTLE_ENDIAN: bool = sys.byteorder == "little"
|
||||
|
||||
if IS_WINDOWS:
|
||||
import win32pipe
|
||||
@@ -151,3 +153,43 @@ class PipeOperator:
|
||||
total_written += bytes_written
|
||||
except OSError as e:
|
||||
raise RuntimeError(f"Failed to write to named pipe: {e}")
|
||||
|
||||
def read_pod(self, pattern: struct.Struct) -> tuple[Any, ...]:
|
||||
return pattern.unpack(self.read(pattern.size))
|
||||
|
||||
def write_pod(self, pattern: struct.Struct, *args) -> None:
|
||||
self.write(pattern.pack(*args))
|
||||
|
||||
STR_LEN_PACKER: ClassVar[struct.Struct] = struct.Struct("=N")
|
||||
|
||||
def read_string(self) -> str:
|
||||
(length,) = self.read_pod(PipeOperator.STR_LEN_PACKER)
|
||||
str_bytes = self.read(length)
|
||||
return str_bytes.decode("utf-8")
|
||||
|
||||
def write_string(self, s: str) -> None:
|
||||
str_bytes = s.encode("utf-8")
|
||||
self.write_pod(PipeOperator.STR_LEN_PACKER, len(str_bytes))
|
||||
self.write(str_bytes)
|
||||
|
||||
def read_bsstring(self) -> str:
|
||||
(length,) = self.read_pod(PipeOperator.STR_LEN_PACKER)
|
||||
str_bytes = self.read(length)
|
||||
return self.__decode_bsstring(str_bytes)
|
||||
|
||||
def write_bsstring(self, s: str) -> None:
|
||||
str_bytes = self.__encode_bsstring(s)
|
||||
self.write_pod(PipeOperator.STR_LEN_PACKER, len(str_bytes))
|
||||
self.write(str_bytes)
|
||||
|
||||
def __encode_bsstring(self, s: str) -> bytes:
|
||||
if IS_LITTLE_ENDIAN:
|
||||
return s.encode("utf_16_le")
|
||||
else:
|
||||
return s.encode("utf_16_be")
|
||||
|
||||
def __decode_bsstring(self, b: bytes) -> str:
|
||||
if IS_LITTLE_ENDIAN:
|
||||
return b.decode("utf_16_le")
|
||||
else:
|
||||
return b.decode("utf_16_be")
|
||||
|
||||
Reference in New Issue
Block a user