import os
import sys
import struct
from typing import Optional, Any

IS_WINDOWS: bool = sys.platform == "win32"
IS_LITTLE_ENDIAN: bool = sys.byteorder == "little"

if IS_WINDOWS:
    import win32pipe
    import win32file
    import pywintypes
else:
    import errno

# Length prefix used by the string helpers: one native ``size_t`` ("N").
BSSTR_LEN_PACKER: struct.Struct = struct.Struct("N")
# Size in bytes of one "bsstring" code unit: an unsigned short on Windows
# (UTF-16 code unit), a single char elsewhere (UTF-8 byte).
BSCHAR_SIZE: int = struct.calcsize("H") if IS_WINDOWS else struct.calcsize("c")

# Codec matching BSCHAR_SIZE: native-endian UTF-16 on Windows, UTF-8 elsewhere.
if IS_WINDOWS:
    _BSSTR_ENCODING: str = "utf_16_le" if IS_LITTLE_ENDIAN else "utf_16_be"
else:
    _BSSTR_ENCODING = "utf-8"


class PipeOperator:
    """
    A Python implementation of the pipe operator similar to the C++ version.

    Uses win32pipe on Windows and os.mkfifo on POSIX systems. Instances can
    also be used as context managers; the pipe is closed on exit.
    """

    # Class-level default so close()/__del__ are safe even when __init__
    # did not run to completion.
    pipe_handle: Optional[int] = None
    pipe_name: str

    def __init__(self, name: str):
        """
        Create the named pipe and open a handle to it.

        :param name: Name of the pipe. It is expanded to ``\\\\.\\pipe\\<name>``
            on Windows and to ``/tmp/<name>`` on POSIX systems.
        :raises RuntimeError: If the pipe cannot be created.
        """
        self.pipe_handle = None

        if IS_WINDOWS:
            self.pipe_name = f"\\\\.\\pipe\\{name}"
            # NOTE(review): no ConnectNamedPipe call is made after creation;
            # confirm the client handshake is not required by the peer.
            self.pipe_handle = win32pipe.CreateNamedPipe(
                self.pipe_name,
                win32pipe.PIPE_ACCESS_DUPLEX,
                win32pipe.PIPE_TYPE_BYTE
                | win32pipe.PIPE_READMODE_BYTE
                | win32pipe.PIPE_WAIT,
                1,  # Number of pipe instances
                65536,  # Output buffer size
                65536,  # Input buffer size
                0,  # Default timeout
                None,  # Security attributes
            )
            if self.pipe_handle == win32file.INVALID_HANDLE_VALUE:
                raise RuntimeError("Failed to create named pipe.")
        else:
            # POSIX implementation
            self.pipe_name = f"/tmp/{name}"
            # Create the FIFO; an already existing one is reused.
            try:
                os.mkfifo(self.pipe_name)
            except OSError as e:
                if e.errno != errno.EEXIST:
                    raise RuntimeError(f"Failed to create named pipe: {e}") from e
            # Open the FIFO for both reading and writing. On Linux an O_RDWR
            # open of a FIFO succeeds without waiting for a peer (POSIX leaves
            # this behaviour undefined).
            self.pipe_handle = os.open(self.pipe_name, os.O_RDWR)

    def __enter__(self) -> "PipeOperator":
        """Return the operator itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the pipe when leaving a ``with`` block."""
        self.close()

    def __del__(self):
        """Cleanup resources when object is destroyed."""
        self.close()

    def close(self):
        """
        Close the pipe handle. Calling it more than once is a no-op.

        On POSIX systems the FIFO file is removed as well.
        """
        handle = self.pipe_handle
        if handle is None:
            return
        # Mark as closed first so a failing close is never retried (for
        # example from __del__) on an already invalid handle.
        self.pipe_handle = None

        if IS_WINDOWS:
            win32file.CloseHandle(handle)
            return

        try:
            os.close(handle)
        finally:
            # Remove the FIFO file; it may already be gone.
            try:
                os.unlink(self.pipe_name)
            except FileNotFoundError:
                pass

    def _read_chunk(self, max_bytes: int) -> bytes:
        """Perform one blocking read of at most ``max_bytes`` bytes."""
        if IS_WINDOWS:
            try:
                # ReadFile returns (hr, buffer); bytes(buffer) yields the data.
                hr, raw = win32file.ReadFile(self.pipe_handle, max_bytes)
            except pywintypes.error as e:
                raise RuntimeError(f"Failed to read from named pipe: {e}") from e
            if hr != 0:
                raise RuntimeError(f"Failed to read from pipe, error code: {hr}")
            return bytes(raw)

        try:
            return os.read(self.pipe_handle, max_bytes)
        except OSError as e:
            raise RuntimeError(f"Failed to read from named pipe: {e}") from e

    def read(self, size: int) -> bytes:
        """
        Read data from the pipe. This is a blocking operation.

        A single underlying read may return fewer bytes than requested, so
        reads are repeated until exactly ``size`` bytes have been collected.

        :param size: Number of bytes to read
        :return: Bytes read from the pipe (empty if ``size`` is not positive)
        :raises RuntimeError: If the pipe is not open, the read fails, or the
            pipe is closed before ``size`` bytes arrive.
        """
        if size <= 0:
            return b""
        if self.pipe_handle is None:
            raise RuntimeError("Pipe is not open")

        buffer = bytearray()
        while len(buffer) < size:
            chunk = self._read_chunk(size - len(buffer))
            if not chunk:
                raise RuntimeError("Named pipe closed during read")
            buffer += chunk
        return bytes(buffer)

    def write(self, data: bytes) -> None:
        """
        Write data to the pipe. This is a blocking operation.

        :param data: Data to write to the pipe
        :raises RuntimeError: If the pipe is not open or the write fails.
        """
        if len(data) == 0:
            return
        if self.pipe_handle is None:
            raise RuntimeError("Pipe is not open")

        if IS_WINDOWS:
            try:
                win32file.WriteFile(self.pipe_handle, data)
            except pywintypes.error as e:
                raise RuntimeError(f"Failed to write to named pipe: {e}") from e
            return

        # POSIX implementation: os.write may accept only part of the data,
        # so keep writing the remainder. The memoryview avoids re-copying
        # the tail on every iteration.
        view = memoryview(data)
        total_written = 0
        while total_written < len(view):
            try:
                total_written += os.write(self.pipe_handle, view[total_written:])
            except OSError as e:
                raise RuntimeError(f"Failed to write to named pipe: {e}") from e

    def read_pod(self, pattern: struct.Struct) -> tuple[Any, ...]:
        """Read ``pattern.size`` bytes and unpack them with ``pattern``."""
        return pattern.unpack(self.read(pattern.size))

    def write_pod(self, pattern: struct.Struct, *args) -> None:
        """Pack ``args`` with ``pattern`` and write the resulting bytes."""
        self.write(pattern.pack(*args))

    def read_string(self) -> str:
        """Read a length-prefixed UTF-8 string (length counted in bytes)."""
        (length,) = self.read_pod(BSSTR_LEN_PACKER)
        return self.read(length).decode("utf-8")

    def write_string(self, s: str) -> None:
        """Write ``s`` as UTF-8, prefixed with its length in bytes."""
        str_bytes = s.encode("utf-8")
        self.write_pod(BSSTR_LEN_PACKER, len(str_bytes))
        self.write(str_bytes)

    def read_bsstring(self) -> str:
        """
        Read a length-prefixed string whose length is counted in code units
        of ``BSCHAR_SIZE`` bytes.
        """
        (length,) = self.read_pod(BSSTR_LEN_PACKER)
        return self.__decode_bsstring(self.read(length * BSCHAR_SIZE))

    def write_bsstring(self, s: str) -> None:
        """
        Write ``s`` in the platform encoding, prefixed with its length in
        code units of ``BSCHAR_SIZE`` bytes.
        """
        str_bytes = self.__encode_bsstring(s)
        self.write_pod(BSSTR_LEN_PACKER, len(str_bytes) // BSCHAR_SIZE)
        self.write(str_bytes)

    def __encode_bsstring(self, s: str) -> bytes:
        """Encode ``s`` as native-endian UTF-16 on Windows, UTF-8 elsewhere."""
        return s.encode(_BSSTR_ENCODING)

    def __decode_bsstring(self, b: bytes) -> str:
        """Decode bytes produced by the matching encode helper."""
        return b.decode(_BSSTR_ENCODING)