Skip to content

Commit

Permalink
refactor(api): location sequences for offsets
Browse files Browse the repository at this point in the history
When we load and express labware offsets, we previously used an object
that had explicit named members for a labware parent; a module parent;
and a deck slot. This worked as long as we don't have heterogenous
labware stacking (to include labware-on-labware-on-adapter) but it's
inflexible, and maybe we do want that.

Instead, let's have a sequence of little models that specify a parent
geometry - a labware def uri, a module model, or an addressable area
name - and use that both for specifying offset locations via the HTTP
API, for comparing locations internally, and for downloading locations
from the HTTP API.

Importantly, these offset locations are currently in the database as
parts of runs, so we have to keep the old version around when expressing
labware data; this is annoying, but that's life. We can calculate the
equivalent old data when we load a labware offset pretty easily, and
this also lets us not change the legacy protocol core.

We also are going to keep the old method of specifying them around, and
convert them into the new format, which is also pretty easy, to preserve
backwards compatibility and roundtripping older offsets from before this
data was present.
  • Loading branch information
sfoster1 committed Jan 25, 2025
1 parent c1305d3 commit a34b3ba
Show file tree
Hide file tree
Showing 31 changed files with 1,747 additions and 290 deletions.
2 changes: 1 addition & 1 deletion api/src/opentrons/protocol_api/core/engine/labware.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def set_calibration(self, delta: Point) -> None:

request = LabwareOffsetCreate.model_construct(
definitionUri=self.get_uri(),
location=offset_location,
locationSequence=offset_location,
vector=LabwareOffsetVector(x=delta.x, y=delta.y, z=delta.z),
)
self._engine_client.add_labware_offset(request)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def find(
See the parent class for param details.
"""
offset = self._labware_view.find_applicable_labware_offset(
offset = self._labware_view.find_applicable_labware_offset_by_legacy_location(
definition_uri=load_params.as_uri(),
location=LegacyLabwareOffsetLocation(
slotName=deck_slot,
Expand Down
4 changes: 2 additions & 2 deletions api/src/opentrons/protocol_engine/actions/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from ..notes.notes import CommandNote
from ..state.update_types import StateUpdate
from ..types import (
LabwareOffsetCreate,
LabwareOffsetCreateInternal,
ModuleDefinition,
Liquid,
DeckConfigurationType,
Expand Down Expand Up @@ -206,7 +206,7 @@ class AddLabwareOffsetAction:

labware_offset_id: str
created_at: datetime
request: LabwareOffsetCreate
request: LabwareOffsetCreateInternal


@dataclasses.dataclass(frozen=True)
Expand Down
2 changes: 2 additions & 0 deletions api/src/opentrons/protocol_engine/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
InvalidLiquidError,
LiquidClassDoesNotExistError,
LiquidClassRedefinitionError,
OffsetLocationInvalidError,
)

from .error_occurrence import ErrorOccurrence, ProtocolCommandFailedError
Expand Down Expand Up @@ -160,6 +161,7 @@
"LocationIsLidDockSlotError",
"InvalidAxisForRobotType",
"NotSupportedOnRobotType",
"OffsetLocationInvalidError",
# error occurrence models
"ErrorOccurrence",
"CommandNotAllowedError",
Expand Down
13 changes: 13 additions & 0 deletions api/src/opentrons/protocol_engine/errors/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,19 @@ def __init__(
super().__init__(ErrorCodes.GENERAL_ERROR, message, details, wrapping)


class OffsetLocationInvalidError(ProtocolEngineError):
"""Raised when encountering an invalid labware offset location sequence."""

def __init__(
self,
message: Optional[str] = None,
details: Optional[Dict[str, Any]] = None,
wrapping: Optional[Sequence[EnumeratedError]] = None,
) -> None:
"""Build an OffsetLocationSequenceDoesNotTerminateAtAnAddressableAreaError."""
super().__init__(ErrorCodes.GENERAL_ERROR, message, details, wrapping)


class SlotDoesNotExistError(ProtocolEngineError):
"""Raised when referencing a deck slot that does not exist."""

Expand Down
75 changes: 3 additions & 72 deletions api/src/opentrons/protocol_engine/execution/equipment.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Equipment command side-effect logic."""

from dataclasses import dataclass
from typing import Optional, overload, Union, List

Expand Down Expand Up @@ -42,10 +43,7 @@
from ..types import (
LabwareLocation,
DeckSlotLocation,
ModuleLocation,
OnLabwareLocation,
LabwareOffset,
LegacyLabwareOffsetLocation,
ModuleModel,
ModuleDefinition,
AddressableAreaLocation,
Expand Down Expand Up @@ -633,8 +631,9 @@ def find_applicable_labware_offset_id(
or None if no labware offset will apply.
"""
labware_offset_location = (
self._get_labware_offset_location_from_labware_location(labware_location)
self._state_store.geometry.get_projected_offset_location(labware_location)
)

if labware_offset_location is None:
# No offset for off-deck location.
# Returning None instead of raising an exception allows loading a labware
Expand All @@ -647,74 +646,6 @@ def find_applicable_labware_offset_id(
)
return self._get_id_from_offset(offset)

def _get_labware_offset_location_from_labware_location(
self, labware_location: LabwareLocation
) -> Optional[LegacyLabwareOffsetLocation]:
if isinstance(labware_location, DeckSlotLocation):
return LegacyLabwareOffsetLocation(slotName=labware_location.slotName)
elif isinstance(labware_location, ModuleLocation):
module_id = labware_location.moduleId
# Allow ModuleNotLoadedError to propagate.
# Note also that we match based on the module's requested model, not its
# actual model, to implement robot-server's documented HTTP API semantics.
module_model = self._state_store.modules.get_requested_model(
module_id=module_id
)

# If `module_model is None`, it probably means that this module was added by
# `ProtocolEngine.use_attached_modules()`, instead of an explicit
# `loadModule` command.
#
# This assert should never raise in practice because:
# 1. `ProtocolEngine.use_attached_modules()` is only used by
# robot-server's "stateless command" endpoints, under `/commands`.
# 2. Those endpoints don't support loading labware, so this code will
# never run.
#
# Nevertheless, if it does happen somehow, we do NOT want to pass the
# `None` value along to `LabwareView.find_applicable_labware_offset()`.
# `None` means something different there, which will cause us to return
# wrong results.
assert module_model is not None, (
"Can't find offsets for labware"
" that are loaded on modules"
" that were loaded with ProtocolEngine.use_attached_modules()."
)

module_location = self._state_store.modules.get_location(
module_id=module_id
)
slot_name = module_location.slotName
return LegacyLabwareOffsetLocation(
slotName=slot_name, moduleModel=module_model
)
elif isinstance(labware_location, OnLabwareLocation):
parent_labware_id = labware_location.labwareId
parent_labware_uri = self._state_store.labware.get_definition_uri(
parent_labware_id
)

base_location = self._state_store.labware.get_parent_location(
parent_labware_id
)
base_labware_offset_location = (
self._get_labware_offset_location_from_labware_location(base_location)
)
if base_labware_offset_location is None:
# No offset for labware sitting on labware off-deck
return None

# If labware is being stacked on itself, all labware in the stack will share a labware offset due to
# them sharing the same definitionUri in `LegacyLabwareOffsetLocation`. This will not be true for the
# bottom-most labware, which will have a `DeckSlotLocation` and have its definitionUri field empty.
return LegacyLabwareOffsetLocation(
slotName=base_labware_offset_location.slotName,
moduleModel=base_labware_offset_location.moduleModel,
definitionUri=parent_labware_uri,
)
else: # Off deck
return None

@staticmethod
def _get_id_from_offset(labware_offset: Optional[LabwareOffset]) -> Optional[str]:
return None if labware_offset is None else labware_offset.id
172 changes: 172 additions & 0 deletions api/src/opentrons/protocol_engine/labware_offset_standardization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""Convert labware offset creation requests and stored elements between legacy and new."""

from opentrons_shared_data.robot.types import RobotType
from opentrons_shared_data.deck.types import DeckDefinitionV5
from .errors import (
OffsetLocationInvalidError,
FixtureDoesNotExistError,
)
from .types import (
LabwareOffsetCreate,
LegacyLabwareOffsetCreate,
LabwareOffsetCreateInternal,
LegacyLabwareOffsetLocation,
LabwareOffsetLocationSequence,
OnLabwareOffsetLocationSequenceComponent,
OnAddressableAreaOffsetLocationSequenceComponent,
OnModuleOffsetLocationSequenceComponent,
ModuleModel,
)
from .resources import deck_configuration_provider


def standardize_labware_offset_create(
request: LabwareOffsetCreate | LegacyLabwareOffsetCreate,
robot_type: RobotType,
deck_definition: DeckDefinitionV5,
) -> LabwareOffsetCreateInternal:
"""Turn a union of old and new labware offset create requests into a new one."""
location_sequence, legacy_location = _locations_for_create(
request, robot_type, deck_definition
)
return LabwareOffsetCreateInternal(
definitionUri=request.definitionUri,
locationSequence=location_sequence,
legacyLocation=legacy_location,
vector=request.vector,
)


def _legacy_offset_location_to_offset_location_sequence(
location: LegacyLabwareOffsetLocation, deck_definition: DeckDefinitionV5
) -> LabwareOffsetLocationSequence:
sequence: LabwareOffsetLocationSequence = []
if location.definitionUri:
sequence.append(
OnLabwareOffsetLocationSequenceComponent(labwareUri=location.definitionUri)
)
if location.moduleModel:
sequence.append(
OnModuleOffsetLocationSequenceComponent(moduleModel=location.moduleModel)
)
cutout_id = deck_configuration_provider.get_cutout_id_by_deck_slot_name(
location.slotName
)
possible_cutout_fixture_id = location.moduleModel.value
try:
addressable_area = deck_configuration_provider.get_labware_hosting_addressable_area_name_for_cutout_and_cutout_fixture(
cutout_id, possible_cutout_fixture_id, deck_definition
)
sequence.append(
OnAddressableAreaOffsetLocationSequenceComponent(
addressableAreaName=addressable_area
)
)
except FixtureDoesNotExistError:
# this is an OT-2 (or this module isn't supported in the deck definition) and we should use a
# slot addressable area name
sequence.append(
OnAddressableAreaOffsetLocationSequenceComponent(
addressableAreaName=location.slotName.value
)
)

else:
# Slight hack: we should have a more formal association here. However, since the slot
# name is already standardized, and since the addressable areas for slots are just the
# name of the slots, we can rely on this.
sequence.append(
OnAddressableAreaOffsetLocationSequenceComponent(
addressableAreaName=location.slotName.value
)
)
return sequence


def _offset_location_sequence_head_to_labware_and_module(
location_sequence: LabwareOffsetLocationSequence, deck_definition: DeckDefinitionV5
) -> tuple[ModuleModel | None, str | None]:
labware_uri: str | None = None
module_model: ModuleModel | None = None
for location in location_sequence:
if isinstance(location, OnAddressableAreaOffsetLocationSequenceComponent):
raise OffsetLocationInvalidError(
"Addressable areas may only be the final element of an offset location."
)
elif isinstance(location, OnLabwareOffsetLocationSequenceComponent):
if labware_uri is not None:
# We only take the first location
continue
if module_model is not None:
# Labware can't be underneath modules
raise OffsetLocationInvalidError(
"Labware must not be underneath a module."
)
labware_uri = location.labwareUri
elif isinstance(location, OnModuleOffsetLocationSequenceComponent):
if module_model is not None:
# Bad, somebody put more than one module in here
raise OffsetLocationInvalidError(
"Only one module location may exist in an offset location."
)
module_model = location.moduleModel
else:
raise OffsetLocationInvalidError(
f"Invalid location component in offset location: {repr(location)}"
)
return module_model, labware_uri


def _offset_location_sequence_to_legacy_offset_location(
location_sequence: LabwareOffsetLocationSequence, deck_definition: DeckDefinitionV5
) -> LegacyLabwareOffsetLocation:
if len(location_sequence) == 0:
raise OffsetLocationInvalidError(
"Offset locations must contain at least one component."
)
last_element = location_sequence[-1]
if not isinstance(last_element, OnAddressableAreaOffsetLocationSequenceComponent):
raise OffsetLocationInvalidError(
"Offset locations must end with an addressable area."
)
module_model, labware_uri = _offset_location_sequence_head_to_labware_and_module(
location_sequence[:-1], deck_definition
)
(
cutout_id,
cutout_fixtures,
) = deck_configuration_provider.get_potential_cutout_fixtures(
last_element.addressableAreaName, deck_definition
)
slot_name = deck_configuration_provider.get_deck_slot_for_cutout_id(cutout_id)
return LegacyLabwareOffsetLocation(
slotName=slot_name, moduleModel=module_model, definitionUri=labware_uri
)


def _locations_for_create(
request: LabwareOffsetCreate | LegacyLabwareOffsetCreate,
robot_type: RobotType,
deck_definition: DeckDefinitionV5,
) -> tuple[LabwareOffsetLocationSequence, LegacyLabwareOffsetLocation]:
if isinstance(request, LabwareOffsetCreate):
return (
request.locationSequence,
_offset_location_sequence_to_legacy_offset_location(
request.locationSequence, deck_definition
),
)
else:
normalized = request.location.model_copy(
update={
"slotName": request.location.slotName.to_equivalent_for_robot_type(
robot_type
)
}
)
return (
_legacy_offset_location_to_offset_location_sequence(
normalized, deck_definition
),
normalized,
)
Loading

0 comments on commit a34b3ba

Please sign in to comment.