2026-06-15 15:58:04 +08:00
|
|
|
import heapq
|
|
|
|
|
from itertools import chain, product
|
|
|
|
|
from typing import Iterable, Iterator
|
|
|
|
|
from functools import cached_property
|
2026-06-15 17:13:15 +08:00
|
|
|
from .common import Resolver
|
2026-06-15 15:58:04 +08:00
|
|
|
from ..dataset import DatasetCollection, Dataset
|
2026-06-15 17:13:15 +08:00
|
|
|
from ..common import Circuit, DeviceKind, JointKind
|
|
|
|
|
from ..query import Request, Response
|
2026-06-02 21:08:48 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
class LutItem:
    """
    A single lookup-table entry: one circuit bound to one device kind.
    """

    __circuit: Circuit
    """The circuit wrapped by this entry."""
    __device_kind: DeviceKind
    """The device kind passed to the circuit when its value is computed."""

    def __init__(self, circuit: Circuit, device_kind: DeviceKind):
        self.__device_kind = device_kind
        self.__circuit = circuit

    @property
    def circuit(self) -> Circuit:
        """The circuit wrapped by this entry."""
        return self.__circuit

    @cached_property
    def value(self) -> float:
        """
        The value obtained by computing the circuit for its device kind.

        The computation runs on first access only; later accesses reuse
        the stored result.

        :return: The computed value.
        """
        circuit, kind = self.__circuit, self.__device_kind
        return circuit.compute(kind)
|
2026-06-02 21:08:48 +08:00
|
|
|
|
|
|
|
|
|
2026-06-15 15:58:04 +08:00
|
|
|
class ResultBucket(Iterable[LutItem]):
    """
    A bounded bucket that keeps up to `N` LutItem entries with the smallest scores.

    When the bucket is full, inserting a new item only succeeds if its score
    is less than the current maximum; the maximum is then evicted.

    A bucket whose capacity is zero or negative never stores anything.
    Iterating the bucket yields the stored items in internal heap order,
    which is not sorted by score.
    """

    class ResultBucketItem:
        """
        An item stored in a :class:`ResultBucket`.
        """

        __score: float
        """The score associated with this item."""
        __item: LutItem
        """The underlying LutItem."""
        __seq: int
        """
        Monotonic counter used as a tiebreaker when scores are equal,
        ensuring that heapq never compares :class:`LutItem` directly.
        """

        def __init__(self, score: float, item: LutItem, seq: int):
            self.__score = score
            self.__item = item
            self.__seq = seq

        @property
        def score(self) -> float:
            """The score associated with this item."""
            return self.__score

        @property
        def item(self) -> LutItem:
            """The underlying LutItem."""
            return self.__item

        def __lt__(self, other: "ResultBucket.ResultBucketItem") -> bool:
            # heapq is a min-heap: it always pops the smallest element.
            # We invert the comparison so that an item with a larger score
            # is considered "smaller", effectively turning the min-heap
            # into a max-heap (largest-score item at the top).
            if self.__score != other.__score:
                return self.__score > other.__score
            # Counter tiebreaker: when scores are equal the later-inserted
            # item (higher seq) is considered "smaller" and gets evicted first.
            return self.__seq > other.__seq

    __n: int
    """Maximum number of items the bucket can hold."""
    __heap: list[ResultBucketItem]
    """
    Min-heap of :class:`ResultBucketItem`. The heap invariant is inverted
    via :meth:`ResultBucketItem.__lt__` so the entry with the largest score
    sits at index 0.
    """
    __counter: int
    """
    Monotonic counter fed to each :class:`ResultBucketItem` as a tiebreaker,
    preventing heapq from comparing :class:`LutItem` on score collisions.
    """

    def __init__(self, n: int):
        self.__n = n
        self.__heap = []
        self.__counter = 0

    def __len__(self) -> int:
        """Return the number of items currently stored."""
        return len(self.__heap)

    def __iter__(self) -> Iterator[LutItem]:
        """Yield the stored items in heap order (not sorted by score)."""
        for entry in self.__heap:
            yield entry.item

    def insert(self, item: LutItem, score: float) -> bool:
        """
        Insert a :class:`LutItem` with the given score.

        If the bucket is not yet full the item is always inserted.
        Otherwise the item is only inserted when *score* is smaller
        than the largest score currently in the bucket; the entry
        with the largest score is then evicted.

        A bucket created with a capacity of zero or less rejects
        every item.

        :param item: The LutItem to insert.
        :param score: The score associated with the item.
        :return: ``True`` if the item was inserted, ``False`` otherwise.
        """
        # With no capacity the heap stays empty forever, so the "full"
        # branch below must never be reached: it reads the heap top.
        if self.__n <= 0:
            return False

        is_full = len(self.__heap) >= self.__n
        # Reject before building the heap entry so that discarded
        # candidates cost no allocation.
        if is_full and score >= self.__heap[0].score:
            return False

        entry = ResultBucket.ResultBucketItem(score, item, self.__counter)
        if is_full:
            # Pops the current maximum and pushes the new entry in one step.
            heapq.heapreplace(self.__heap, entry)
        else:
            heapq.heappush(self.__heap, entry)
        self.__counter += 1
        return True
|
2026-06-02 21:08:48 +08:00
|
|
|
|
|
|
|
|
|
2026-06-15 15:58:04 +08:00
|
|
|
class LutResolver(Resolver):
|
|
|
|
|
"""
|
|
|
|
|
A resolver that uses a lookup table to find the best matching circuit.
|
|
|
|
|
"""
|
2026-06-02 21:08:48 +08:00
|
|
|
|
2026-06-15 15:58:04 +08:00
|
|
|
__resistor_lut: list[LutItem]
|
|
|
|
|
"""The lookup table for resistors."""
|
|
|
|
|
__capacitor_lut: list[LutItem]
|
|
|
|
|
"""The lookup table for capacitors."""
|
|
|
|
|
__inductor_lut: list[LutItem]
|
|
|
|
|
"""The lookup table for inductors."""
|
|
|
|
|
|
|
|
|
|
def __init__(self, datasets: DatasetCollection):
|
|
|
|
|
self.__resistor_lut = LutResolver.__build_lut(
|
|
|
|
|
datasets.resistor_values, DeviceKind.RESISTOR
|
|
|
|
|
)
|
|
|
|
|
self.__capacitor_lut = LutResolver.__build_lut(
|
|
|
|
|
datasets.capacitor_values, DeviceKind.CAPACITOR
|
|
|
|
|
)
|
|
|
|
|
self.__inductor_lut = LutResolver.__build_lut(
|
|
|
|
|
datasets.inductor_values, DeviceKind.INDUCTOR
|
|
|
|
|
)
|
2026-06-02 21:08:48 +08:00
|
|
|
|
2026-06-15 15:58:04 +08:00
|
|
|
@staticmethod
|
|
|
|
|
def __build_lut(dataset: Dataset, device_kind: DeviceKind) -> list[LutItem]:
|
|
|
|
|
values = dataset.values
|
|
|
|
|
joints = tuple(JointKind)
|
|
|
|
|
return [
|
|
|
|
|
LutItem(circuit, device_kind)
|
|
|
|
|
for circuit in chain(
|
|
|
|
|
(Circuit.from_one_device(v1) for v1 in values),
|
|
|
|
|
(
|
|
|
|
|
Circuit.from_two_devices(v1, v2, j2)
|
|
|
|
|
for v1, v2, j2 in product(values, values, joints)
|
|
|
|
|
),
|
|
|
|
|
(
|
|
|
|
|
Circuit.from_three_devices(v1, v2, j2, v3, j3)
|
|
|
|
|
for v1, v2, j2, v3, j3 in product(
|
|
|
|
|
values, values, joints, values, joints
|
|
|
|
|
)
|
|
|
|
|
),
|
|
|
|
|
)
|
|
|
|
|
]
|
|
|
|
|
|
2026-06-15 17:13:15 +08:00
|
|
|
def resolve(self, request: Request) -> Response:
|
2026-06-15 15:58:04 +08:00
|
|
|
# Fetch LUT by device kind
|
|
|
|
|
lut: list[LutItem]
|
|
|
|
|
match request.device_kind:
|
|
|
|
|
case DeviceKind.RESISTOR:
|
|
|
|
|
lut = self.__resistor_lut
|
|
|
|
|
case DeviceKind.CAPACITOR:
|
|
|
|
|
lut = self.__capacitor_lut
|
|
|
|
|
case DeviceKind.INDUCTOR:
|
|
|
|
|
lut = self.__inductor_lut
|
|
|
|
|
|
|
|
|
|
# Check LUT item one by one
|
|
|
|
|
bucket = ResultBucket(min(request.count_limit, 100))
|
|
|
|
|
for item in lut:
|
|
|
|
|
# compute absolute difference
|
|
|
|
|
difference = abs(request.target_value - item.value)
|
|
|
|
|
# If it is out of tolerance, skip it directly.
|
|
|
|
|
if difference > request.tolerance:
|
|
|
|
|
continue
|
|
|
|
|
# put it into bucket
|
|
|
|
|
bucket.insert(item, difference)
|
|
|
|
|
|
|
|
|
|
# Return result
|
2026-06-15 17:13:15 +08:00
|
|
|
return Response(request, map(lambda item: item.circuit, bucket))
|