Skip to content

Commit

Permalink
feat(api, robot-server, shared-data): add FlexStacker module to the h…
Browse files Browse the repository at this point in the history
…ardware controller and robot server. (#17187)
  • Loading branch information
vegano1 authored Jan 9, 2025
1 parent d3b1b9c commit a3f541d
Show file tree
Hide file tree
Showing 19 changed files with 1,473 additions and 233 deletions.
9 changes: 6 additions & 3 deletions api/src/opentrons/drivers/flex_stacker/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
from .abstract import AbstractStackerDriver
from .driver import FlexStackerDriver
from .abstract import AbstractFlexStackerDriver
from .driver import FlexStackerDriver, STACKER_MOTION_CONFIG
from .simulator import SimulatingDriver
from . import types as FlexStackerTypes

__all__ = [
"AbstractStackerDriver",
"AbstractFlexStackerDriver",
"FlexStackerDriver",
"SimulatingDriver",
"FlexStackerTypes",
"STACKER_MOTION_CONFIG",
]
40 changes: 31 additions & 9 deletions api/src/opentrons/drivers/flex_stacker/abstract.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import Protocol
from typing import List, Protocol

from .types import (
LimitSwitchStatus,
MoveResult,
StackerAxis,
PlatformStatus,
Direction,
Expand All @@ -10,7 +12,7 @@
)


class AbstractStackerDriver(Protocol):
class AbstractFlexStackerDriver(Protocol):
"""Protocol for the Stacker driver."""

async def connect(self) -> None:
Expand All @@ -25,10 +27,6 @@ async def is_connected(self) -> bool:
"""Check connection to stacker."""
...

async def update_firmware(self, firmware_file_path: str) -> None:
"""Updates the firmware on the device."""
...

async def get_device_info(self) -> StackerInfo:
"""Get Device Info."""
...
Expand All @@ -37,17 +35,37 @@ async def set_serial_number(self, sn: str) -> bool:
"""Set Serial Number."""
...

async def enable_motors(self, axis: List[StackerAxis]) -> bool:
"""Enables the axis motor if present, disables it otherwise."""
...

async def stop_motors(self) -> bool:
"""Stop all motor movement."""
...

async def set_run_current(self, axis: StackerAxis, current: float) -> bool:
"""Set axis peak run current in amps."""
...

async def set_ihold_current(self, axis: StackerAxis, current: float) -> bool:
"""Set axis hold current in amps."""
...

async def get_motion_params(self, axis: StackerAxis) -> MoveParams:
"""Get the motion parameters used by the given axis motor."""
...

async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
"""Get limit switch status.
:return: True if limit switch is triggered, False otherwise
"""
...

async def get_limit_switches_status(self) -> LimitSwitchStatus:
"""Get limit switch statuses for all axes."""
...

async def get_platform_sensor(self, direction: Direction) -> bool:
"""Get platform sensor status.
Expand All @@ -68,13 +86,13 @@ async def get_hopper_door_closed(self) -> bool:

async def move_in_mm(
self, axis: StackerAxis, distance: float, params: MoveParams | None = None
) -> bool:
"""Move axis."""
) -> MoveResult:
"""Move axis by the given distance in mm."""
...

async def move_to_limit_switch(
self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None
) -> bool:
) -> MoveResult:
"""Move until limit switch is triggered."""
...

Expand All @@ -87,3 +105,7 @@ async def set_led(
) -> bool:
"""Set LED color of status bar."""
...

async def enter_programming_mode(self) -> None:
"""Reboot into programming mode"""
...
136 changes: 124 additions & 12 deletions api/src/opentrons/drivers/flex_stacker/driver.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import asyncio
import re
from typing import Optional
from typing import List, Optional

from opentrons.drivers.command_builder import CommandBuilder
from opentrons.drivers.asyncio.communication import AsyncResponseSerialConnection

from .abstract import AbstractStackerDriver
from .abstract import AbstractFlexStackerDriver
from .types import (
GCODE,
MoveResult,
StackerAxis,
PlatformStatus,
Direction,
Expand All @@ -28,7 +29,59 @@
GCODE_ROUNDING_PRECISION = 2


class FlexStackerDriver(AbstractStackerDriver):
STACKER_MOTION_CONFIG = {
StackerAxis.X: {
"home": MoveParams(
StackerAxis.X,
max_speed=10.0,
acceleration=100.0,
max_speed_discont=40,
current=1.5,
),
"move": MoveParams(
StackerAxis.X,
max_speed=200.0,
acceleration=1500.0,
max_speed_discont=40,
current=1.0,
),
},
StackerAxis.Z: {
"home": MoveParams(
StackerAxis.Z,
max_speed=10.0,
acceleration=100.0,
max_speed_discont=40,
current=1.5,
),
"move": MoveParams(
StackerAxis.Z,
max_speed=200.0,
acceleration=500.0,
max_speed_discont=40,
current=1.5,
),
},
StackerAxis.L: {
"home": MoveParams(
StackerAxis.L,
max_speed=100.0,
acceleration=800.0,
max_speed_discont=40,
current=0.8,
),
"move": MoveParams(
StackerAxis.L,
max_speed=100.0,
acceleration=800.0,
max_speed_discont=40,
current=0.6,
),
},
}


class FlexStackerDriver(AbstractFlexStackerDriver):
"""FLEX Stacker driver."""

@classmethod
Expand Down Expand Up @@ -76,6 +129,27 @@ def parse_door_closed(cls, response: str) -> bool:
raise ValueError(f"Incorrect Response for door closed: {response}")
return bool(int(match.group(1)))

@classmethod
def parse_move_params(cls, response: str) -> MoveParams:
"""Parse move params."""
field_names = MoveParams.get_fields()
pattern = r"\s".join(
[
rf"{f}:(?P<{f}>(\d*\.)?\d+)" if f != "M" else rf"{f}:(?P<{f}>[X,Z,L])"
for f in field_names
]
)
_RE = re.compile(f"^{GCODE.GET_MOVE_PARAMS} {pattern}$")
m = _RE.match(response)
if not m:
raise ValueError(f"Incorrect Response for move params: {response}")
return MoveParams(
axis=StackerAxis(m.group("M")),
max_speed=float(m.group("V")),
acceleration=float(m.group("A")),
max_speed_discont=float(m.group("D")),
)

@classmethod
def append_move_params(
cls, command: CommandBuilder, params: MoveParams | None
Expand Down Expand Up @@ -148,13 +222,48 @@ async def set_serial_number(self, sn: str) -> bool:
raise ValueError(f"Incorrect Response for set serial number: {resp}")
return True

async def enable_motors(self, axis: List[StackerAxis]) -> bool:
"""Enables the axis motor if present, disables it otherwise."""
command = GCODE.ENABLE_MOTORS.build_command()
for a in axis:
command.add_element(a.name)
resp = await self._connection.send_command(command)
if not re.match(rf"^{GCODE.ENABLE_MOTORS}$", resp):
raise ValueError(f"Incorrect Response for enable motors: {resp}")
return True

async def stop_motors(self) -> bool:
"""Stop all motor movement."""
resp = await self._connection.send_command(GCODE.STOP_MOTORS.build_command())
if not re.match(rf"^{GCODE.STOP_MOTORS}$", resp):
raise ValueError(f"Incorrect Response for stop motors: {resp}")
return True

async def set_run_current(self, axis: StackerAxis, current: float) -> bool:
"""Set axis peak run current in amps."""
resp = await self._connection.send_command(
GCODE.SET_RUN_CURRENT.build_command().add_float(axis.name, current)
)
if not re.match(rf"^{GCODE.SET_RUN_CURRENT}$", resp):
raise ValueError(f"Incorrect Response for set run current: {resp}")
return True

async def set_ihold_current(self, axis: StackerAxis, current: float) -> bool:
"""Set axis hold current in amps."""
resp = await self._connection.send_command(
GCODE.SET_IHOLD_CURRENT.build_command().add_float(axis.name, current)
)
if not re.match(rf"^{GCODE.SET_IHOLD_CURRENT}$", resp):
raise ValueError(f"Incorrect Response for set ihold current: {resp}")
return True

async def get_motion_params(self, axis: StackerAxis) -> MoveParams:
"""Get the motion parameters used by the given axis motor."""
response = await self._connection.send_command(
GCODE.GET_MOVE_PARAMS.build_command().add_element(axis.name)
)
return self.parse_move_params(response)

async def get_limit_switch(self, axis: StackerAxis, direction: Direction) -> bool:
"""Get limit switch status.
Expand Down Expand Up @@ -197,8 +306,8 @@ async def get_hopper_door_closed(self) -> bool:

async def move_in_mm(
self, axis: StackerAxis, distance: float, params: MoveParams | None = None
) -> bool:
"""Move axis."""
) -> MoveResult:
"""Move axis by the given distance in mm."""
command = self.append_move_params(
GCODE.MOVE_TO.build_command().add_float(
axis.name, distance, GCODE_ROUNDING_PRECISION
Expand All @@ -208,11 +317,12 @@ async def move_in_mm(
resp = await self._connection.send_command(command)
if not re.match(rf"^{GCODE.MOVE_TO}$", resp):
raise ValueError(f"Incorrect Response for move to: {resp}")
return True
# TODO: handle STALL_ERROR
return MoveResult.NO_ERROR

async def move_to_limit_switch(
self, axis: StackerAxis, direction: Direction, params: MoveParams | None = None
) -> bool:
) -> MoveResult:
"""Move until limit switch is triggered."""
command = self.append_move_params(
GCODE.MOVE_TO_SWITCH.build_command().add_int(axis.name, direction.value),
Expand All @@ -221,7 +331,8 @@ async def move_to_limit_switch(
resp = await self._connection.send_command(command)
if not re.match(rf"^{GCODE.MOVE_TO_SWITCH}$", resp):
raise ValueError(f"Incorrect Response for move to switch: {resp}")
return True
# TODO: handle STALL_ERROR
return MoveResult.NO_ERROR

async def home_axis(self, axis: StackerAxis, direction: Direction) -> bool:
"""Home axis."""
Expand Down Expand Up @@ -254,7 +365,8 @@ async def set_led(
raise ValueError(f"Incorrect Response for set led: {resp}")
return True

async def update_firmware(self, firmware_file_path: str) -> None:
"""Updates the firmware on the device."""
# TODO: Implement firmware update
pass
async def enter_programming_mode(self) -> None:
"""Reboot into programming mode"""
command = GCODE.ENTER_BOOTLOADER.build_command()
await self._connection.send_dfu_command(command)
await self._connection.close()
Loading

0 comments on commit a3f541d

Please sign in to comment.