Skip to content

mpm_binding

Bindings for New Scale Pathfinder MPM HTTP server platform.

Usage: Instantiate MPMBindings to interact with the New Scale Pathfinder MPM HTTP server platform.

MPMBinding

Bases: BaseBinding

Bindings for New Scale Pathfinder MPM HTTP server platform.

Source code in src/ephys_link/bindings/mpm_binding.py
@final
class MPMBinding(BaseBinding):
    """Bindings for New Scale Pathfinder MPM HTTP server platform."""

    # Valid New Scale manipulator IDs
    VALID_MANIPULATOR_IDS = (
        "A",
        "B",
        "C",
        "D",
        "E",
        "F",
        "G",
        "H",
        "I",
        "J",
        "K",
        "L",
        "M",
        "N",
        "O",
        "P",
        "Q",
        "R",
        "S",
        "T",
        "U",
        "V",
        "W",
        "X",
        "Y",
        "Z",
        "AA",
        "AB",
        "AC",
        "AD",
        "AE",
        "AF",
        "AG",
        "AH",
        "AI",
        "AJ",
        "AK",
        "AL",
        "AM",
        "AN",
    )

    # Server cache lifetime (60 FPS).
    CACHE_LIFETIME = 1 / 60

    # Movement polling preferences.
    UNCHANGED_COUNTER_LIMIT = 10
    POLL_INTERVAL = 0.1

    # Speed preferences (mm/s to use coarse mode).
    COARSE_SPEED_THRESHOLD = 0.1
    INSERTION_SPEED_LIMIT = 9_000

    def __init__(self, port: int = 8080) -> None:
        """Initialize connection to MPM HTTP server.

        Args:
            port: Port number for MPM HTTP server.
        """
        self._url = f"http://localhost:{port}"
        self._movement_stopped = False

        # Data cache.
        self.cache: dict[str, Any] = {}  # pyright: ignore [reportExplicitAny]
        self.cache_time = 0

    @staticmethod
    @override
    def get_display_name() -> str:
        return "Pathfinder MPM Control v2.8.8+"

    @staticmethod
    @override
    def get_cli_name() -> str:
        return "pathfinder-mpm"

    @override
    async def get_manipulators(self) -> list[str]:
        return [manipulator["Id"] for manipulator in (await self._query_data())["ProbeArray"]]  # pyright: ignore [reportAny]

    @override
    async def get_axes_count(self) -> int:
        return 3

    @override
    def get_dimensions(self) -> Vector4:
        return Vector4(x=15, y=15, z=15, w=15)

    @override
    async def get_position(self, manipulator_id: str) -> Vector4:
        manipulator_data: dict[str, float] = await self._manipulator_data(manipulator_id)
        stage_z: float = manipulator_data["Stage_Z"]

        await sleep(self.POLL_INTERVAL)  # Wait for the stage to stabilize.

        return Vector4(
            x=manipulator_data["Stage_X"],
            y=manipulator_data["Stage_Y"],
            z=stage_z,
            w=stage_z,
        )

    @override
    async def get_angles(self, manipulator_id: str) -> Vector3:
        manipulator_data: dict[str, float] = await self._manipulator_data(manipulator_id)

        # Apply PosteriorAngle to Polar to get the correct angle.
        adjusted_polar: int = manipulator_data["Polar"] - (await self._query_data())["PosteriorAngle"]

        return Vector3(
            x=adjusted_polar if adjusted_polar > 0 else 360 + adjusted_polar,
            y=manipulator_data["Pitch"],
            z=manipulator_data["ShankOrientation"],
        )

    @override
    async def get_shank_count(self, manipulator_id: str) -> int:
        return int((await self._manipulator_data(manipulator_id))["ShankCount"])  # pyright: ignore [reportAny]

    @override
    def get_movement_tolerance(self) -> float:
        return 0.01

    @override
    async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4:
        # Keep track of the previous position to check if the manipulator stopped advancing.
        current_position = await self.get_position(manipulator_id)
        previous_position = current_position
        unchanged_counter = 0

        # Set step mode based on speed.
        await self._put_request(
            {
                "PutId": "ProbeStepMode",
                "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id),
                "StepMode": 0 if speed > self.COARSE_SPEED_THRESHOLD else 1,
            }
        )

        # Send move request.
        await self._put_request(
            {
                "PutId": "ProbeMotion",
                "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id),
                "Absolute": 1,
                "Stereotactic": 0,
                "AxisMask": 7,
                "X": position.x,
                "Y": position.y,
                "Z": position.z,
            }
        )

        # Wait for the manipulator to reach the target position or be stopped or stuck.
        while (
            not self._movement_stopped
            and not self._is_vector_close(current_position, position)
            and unchanged_counter < self.UNCHANGED_COUNTER_LIMIT
        ):
            # Wait for a short time before checking again.
            await sleep(self.POLL_INTERVAL)

            # Update current position.
            current_position = await self.get_position(manipulator_id)

            # Check if manipulator is not moving.
            if self._is_vector_close(previous_position, current_position):
                # Position did not change.
                unchanged_counter += 1
            else:
                # Position changed.
                unchanged_counter = 0
                previous_position = current_position

        # Reset movement stopped flag.
        self._movement_stopped = False

        # Return the final position.
        return await self.get_position(manipulator_id)

    @override
    async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> float:
        # Keep track of the previous depth to check if the manipulator stopped advancing unexpectedly.
        current_depth = (await self.get_position(manipulator_id)).w
        previous_depth = current_depth
        unchanged_counter = 0

        # Send move request.
        # Convert mm/s to um/min and cap speed at the limit.
        await self._put_request(
            {
                "PutId": "ProbeInsertion",
                "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id),
                "Distance": scalar_mm_to_um(current_depth - depth),
                "Rate": min(scalar_mm_to_um(speed) * 60, self.INSERTION_SPEED_LIMIT),
            }
        )

        # Wait for the manipulator to reach the target depth or be stopped or get stuck.
        while not self._movement_stopped and not abs(current_depth - depth) <= self.get_movement_tolerance():
            # Wait for a short time before checking again.
            await sleep(self.POLL_INTERVAL)

            # Get the current depth.
            current_depth = (await self.get_position(manipulator_id)).w

            # Check if manipulator is not moving.
            if abs(previous_depth - current_depth) <= self.get_movement_tolerance():
                # Depth did not change.
                unchanged_counter += 1
            else:
                # Depth changed.
                unchanged_counter = 0
                previous_depth = current_depth

        # Reset movement stopped flag.
        self._movement_stopped = False

        # Return the final depth.
        return float((await self.get_position(manipulator_id)).w)

    @override
    async def stop(self, manipulator_id: str) -> None:
        request: dict[str, str | int | float] = {
            "PutId": "ProbeStop",
            "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id),
        }
        await self._put_request(request)
        self._movement_stopped = True

    @override
    def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4:
        # unified   <-  platform
        # +x        <-  -x
        # +y        <-  +z
        # +z        <-  +y
        # +w        <-  -w

        return Vector4(
            x=self.get_dimensions().x - platform_space.x,
            y=platform_space.z,
            z=platform_space.y,
            w=self.get_dimensions().w - platform_space.w,
        )

    @override
    def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4:
        # platform  <-  unified
        # +x        <-  -x
        # +y        <-  +z
        # +z        <-  +y
        # +w        <-  -w

        return Vector4(
            x=self.get_dimensions().x - unified_space.x,
            y=unified_space.z,
            z=unified_space.y,
            w=self.get_dimensions().w - unified_space.w,
        )

    # Helper functions.
    async def _query_data(self) -> dict[str, Any]:  # pyright: ignore [reportExplicitAny]
        try:
            # Update cache if it's expired.
            if get_running_loop().time() - self.cache_time > self.CACHE_LIFETIME:
                # noinspection PyTypeChecker
                self.cache = (await get_running_loop().run_in_executor(None, get, self._url)).json()
                self.cache_time = get_running_loop().time()
        except ConnectionError as connectionError:
            error_message = f"Unable to connect to MPM HTTP server: {connectionError}"
            raise RuntimeError(error_message) from connectionError
        except JSONDecodeError as jsonDecodeError:
            error_message = f"Unable to decode JSON response from MPM HTTP server: {jsonDecodeError}"
            raise ValueError(error_message) from jsonDecodeError
        else:
            # Return cached data.
            return self.cache

    async def _manipulator_data(self, manipulator_id: str) -> dict[str, Any]:  # pyright: ignore [reportExplicitAny]
        probe_data: list[dict[str, Any]] = (await self._query_data())["ProbeArray"]  # pyright: ignore [reportExplicitAny]
        for probe in probe_data:
            if probe["Id"] == manipulator_id:
                return probe

        # If we get here, that means the manipulator doesn't exist.
        error_message = f"Manipulator {manipulator_id} not found."
        raise ValueError(error_message)

    async def _put_request(self, request: dict[str, Any]) -> None:  # pyright: ignore [reportExplicitAny]
        _ = await get_running_loop().run_in_executor(None, put, self._url, dumps(request))

    def _is_vector_close(self, target: Vector4, current: Vector4) -> bool:
        return all(abs(axis) <= self.get_movement_tolerance() for axis in vector4_to_array(target - current)[:3])

__init__(port=8080)

Initialize connection to MPM HTTP server.

Parameters:

Name Type Description Default
port int

Port number for MPM HTTP server.

8080
Source code in src/ephys_link/bindings/mpm_binding.py
def __init__(self, port: int = 8080) -> None:
    """Initialize connection to MPM HTTP server.

    Args:
        port: Port number for MPM HTTP server.
    """
    self._url = f"http://localhost:{port}"
    self._movement_stopped = False

    # Data cache.
    self.cache: dict[str, Any] = {}  # pyright: ignore [reportExplicitAny]
    self.cache_time = 0