import os import sys from typing import Optional, Any IS_WINDOWS: bool = sys.platform == "win32" if IS_WINDOWS: import win32pipe import win32file import pywintypes else: import errno class PipeOperator: """ A Python implementation of the pipe operator similar to the C++ version. Uses win32pipe on Windows and os.mkfifo on POSIX systems. """ pipe_handle: Optional[int] pipe_name: str def __init__(self, name: str): """ Initialize the PipeOperator. :param name: Name of the pipe :param is_server: True if this instance should create the pipe (server), False if connecting to existing pipe (client) """ self.pipe_handle = None if IS_WINDOWS: self.pipe_name = f"\\\\.\\pipe\\{name}" self.pipe_handle = win32pipe.CreateNamedPipe( self.pipe_name, win32pipe.PIPE_ACCESS_DUPLEX, win32pipe.PIPE_TYPE_BYTE | win32pipe.PIPE_READMODE_BYTE | win32pipe.PIPE_WAIT, 1, # Number of pipe instances 65536, # Output buffer size 65536, # Input buffer size 0, # Default timeout None # Security attributes ) if self.pipe_handle == win32file.INVALID_HANDLE_VALUE: raise RuntimeError("Failed to create named pipe.") # Wait for client to connect win32pipe.ConnectNamedPipe(self.pipe_handle, None) else: # POSIX implementation self.pipe_name = f"/tmp/{name}" try: os.mkfifo(self.pipe_name) except OSError as e: if e.errno != errno.EEXIST: raise RuntimeError(f"Failed to create named pipe: {e}") # Open the FIFO for reading and writing (blocks until client connects) self.pipe_handle = os.open(self.pipe_name, os.O_RDWR) def __del__(self): """Cleanup resources when object is destroyed.""" self.close() def close(self): """Close the pipe handle.""" if self.pipe_handle is not None: if IS_WINDOWS: win32file.CloseHandle(self.pipe_handle) else: os.close(self.pipe_handle) # Remove the FIFO file if it is existing if os.path.exists(self.pipe_name): os.unlink(self.pipe_name) self.pipe_handle = None def read(self, size: int) -> bytes: """ Read data from the pipe. This is a blocking operation. :param size: Number of bytes to read :return: Bytes read from the pipe """ if size <= 0: return b"" if self.pipe_handle is None: raise RuntimeError("Pipe is not open") if IS_WINDOWS: try: # The result is a tuple of (hr, string/PyOVERLAPPEDReadBuffer). # You can convert this to a string (str(object)) [py2k] or (bytes(object)) [py3k] to obtain the data. # I can't visit PyOVERLAPPEDReadBuffer so I simply set it to Any data: Any (hr, data) = win32file.ReadFile(self.pipe_handle, size) if hr != 0: # Error occurred raise RuntimeError( f"Failed to read from pipe, error code: {hr}") data = bytes(data) if len(data) != size: raise RuntimeError( f"Incomplete read from pipe: expected {size} bytes, got {len(data)} bytes" ) return data except pywintypes.error as e: raise RuntimeError(f"Failed to read from named pipe: {e}") else: # POSIX implementation data = b"" while len(data) < size: try: chunk = os.read(self.pipe_handle, size - len(data)) if not chunk: raise RuntimeError("Named pipe closed during read") data += chunk except OSError as e: raise RuntimeError(f"Failed to read from named pipe: {e}") if len(data) != size: raise RuntimeError( f"Incomplete read from pipe: expected {size} bytes, got {len(data)} bytes" ) return data def write(self, data: bytes) -> None: """ Write data to the pipe. This is a blocking operation. :param data: Data to write to the pipe """ if len(data) == 0: return if self.pipe_handle is None: raise RuntimeError("Pipe is not open") if IS_WINDOWS: try: win32file.WriteFile(self.pipe_handle, data) except pywintypes.error as e: raise RuntimeError(f"Failed to write to named pipe: {e}") else: # POSIX implementation total_written = 0 while total_written < len(data): try: bytes_written = os.write(self.pipe_handle, data[total_written:]) total_written += bytes_written except OSError as e: raise RuntimeError(f"Failed to write to named pipe: {e}") # class PipeServer: # """ # A convenience class for creating a pipe server that can handle connections in a separate thread. # """ # def __init__(self, name: str): # self._name = name # self._pipe_operator: Optional[PipeOperator] = None # self._server_thread: Optional[threading.Thread] = None # self._stop_event = threading.Event() # def start_server(self, handler_func): # """ # Start the pipe server in a separate thread. # :param handler_func: Function to handle the connected pipe (takes PipeOperator as parameter) # """ # self._server_thread = threading.Thread(target=self._server_worker, # args=(handler_func, )) # self._server_thread.start() # def _server_worker(self, handler_func): # """Internal method to handle server operations.""" # try: # # Create pipe server # pipe_op = PipeOperator(self._name, is_server=True) # self._pipe_operator = pipe_op # # Call the handler function with the connected pipe # handler_func(pipe_op) # except Exception as e: # print(f"Error in pipe server: {e}") # finally: # if self._pipe_operator: # self._pipe_operator.close() # def stop_server(self): # """Stop the pipe server.""" # self._stop_event.set() # if self._server_thread: # self._server_thread.join() # def get_pipe(self) -> Optional[PipeOperator]: # """Get the connected pipe operator (only valid after client connects).""" # return self._pipe_operator