Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api, robot-server, shared-data): add FlexStacker module to the hardware controller and robot server. #17187

Merged
merged 11 commits into from
Jan 9, 2025
4 changes: 2 additions & 2 deletions api/src/opentrons/drivers/flex_stacker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from .abstract import AbstractStackerDriver
from .abstract import AbstractFlexStackerDriver
from .driver import FlexStackerDriver
from .simulator import SimulatingDriver

__all__ = [
"AbstractStackerDriver",
"AbstractFlexStackerDriver",
"FlexStackerDriver",
"SimulatingDriver",
]
15 changes: 10 additions & 5 deletions api/src/opentrons/drivers/flex_stacker/abstract.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Protocol

from .types import (
LimitSwitchStatus,
StackerAxis,
PlatformStatus,
Direction,
Expand All @@ -10,7 +11,7 @@
)


class AbstractStackerDriver(Protocol):
class AbstractFlexStackerDriver(Protocol):
"""Protocol for the Stacker driver."""

async def connect(self) -> None:
Expand All @@ -25,10 +26,6 @@ async def is_connected(self) -> bool:
"""Check connection to stacker."""
...

async def update_firmware(self, firmware_file_path: str) -> None:
"""Updates the firmware on the device."""
...

async def get_device_info(self) -> StackerInfo:
"""Get Device Info."""
...
Expand All @@ -48,6 +45,10 @@ async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> boo
"""
...

async def get_limit_switches_status(self) -> LimitSwitchStatus:
"""Get limit switch statuses for all axes."""
...

async def get_platform_sensor(self, direction: Direction) -> bool:
"""Get platform sensor status.

Expand Down Expand Up @@ -87,3 +88,7 @@ async def set_led(
) -> bool:
"""Set LED color of status bar."""
...

async def enter_programming_mode(self) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we call this prep_for_update() or something that's more like what other modules use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enter_programming_mode is the lower-level driver function called by the module's higher-level prep_for_update function, which adheres to the rest of the modules.

"""Reboot into programming mode"""
...
13 changes: 7 additions & 6 deletions api/src/opentrons/drivers/flex_stacker/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from opentrons.drivers.command_builder import CommandBuilder
from opentrons.drivers.asyncio.communication import AsyncResponseSerialConnection

from .abstract import AbstractStackerDriver
from .abstract import AbstractFlexStackerDriver
from .types import (
GCODE,
StackerAxis,
Expand All @@ -28,7 +28,7 @@
GCODE_ROUNDING_PRECISION = 2


class FlexStackerDriver(AbstractStackerDriver):
class FlexStackerDriver(AbstractFlexStackerDriver):
"""FLEX Stacker driver."""

@classmethod
Expand Down Expand Up @@ -254,7 +254,8 @@ async def set_led(
raise ValueError(f"Incorrect Response for set led: {resp}")
return True

async def update_firmware(self, firmware_file_path: str) -> None:
"""Updates the firmware on the device."""
# TODO: Implement firmware update
pass
async def enter_programming_mode(self) -> None:
"""Reboot into programming mode"""
command = GCODE.ENTER_BOOTLOADER.build_command()
await self._connection.send_dfu_command(command)
await self._connection.close()
32 changes: 25 additions & 7 deletions api/src/opentrons/drivers/flex_stacker/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

from opentrons.util.async_helpers import ensure_yield

from .abstract import AbstractStackerDriver
from .abstract import AbstractFlexStackerDriver
from .types import (
LEDColor,
StackerAxis,
PlatformStatus,
Direction,
Expand All @@ -14,7 +15,7 @@
)


class SimulatingDriver(AbstractStackerDriver):
class SimulatingDriver(AbstractFlexStackerDriver):
"""FLEX Stacker driver simulator."""

def __init__(self, serial_number: Optional[str] = None) -> None:
Expand Down Expand Up @@ -61,8 +62,8 @@ async def set_serial_number(self, sn: str) -> bool:
return True

@ensure_yield
async def stop_motor(self) -> bool:
"""Stop motor movement."""
async def stop_motors(self) -> bool:
"""Stop all motor movement."""
return True

@ensure_yield
Expand All @@ -78,12 +79,15 @@ async def get_limit_switches_status(self) -> LimitSwitchStatus:
"""Get limit switch statuses for all axes."""
return self._limit_switch_status

@ensure_yield
async def get_platform_sensor_status(self) -> PlatformStatus:
async def get_platform_sensor(self, direction: Direction) -> bool:
"""Get platform sensor status.

:return: True if platform is detected, False otherwise
:return: True if platform is present, False otherwise
"""
return True

async def get_platform_status(self) -> PlatformStatus:
"""Get platform status."""
return self._platform_sensor_status

@ensure_yield
Expand All @@ -107,3 +111,17 @@ async def move_to_limit_switch(
) -> bool:
"""Move until limit switch is triggered."""
return True

async def home_axis(self, axis: StackerAxis, direction: Direction) -> bool:
"""Home axis."""
return True

async def set_led(
self, power: float, color: LEDColor | None = None, external: bool | None = None
) -> bool:
"""Set LED color."""
return True

async def enter_programming_mode(self) -> None:
"""Reboot into programming mode"""
pass
76 changes: 68 additions & 8 deletions api/src/opentrons/drivers/flex_stacker/types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum
from dataclasses import dataclass, fields
from typing import List
from typing import List, Dict

from opentrons.drivers.command_builder import CommandBuilder

Expand Down Expand Up @@ -45,6 +45,14 @@ class StackerInfo:
hw: HardwareRevision
sn: str

def to_dict(self) -> Dict[str, str]:
"""Build command."""
return {
"serial": self.sn,
"version": self.fw,
"model": self.hw.value,
}


class StackerAxis(Enum):
"""Stacker Axis."""
Expand Down Expand Up @@ -90,11 +98,11 @@ def distance(self, distance: float) -> float:
class LimitSwitchStatus:
"""Stacker Limit Switch Statuses."""

XE: bool
XR: bool
ZE: bool
ZR: bool
LR: bool
XE: bool = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The GCode that we use to get the limit switch statuses always return results for all axes. What are the reasoning behind having setting default values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes it easier to build the obj for testing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the data constraints should match real life, IMO, and not have defaults. if it's annoying to build for tests, we can make a helper function in the test file that does it.

XR: bool = False
ZE: bool = False
ZR: bool = False
LR: bool = False

@classmethod
def get_fields(cls) -> List[str]:
Expand All @@ -116,8 +124,8 @@ def get(self, axis: StackerAxis, direction: Direction) -> bool:
class PlatformStatus:
"""Stacker Platform Statuses."""

E: bool
R: bool
E: bool = False
R: bool = False

@classmethod
def get_fields(cls) -> List[str]:
Expand All @@ -128,6 +136,58 @@ def get(self, direction: Direction) -> bool:
"""Get platform status."""
return self.E if direction == Direction.EXTENT else self.R

def to_dict(self) -> Dict[str, bool]:
"""Dict of the data."""
return {
"extent": self.E,
"retract": self.R,
}


class PlatformState(Enum):
UNKNOWN = "unknown"
EXTENDED = "extended"
RETRACTED = "retracted"

@classmethod
def from_status(cls, status: PlatformStatus) -> "PlatformState":
"""Get the state from the platform status."""
if status.E and not status.R:
return PlatformState.EXTENDED
if status.R and not status.E:
return PlatformState.RETRACTED
return PlatformState.UNKNOWN


class StackerAxisState(Enum):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice - I was actually planning on this exact logic of this in FW. So we can send a GCode querying the axis state, and it will return one of these strings.

UNKNOWN = "unknown"
EXTENDED = "extended"
RETRACTED = "retracted"

@classmethod
def from_status(
cls, status: LimitSwitchStatus, axis: StackerAxis
) -> "StackerAxisState":
"""Get the axis state from the limit switch status."""
match axis:
case StackerAxis.X:
if status.XE and not status.XR:
return StackerAxisState.EXTENDED
if status.XR and not status.XE:
return StackerAxisState.RETRACTED
case StackerAxis.Z:
if status.ZE and not status.ZR:
return StackerAxisState.EXTENDED
if status.ZR and not status.ZE:
return StackerAxisState.RETRACTED
case StackerAxis.L:
return (
StackerAxisState.EXTENDED
if status.LR
else StackerAxisState.RETRACTED
)
return StackerAxisState.UNKNOWN


@dataclass
class MoveParams:
Expand Down
2 changes: 2 additions & 0 deletions api/src/opentrons/hardware_control/modules/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .thermocycler import Thermocycler
from .heater_shaker import HeaterShaker
from .absorbance_reader import AbsorbanceReader
from .flex_stacker import FlexStacker
from .update import update_firmware
from .utils import MODULE_TYPE_BY_NAME, build
from .types import (
Expand Down Expand Up @@ -55,4 +56,5 @@
"AbsorbanceReaderStatus",
"AbsorbanceReaderDisconnectedError",
"ModuleDisconnectedCallback",
"FlexStacker",
]
Loading
Loading