refactor: refactoring old code
This commit is contained in:
0
legacy/resolver/__init__.py
Normal file
0
legacy/resolver/__init__.py
Normal file
15
legacy/resolver/astar.py
Normal file
15
legacy/resolver/astar.py
Normal file
@@ -0,0 +1,15 @@
|
||||
from typing import Iterator
|
||||
from .common import Resolver, ResolverRequest, ResolverResult, ResultPriority
|
||||
from ..dataset import DataSet
|
||||
|
||||
class AStarResolver(Resolver):
|
||||
"""
|
||||
A resolver that uses A* algorithm to find the best matching circuit.
|
||||
"""
|
||||
|
||||
def __init__(self, dataset: DataSet):
|
||||
pass
|
||||
|
||||
|
||||
def resolve(self, request: ResolverRequest) -> Iterator[ResolverResult]:
|
||||
pass
|
||||
102
legacy/resolver/common.py
Normal file
102
legacy/resolver/common.py
Normal file
@@ -0,0 +1,102 @@
|
||||
import enum
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from typing import Iterator
|
||||
from ..common import DeviceKind, Circuit
|
||||
|
||||
|
||||
class ResultPriority(enum.Enum):
|
||||
"""
|
||||
The priority of the result.
|
||||
"""
|
||||
|
||||
LESS_DEVICES = enum.auto()
|
||||
"""Less devices is the first priority."""
|
||||
MORE_ACCURACY = enum.auto()
|
||||
"""More accuracy is the first priority."""
|
||||
|
||||
|
||||
@dataclass
|
||||
class ResolverRequest:
|
||||
"""
|
||||
The request object for the resolver.
|
||||
"""
|
||||
|
||||
device_kind: DeviceKind
|
||||
"""The kind of device to resolve."""
|
||||
target_value: float
|
||||
"""The target value of the device."""
|
||||
tolerance: float
|
||||
"""The tolerance of the device."""
|
||||
result_priority: ResultPriority
|
||||
"""The priority of the result."""
|
||||
count_limit: int
|
||||
"""The limit of the count of results."""
|
||||
|
||||
|
||||
class ResolverResult:
|
||||
"""
|
||||
The result of the resolver.
|
||||
"""
|
||||
|
||||
circuit: Circuit
|
||||
"""The circuit of the result."""
|
||||
|
||||
__value_cache: float | None
|
||||
"""The cache of the circuit value."""
|
||||
__difference_cache: float | None
|
||||
"""The cache of the difference between the target value and the circuit value."""
|
||||
__relative_difference_cache: float | None
|
||||
"""The cache of the relative difference between the target value and the circuit value."""
|
||||
|
||||
def __init__(self, circuit: Circuit):
|
||||
self.circuit = circuit
|
||||
self.__value_cache = None
|
||||
self.__difference_cache = None
|
||||
self.__relative_difference_cache = None
|
||||
|
||||
def compute(self, device_kind: DeviceKind) -> float:
|
||||
"""
|
||||
Compute the circuit value.
|
||||
"""
|
||||
if self.__value_cache is None:
|
||||
self.__value_cache = self.circuit.compute(device_kind)
|
||||
return self.__value_cache
|
||||
|
||||
def difference(self, target_value: float, device_kind: DeviceKind) -> float:
|
||||
"""
|
||||
Get the difference between the target value and the circuit value.
|
||||
"""
|
||||
if self.__difference_cache is None:
|
||||
self.__difference_cache = abs(
|
||||
target_value - self.circuit.compute(device_kind)
|
||||
)
|
||||
return self.__difference_cache
|
||||
|
||||
def relative_difference(
|
||||
self, target_value: float, device_kind: DeviceKind
|
||||
) -> float:
|
||||
"""
|
||||
Get the relative difference between the target value and the circuit value.
|
||||
"""
|
||||
if self.__relative_difference_cache is None:
|
||||
self.__relative_difference_cache = (
|
||||
abs(target_value - self.circuit.compute(device_kind)) / target_value
|
||||
)
|
||||
return self.__relative_difference_cache
|
||||
|
||||
def len_devices(self) -> int:
|
||||
"""
|
||||
Get the number of devices in the circuit.
|
||||
"""
|
||||
return self.circuit.len_devices()
|
||||
|
||||
|
||||
class Resolver(ABC):
|
||||
"""
|
||||
Abstract base class for all resolvers.
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def resolve(self, request: ResolverRequest) -> Iterator[ResolverResult]:
|
||||
pass
|
||||
109
legacy/resolver/lut.py
Normal file
109
legacy/resolver/lut.py
Normal file
@@ -0,0 +1,109 @@
|
||||
import struct
|
||||
from typing import Iterator, BinaryIO
|
||||
from pathlib import Path
|
||||
from .common import Resolver, ResolverRequest, ResolverResult, ResultPriority
|
||||
from ..dataset import DataSet
|
||||
from ..common import Circuit, CircuitJoint, JointKind, LcrConnException
|
||||
|
||||
class LutResolver(Resolver):
|
||||
"""
|
||||
A resolver that uses a lookup table to find the best matching circuit.
|
||||
"""
|
||||
|
||||
lut: tuple[Circuit]
|
||||
|
||||
def __init__(self, lut: tuple[Circuit]):
|
||||
self.lut = lut
|
||||
|
||||
@staticmethod
|
||||
def from_dataset(dataset: DataSet) -> 'LutResolver':
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def from_cache(filename: Path) -> 'LutResolver':
|
||||
with open(filename, "rb") as f:
|
||||
cnt = _read_int(f)
|
||||
return LutResolver(tuple(LutItem.from_cache(f) for _ in range(cnt)))
|
||||
|
||||
def save_as_cache(self, filename: Path) -> None:
|
||||
with open(filename, "wb") as f:
|
||||
_write_int(f, len(self.lut))
|
||||
for item in self.lut:
|
||||
item.save_as_cache(f)
|
||||
|
||||
def resolve(self, request: ResolverRequest) -> Iterator[ResolverResult]:
|
||||
pass
|
||||
|
||||
|
||||
class LutItem:
|
||||
"""
|
||||
An item in the lookup table.
|
||||
"""
|
||||
|
||||
circuit: Circuit
|
||||
"""The circuit represented by this item."""
|
||||
__value_cache: float | None
|
||||
"""The cached computed value of the circuit, or None if it has not been cached yet."""
|
||||
|
||||
def __init__(self, circuit: Circuit):
|
||||
self.circuit = circuit
|
||||
|
||||
@staticmethod
|
||||
def from_cache(f: BinaryIO) -> 'LutItem':
|
||||
cnt = _read_int(f)
|
||||
|
||||
if cnt < 1:
|
||||
raise LcrConnException("Invalid circuit count in LUT item")
|
||||
device_value = _read_double(f)
|
||||
circuit = Circuit(device_value)
|
||||
cnt -= 1
|
||||
|
||||
for _ in range(cnt):
|
||||
j = JointKind.SERIES if _read_bool(f) else JointKind.PARALLEL
|
||||
dev = _read_double(f)
|
||||
joint = CircuitJoint(j, dev)
|
||||
circuit.add_joint(joint)
|
||||
|
||||
return LutItem(circuit)
|
||||
|
||||
def save_as_cache(self, f: BinaryIO) -> None:
|
||||
_write_int(f, self.circuit.len_devices())
|
||||
_write_double(f, self.circuit.device_value)
|
||||
for joint in self.circuit.joints():
|
||||
_write_bool(f, joint.kind == JointKind.SERIES)
|
||||
_write_double(f, joint.value)
|
||||
|
||||
def compute(self) -> float:
|
||||
"""The computed value of the circuit."""
|
||||
if self.__value_cache is None:
|
||||
self.__value_cache = self.circuit.value()
|
||||
return self.__value_cache
|
||||
|
||||
|
||||
|
||||
DOUBLE_PACKER = struct.Struct("d")
|
||||
INT_PACKER = struct.Struct("I")
|
||||
BOOL_PACKER = struct.Struct("?")
|
||||
|
||||
def _read_double(fs) -> float:
|
||||
return DOUBLE_PACKER.unpack(fs.read(DOUBLE_PACKER.size))[0]
|
||||
|
||||
def _read_int(fs) -> int:
|
||||
return INT_PACKER.unpack(fs.read(INT_PACKER.size))[0]
|
||||
|
||||
def _read_bool(fs) -> bool:
|
||||
return BOOL_PACKER.unpack(fs.read(BOOL_PACKER.size))[0]
|
||||
|
||||
def _write_double(fs, num: float):
|
||||
fs.write(DOUBLE_PACKER.pack(num))
|
||||
|
||||
def _write_int(fs, num: int):
|
||||
fs.write(INT_PACKER.pack(num))
|
||||
|
||||
def _write_bool(fs, num: bool):
|
||||
fs.write(BOOL_PACKER.pack(num))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user