Skip to content

ump_binding

Bindings for Sensapex uMp platform.

Usage: Instantiate UmpBindings to interact with Sensapex uMp-4 and uMp-3 manipulators.

UmpBinding

Bases: BaseBinding

Bindings for uMp platform

Source code in src/ephys_link/bindings/ump_binding.py
@final
class UmpBinding(BaseBinding):
    """Bindings for uMp platform"""

    # Number of axes for uMp-3.
    UMP_3_NUM_AXES = 3

    def __init__(self) -> None:
        """Initialize uMp bindings."""

        # Establish connection to Sensapex API (exit if connection fails).
        UMP.set_library_path(RESOURCES_DIRECTORY)
        self._ump: UMP = UMP.get_ump()  # pyright: ignore [reportUnknownMemberType]

        # Compute axis count, assumed as the first device. 0 if no devices are connected.
        device_ids = list(map(str, self._ump.list_devices()))
        self.axis_count: int = 0 if len(device_ids) == 0 else self._get_device(device_ids[0]).n_axes()

    @staticmethod
    @override
    def get_display_name() -> str:
        return "Sensapex uMp"

    @staticmethod
    @override
    def get_cli_name() -> str:
        return "ump"

    @override
    async def get_manipulators(self) -> list[str]:
        device_ids = list(map(str, self._ump.list_devices()))

        # Currently only supports using uMp-4 XOR uMp-3. Throw error if both are connected.
        if any(self._get_device(device_id).n_axes() != self.axis_count for device_id in device_ids):  # pyright: ignore [reportUnknownArgumentType]
            msg = "uMp-4 and uMp-3 cannot be used at the same time."
            raise RuntimeError(msg)

        return device_ids

    @override
    async def get_axes_count(self) -> int:
        return self.axis_count

    @override
    def get_dimensions(self) -> Vector4:
        return Vector4(x=20, y=20, z=20, w=20)

    @override
    async def get_position(self, manipulator_id: str) -> Vector4:
        # Get the position list from the device.
        position = self._get_device(manipulator_id).get_pos(1)  # pyright: ignore [reportUnknownMemberType]

        # Copy x-coordinate into depth for uMp-3.
        return um_to_mm(list_to_vector4([*position, position[0]] if self._is_ump_3() else position))

    @override
    async def get_angles(self, manipulator_id: str) -> NoReturn:
        """uMp does not support getting angles so raise an error.

        Raises:
            AttributeError: uMp does not support getting angles.
        """
        error_message = "uMp does not support getting angles"
        raise AttributeError(error_message)

    @override
    async def get_shank_count(self, manipulator_id: str) -> NoReturn:
        """uMp does not support getting shank count so raise an error.

        Raises:
            AttributeError: uMp does not support getting shank count.
        """
        error_message = "uMp does not support getting shank count"
        raise AttributeError(error_message)

    @staticmethod
    @override
    def get_movement_tolerance() -> float:
        return 0.001

    @override
    async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4:
        # Convert position to micrometers array.
        target_position_um = vector4_to_array(vector_mm_to_um(position))

        # Request movement (clip 4th axis for uMp-3).
        movement = self._get_device(
            manipulator_id
        ).goto_pos(  # pyright: ignore [reportUnknownMemberType]
            target_position_um[: self.UMP_3_NUM_AXES] if self._is_ump_3() else target_position_um,
            scalar_mm_to_um(speed),
        )

        # Wait for movement to finish.
        _ = await get_running_loop().run_in_executor(None, movement.finished_event.wait, None)

        # Handle interrupted movement.
        if movement.interrupted:
            error_message = f"Manipulator {manipulator_id} interrupted: {movement.interrupt_reason}"  # pyright: ignore [reportUnknownMemberType]
            raise RuntimeError(error_message)

        # Handle empty end position.
        if movement.last_pos is None or len(movement.last_pos) == 0:  # pyright: ignore [reportUnknownMemberType, reportUnknownArgumentType]
            error_message = f"Manipulator {manipulator_id} did not reach target position"
            raise RuntimeError(error_message)

        return um_to_mm(
            list_to_vector4([*movement.last_pos, movement.last_pos[0]] if self._is_ump_3() else list(movement.last_pos))  # pyright: ignore [reportUnknownArgumentType, reportUnknownMemberType]
        )

    @override
    async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> float:
        # Augment current position with depth.
        current_position = await self.get_position(manipulator_id)
        new_platform_position = current_position.model_copy(update={"x" if self._is_ump_3() else "w": depth})

        # Make the movement.
        final_platform_position = await self.set_position(manipulator_id, new_platform_position, speed)

        # Return the final depth.
        return float(final_platform_position.w)

    @override
    async def stop(self, manipulator_id: str) -> None:
        self._get_device(manipulator_id).stop()

    @override
    def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4:
        """
        For uMp-3:
        unified   <-  platform
        +x        <-  +y
        +y        <-  -x
        +z        <-  -z
        +d        <-  +d

        For uMp-4:
        unified   <-  platform
        +x        <-  +y
        +y        <-  -z
        +z        <-  +x
        +d        <-  +d
        """

        return (
            Vector4(
                x=platform_space.y,
                y=self.get_dimensions().x - platform_space.x,
                z=self.get_dimensions().z - platform_space.z,
                w=platform_space.w,
            )
            if self._is_ump_3()
            else Vector4(
                x=platform_space.y,
                y=self.get_dimensions().z - platform_space.z,
                z=platform_space.x,
                w=platform_space.w,
            )
        )

    @override
    def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4:
        """
        For uMp-3:
        platform  <-  unified
        +x        <-  -y
        +y        <-  +x
        +z        <-  -z
        +d        <-  +d

        For uMp-4:
        platform  <-  unified
        +x        <-  +z
        +y        <-  +x
        +z        <-  -y
        +d        <-  +d
        """

        return (
            Vector4(
                x=self.get_dimensions().y - unified_space.y,
                y=unified_space.x,
                z=self.get_dimensions().z - unified_space.z,
                w=unified_space.w,
            )
            if self._is_ump_3()
            else Vector4(
                x=unified_space.z,
                y=unified_space.x,
                z=self.get_dimensions().y - unified_space.y,
                w=unified_space.w,
            )
        )

    # Helper methods.
    def _get_device(self, manipulator_id: str) -> SensapexDevice:
        """Returns the Sensapex device object for the given manipulator ID.

        Args:
            manipulator_id: Manipulator ID.
        Returns:
            Sensapex device object.
        """

        return self._ump.get_device(int(manipulator_id))  # pyright: ignore [reportUnknownMemberType]

    def _is_ump_3(self) -> bool:
        """Check if the current device is uMp-3.

        Returns:
            True if the device is uMp-3, False otherwise.
        """
        return self.axis_count == self.UMP_3_NUM_AXES

__init__()

Initialize uMp bindings.

Source code in src/ephys_link/bindings/ump_binding.py
def __init__(self) -> None:
    """Initialize uMp bindings."""

    # Establish connection to Sensapex API (exit if connection fails).
    UMP.set_library_path(RESOURCES_DIRECTORY)
    self._ump: UMP = UMP.get_ump()  # pyright: ignore [reportUnknownMemberType]

    # Compute axis count, assumed as the first device. 0 if no devices are connected.
    device_ids = list(map(str, self._ump.list_devices()))
    self.axis_count: int = 0 if len(device_ids) == 0 else self._get_device(device_ids[0]).n_axes()

get_angles(manipulator_id) async

uMp does not support getting angles so raise an error.

Raises:

Type Description
AttributeError

uMp does not support getting angles.

Source code in src/ephys_link/bindings/ump_binding.py
@override
async def get_angles(self, manipulator_id: str) -> NoReturn:
    """uMp does not support getting angles so raise an error.

    Raises:
        AttributeError: uMp does not support getting angles.
    """
    error_message = "uMp does not support getting angles"
    raise AttributeError(error_message)

get_shank_count(manipulator_id) async

uMp does not support getting shank count so raise an error.

Raises:

Type Description
AttributeError

uMp does not support getting shank count.

Source code in src/ephys_link/bindings/ump_binding.py
@override
async def get_shank_count(self, manipulator_id: str) -> NoReturn:
    """uMp does not support getting shank count so raise an error.

    Raises:
        AttributeError: uMp does not support getting shank count.
    """
    error_message = "uMp does not support getting shank count"
    raise AttributeError(error_message)

platform_space_to_unified_space(platform_space)

For uMp-3: unified <- platform +x <- +y +y <- -x +z <- -z +d <- +d

For uMp-4: unified <- platform +x <- +y +y <- -z +z <- +x +d <- +d

Source code in src/ephys_link/bindings/ump_binding.py
@override
def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4:
    """
    For uMp-3:
    unified   <-  platform
    +x        <-  +y
    +y        <-  -x
    +z        <-  -z
    +d        <-  +d

    For uMp-4:
    unified   <-  platform
    +x        <-  +y
    +y        <-  -z
    +z        <-  +x
    +d        <-  +d
    """

    return (
        Vector4(
            x=platform_space.y,
            y=self.get_dimensions().x - platform_space.x,
            z=self.get_dimensions().z - platform_space.z,
            w=platform_space.w,
        )
        if self._is_ump_3()
        else Vector4(
            x=platform_space.y,
            y=self.get_dimensions().z - platform_space.z,
            z=platform_space.x,
            w=platform_space.w,
        )
    )

unified_space_to_platform_space(unified_space)

For uMp-3: platform <- unified +x <- -y +y <- +x +z <- -z +d <- +d

For uMp-4: platform <- unified +x <- +z +y <- +x +z <- -y +d <- +d

Source code in src/ephys_link/bindings/ump_binding.py
@override
def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4:
    """
    For uMp-3:
    platform  <-  unified
    +x        <-  -y
    +y        <-  +x
    +z        <-  -z
    +d        <-  +d

    For uMp-4:
    platform  <-  unified
    +x        <-  +z
    +y        <-  +x
    +z        <-  -y
    +d        <-  +d
    """

    return (
        Vector4(
            x=self.get_dimensions().y - unified_space.y,
            y=unified_space.x,
            z=self.get_dimensions().z - unified_space.z,
            w=unified_space.w,
        )
        if self._is_ump_3()
        else Vector4(
            x=unified_space.z,
            y=unified_space.x,
            z=self.get_dimensions().y - unified_space.y,
            w=unified_space.w,
        )
    )