Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(api, robot-server, shared-data): add FlexStacker module to the hardware controller and robot server. #17187

Merged
merged 11 commits into from
Jan 9, 2025
9 changes: 6 additions & 3 deletions api/src/opentrons/drivers/flex_stacker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from .abstract import AbstractStackerDriver
from .driver import FlexStackerDriver
from .abstract import AbstractFlexStackerDriver
from .driver import FlexStackerDriver, STACKER_MOTION_CONFIG
from .simulator import SimulatingDriver
from . import types as FlexStackerTypes

__all__ = [
"AbstractStackerDriver",
"AbstractFlexStackerDriver",
"FlexStackerDriver",
"SimulatingDriver",
"FlexStackerTypes",
"STACKER_MOTION_CONFIG",
]
40 changes: 31 additions & 9 deletions api/src/opentrons/drivers/flex_stacker/abstract.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Protocol
from typing import List, Protocol

from .types import (
LimitSwitchStatus,
MoveResult,
StackerAxis,
PlatformStatus,
Direction,
Expand All @@ -10,7 +12,7 @@
)


class AbstractStackerDriver(Protocol):
class AbstractFlexStackerDriver(Protocol):
"""Protocol for the Stacker driver."""

async def connect(self) -> None:
Expand All @@ -25,10 +27,6 @@ async def is_connected(self) -> bool:
"""Check connection to stacker."""
...

async def update_firmware(self, firmware_file_path: str) -> None:
"""Updates the firmware on the device."""
...

async def get_device_info(self) -> StackerInfo:
"""Get Device Info."""
...
Expand All @@ -37,17 +35,37 @@ async def set_serial_number(self, sn: str) -> bool:
"""Set Serial Number."""
...

async def enable_motors(self, axis: List[StackerAxis]) -> bool:
"""Enables the axis motor if present, disables it otherwise."""
...

async def stop_motors(self) -> bool:
"""Stop all motor movement."""
...

async def set_run_current(self, axis: StackerAxis, current: float) -> bool:
"""Set axis peak run current in amps."""
...

async def set_ihold_current(self, axis: StackerAxis, current: float) -> bool:
"""Set axis hold current in amps."""
...

async def get_motion_params(self, axis: StackerAxis) -> MoveParams:
"""Get the motion parameters used by the given axis motor."""
...

async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
"""Get limit switch status.
:return: True if limit switch is triggered, False otherwise
"""
...

async def get_limit_switches_status(self) -> LimitSwitchStatus:
"""Get limit switch statuses for all axes."""
...

async def get_platform_sensor(self, direction: Direction) -> bool:
"""Get platform sensor status.
Expand All @@ -68,13 +86,13 @@ async def get_hopper_door_closed(self) -> bool:

async def move_in_mm(
self, axis: StackerAxis, distance: float, params: MoveParams | None = None
) -> bool:
"""Move axis."""
) -> MoveResult:
"""Move axis by the given distance in mm."""
...

async def move_to_limit_switch(
self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None
) -> bool:
) -> MoveResult:
"""Move until limit switch is triggered."""
...

Expand All @@ -87,3 +105,7 @@ async def set_led(
) -> bool:
"""Set LED color of status bar."""
...

async def enter_programming_mode(self) -> None:
"""Reboot into programming mode"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we call this prep_for_update() or something that's more like what other modules use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enter_programming_mode is the lower-level driver function called by the module's higher-level prep_for_update function, which adheres to the rest of the modules.

...
136 changes: 124 additions & 12 deletions api/src/opentrons/drivers/flex_stacker/driver.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import asyncio
import re
from typing import Optional
from typing import List, Optional

from opentrons.drivers.command_builder import CommandBuilder
from opentrons.drivers.asyncio.communication import AsyncResponseSerialConnection

from .abstract import AbstractStackerDriver
from .abstract import AbstractFlexStackerDriver
from .types import (
GCODE,
MoveResult,
StackerAxis,
PlatformStatus,
Direction,
Expand All @@ -28,7 +29,59 @@
GCODE_ROUNDING_PRECISION = 2


class FlexStackerDriver(AbstractStackerDriver):
STACKER_MOTION_CONFIG = {
StackerAxis.X: {
"home": MoveParams(
StackerAxis.X,
max_speed=10.0,
acceleration=100.0,
max_speed_discont=40,
current=1.5,
),
"move": MoveParams(
StackerAxis.X,
max_speed=200.0,
acceleration=1500.0,
max_speed_discont=40,
current=1.0,
),
},
StackerAxis.Z: {
"home": MoveParams(
StackerAxis.Z,
max_speed=10.0,
acceleration=100.0,
max_speed_discont=40,
current=1.5,
),
"move": MoveParams(
StackerAxis.Z,
max_speed=200.0,
acceleration=500.0,
max_speed_discont=40,
current=1.5,
),
},
StackerAxis.L: {
"home": MoveParams(
StackerAxis.L,
max_speed=100.0,
acceleration=800.0,
max_speed_discont=40,
current=0.8,
),
"move": MoveParams(
StackerAxis.L,
max_speed=100.0,
acceleration=800.0,
max_speed_discont=40,
current=0.6,
),
},
}


class FlexStackerDriver(AbstractFlexStackerDriver):
"""FLEX Stacker driver."""

@classmethod
Expand Down Expand Up @@ -76,6 +129,27 @@ def parse_door_closed(cls, response: str) -> bool:
raise ValueError(f"Incorrect Response for door closed: {response}")
return bool(int(match.group(1)))

@classmethod
def parse_move_params(cls, response: str) -> MoveParams:
"""Parse move params."""
field_names = MoveParams.get_fields()
pattern = r"\s".join(
[
rf"{f}:(?P<{f}>(\d*\.)?\d+)" if f != "M" else rf"{f}:(?P<{f}>[X,Z,L])"
for f in field_names
]
)
_RE = re.compile(f"^{GCODE.GET_MOVE_PARAMS} {pattern}$")
m = _RE.match(response)
if not m:
raise ValueError(f"Incorrect Response for move params: {response}")
return MoveParams(
axis=StackerAxis(m.group("M")),
max_speed=float(m.group("V")),
acceleration=float(m.group("A")),
max_speed_discont=float(m.group("D")),
)

@classmethod
def append_move_params(
cls, command: CommandBuilder, params: MoveParams | None
Expand Down Expand Up @@ -148,13 +222,48 @@ async def set_serial_number(self, sn: str) -> bool:
raise ValueError(f"Incorrect Response for set serial number: {resp}")
return True

async def enable_motors(self, axis: List[StackerAxis]) -> bool:
"""Enables the axis motor if present, disables it otherwise."""
command = GCODE.ENABLE_MOTORS.build_command()
for a in axis:
command.add_element(a.name)
resp = await self._connection.send_command(command)
if not re.match(rf"^{GCODE.ENABLE_MOTORS}$", resp):
raise ValueError(f"Incorrect Response for enable motors: {resp}")
return True

async def stop_motors(self) -> bool:
"""Stop all motor movement."""
resp = await self._connection.send_command(GCODE.STOP_MOTORS.build_command())
if not re.match(rf"^{GCODE.STOP_MOTORS}$", resp):
raise ValueError(f"Incorrect Response for stop motors: {resp}")
return True

async def set_run_current(self, axis: StackerAxis, current: float) -> bool:
"""Set axis peak run current in amps."""
resp = await self._connection.send_command(
GCODE.SET_RUN_CURRENT.build_command().add_float(axis.name, current)
)
if not re.match(rf"^{GCODE.SET_RUN_CURRENT}$", resp):
raise ValueError(f"Incorrect Response for set run current: {resp}")
return True

async def set_ihold_current(self, axis: StackerAxis, current: float) -> bool:
"""Set axis hold current in amps."""
resp = await self._connection.send_command(
GCODE.SET_IHOLD_CURRENT.build_command().add_float(axis.name, current)
)
if not re.match(rf"^{GCODE.SET_IHOLD_CURRENT}$", resp):
raise ValueError(f"Incorrect Response for set ihold current: {resp}")
return True

async def get_motion_params(self, axis: StackerAxis) -> MoveParams:
"""Get the motion parameters used by the given axis motor."""
response = await self._connection.send_command(
GCODE.GET_MOVE_PARAMS.build_command().add_element(axis.name)
)
return self.parse_move_params(response)

async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
"""Get limit switch status.

Expand Down Expand Up @@ -197,8 +306,8 @@ async def get_hopper_door_closed(self) -> bool:

async def move_in_mm(
self, axis: StackerAxis, distance: float, params: MoveParams | None = None
) -> bool:
"""Move axis."""
) -> MoveResult:
"""Move axis by the given distance in mm."""
command = self.append_move_params(
GCODE.MOVE_TO.build_command().add_float(
axis.name, distance, GCODE_ROUNDING_PRECISION
Expand All @@ -208,11 +317,12 @@ async def move_in_mm(
resp = await self._connection.send_command(command)
if not re.match(rf"^{GCODE.MOVE_TO}$", resp):
raise ValueError(f"Incorrect Response for move to: {resp}")
return True
# TODO: handle STALL_ERROR
return MoveResult.NO_ERROR

async def move_to_limit_switch(
self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None
) -> bool:
) -> MoveResult:
"""Move until limit switch is triggered."""
command = self.append_move_params(
GCODE.MOVE_TO_SWITCH.build_command().add_int(axis.name, direction.value),
Expand All @@ -221,7 +331,8 @@ async def move_to_limit_switch(
resp = await self._connection.send_command(command)
if not re.match(rf"^{GCODE.MOVE_TO_SWITCH}$", resp):
raise ValueError(f"Incorrect Response for move to switch: {resp}")
return True
# TODO: handle STALL_ERROR
return MoveResult.NO_ERROR

async def home_axis(self, axis: StackerAxis, direction: Direction) -> bool:
"""Home axis."""
Expand Down Expand Up @@ -254,7 +365,8 @@ async def set_led(
raise ValueError(f"Incorrect Response for set led: {resp}")
return True

async def update_firmware(self, firmware_file_path: str) -> None:
"""Updates the firmware on the device."""
# TODO: Implement firmware update
pass
async def enter_programming_mode(self) -> None:
"""Reboot into programming mode"""
command = GCODE.ENTER_BOOTLOADER.build_command()
await self._connection.send_dfu_command(command)
await self._connection.close()
Loading
Loading