Bindings for New Scale Pathfinder MPM HTTP server platform.

Usage: Instantiate MPMBindings to interact with the New Scale Pathfinder MPM HTTP server platform.


Bases: BaseBinding

Bindings for New Scale Pathfinder MPM HTTP server platform.

Source code in src/ephys_link/bindings/
class MPMBinding(BaseBinding):
    """Bindings for New Scale Pathfinder MPM HTTP server platform."""

    # Valid New Scale manipulator IDs

    # Server cache lifetime (60 FPS).
    CACHE_LIFETIME = 1 / 60

    # Movement polling preferences.

    # Speed preferences (mm/s to use coarse mode).

    def __init__(self, port: int = 8080) -> None:
        """Initialize connection to MPM HTTP server.

            port: Port number for MPM HTTP server.
        self._url = f"http://localhost:{port}"
        self._movement_stopped = False

        # Data cache.
        self.cache: dict[str, Any] = {}  # pyright: ignore [reportExplicitAny]
        self.cache_time = 0

    def get_display_name() -> str:
        return "Pathfinder MPM Control v2.8.8+"

    def get_cli_name() -> str:
        return "pathfinder-mpm"

    async def get_manipulators(self) -> list[str]:
        return [manipulator["Id"] for manipulator in (await self._query_data())["ProbeArray"]]  # pyright: ignore [reportAny]

    async def get_axes_count(self) -> int:
        return 3

    def get_dimensions(self) -> Vector4:
        return Vector4(x=15, y=15, z=15, w=15)

    async def get_position(self, manipulator_id: str) -> Vector4:
        manipulator_data: dict[str, float] = await self._manipulator_data(manipulator_id)
        stage_z: float = manipulator_data["Stage_Z"]

        await sleep(self.POLL_INTERVAL)  # Wait for the stage to stabilize.

        return Vector4(

    async def get_angles(self, manipulator_id: str) -> Vector3:
        manipulator_data: dict[str, float] = await self._manipulator_data(manipulator_id)

        # Apply PosteriorAngle to Polar to get the correct angle.
        adjusted_polar: int = manipulator_data["Polar"] - (await self._query_data())["PosteriorAngle"]

        return Vector3(
            x=adjusted_polar if adjusted_polar > 0 else 360 + adjusted_polar,

    async def get_shank_count(self, manipulator_id: str) -> int:
        return int((await self._manipulator_data(manipulator_id))["ShankCount"])  # pyright: ignore [reportAny]

    def get_movement_tolerance(self) -> float:
        return 0.01

    async def set_position(self, manipulator_id: str, position: Vector4, speed: float) -> Vector4:
        # Keep track of the previous position to check if the manipulator stopped advancing.
        current_position = await self.get_position(manipulator_id)
        previous_position = current_position
        unchanged_counter = 0

        # Set step mode based on speed.
        await self._put_request(
                "PutId": "ProbeStepMode",
                "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id),
                "StepMode": 0 if speed > self.COARSE_SPEED_THRESHOLD else 1,

        # Send move request.
        await self._put_request(
                "PutId": "ProbeMotion",
                "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id),
                "Absolute": 1,
                "Stereotactic": 0,
                "AxisMask": 7,
                "X": position.x,
                "Y": position.y,
                "Z": position.z,

        # Wait for the manipulator to reach the target position or be stopped or stuck.
        while (
            not self._movement_stopped
            and not self._is_vector_close(current_position, position)
            and unchanged_counter < self.UNCHANGED_COUNTER_LIMIT
            # Wait for a short time before checking again.
            await sleep(self.POLL_INTERVAL)

            # Update current position.
            current_position = await self.get_position(manipulator_id)

            # Check if manipulator is not moving.
            if self._is_vector_close(previous_position, current_position):
                # Position did not change.
                unchanged_counter += 1
                # Position changed.
                unchanged_counter = 0
                previous_position = current_position

        # Reset movement stopped flag.
        self._movement_stopped = False

        # Return the final position.
        return await self.get_position(manipulator_id)

    async def set_depth(self, manipulator_id: str, depth: float, speed: float) -> float:
        # Keep track of the previous depth to check if the manipulator stopped advancing unexpectedly.
        current_depth = (await self.get_position(manipulator_id)).w
        previous_depth = current_depth
        unchanged_counter = 0

        # Send move request.
        # Convert mm/s to um/min and cap speed at the limit.
        await self._put_request(
                "PutId": "ProbeInsertion",
                "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id),
                "Distance": scalar_mm_to_um(current_depth - depth),
                "Rate": min(scalar_mm_to_um(speed) * 60, self.INSERTION_SPEED_LIMIT),

        # Wait for the manipulator to reach the target depth or be stopped or get stuck.
        while not self._movement_stopped and not abs(current_depth - depth) <= self.get_movement_tolerance():
            # Wait for a short time before checking again.
            await sleep(self.POLL_INTERVAL)

            # Get the current depth.
            current_depth = (await self.get_position(manipulator_id)).w

            # Check if manipulator is not moving.
            if abs(previous_depth - current_depth) <= self.get_movement_tolerance():
                # Depth did not change.
                unchanged_counter += 1
                # Depth changed.
                unchanged_counter = 0
                previous_depth = current_depth

        # Reset movement stopped flag.
        self._movement_stopped = False

        # Return the final depth.
        return float((await self.get_position(manipulator_id)).w)

    async def stop(self, manipulator_id: str) -> None:
        request: dict[str, str | int | float] = {
            "PutId": "ProbeStop",
            "Probe": self.VALID_MANIPULATOR_IDS.index(manipulator_id),
        await self._put_request(request)
        self._movement_stopped = True

    def platform_space_to_unified_space(self, platform_space: Vector4) -> Vector4:
        # unified   <-  platform
        # +x        <-  -x
        # +y        <-  +z
        # +z        <-  +y
        # +w        <-  -w

        return Vector4(
            x=self.get_dimensions().x - platform_space.x,
            w=self.get_dimensions().w - platform_space.w,

    def unified_space_to_platform_space(self, unified_space: Vector4) -> Vector4:
        # platform  <-  unified
        # +x        <-  -x
        # +y        <-  +z
        # +z        <-  +y
        # +w        <-  -w

        return Vector4(
            x=self.get_dimensions().x - unified_space.x,
            w=self.get_dimensions().w - unified_space.w,

    # Helper functions.
    async def _query_data(self) -> dict[str, Any]:  # pyright: ignore [reportExplicitAny]
            # Update cache if it's expired.
            if get_running_loop().time() - self.cache_time > self.CACHE_LIFETIME:
                # noinspection PyTypeChecker
                self.cache = (await get_running_loop().run_in_executor(None, get, self._url)).json()
                self.cache_time = get_running_loop().time()
        except ConnectionError as connectionError:
            error_message = f"Unable to connect to MPM HTTP server: {connectionError}"
            raise RuntimeError(error_message) from connectionError
        except JSONDecodeError as jsonDecodeError:
            error_message = f"Unable to decode JSON response from MPM HTTP server: {jsonDecodeError}"
            raise ValueError(error_message) from jsonDecodeError
            # Return cached data.
            return self.cache

    async def _manipulator_data(self, manipulator_id: str) -> dict[str, Any]:  # pyright: ignore [reportExplicitAny]
        probe_data: list[dict[str, Any]] = (await self._query_data())["ProbeArray"]  # pyright: ignore [reportExplicitAny]
        for probe in probe_data:
            if probe["Id"] == manipulator_id:
                return probe

        # If we get here, that means the manipulator doesn't exist.
        error_message = f"Manipulator {manipulator_id} not found."
        raise ValueError(error_message)

    async def _put_request(self, request: dict[str, Any]) -> None:  # pyright: ignore [reportExplicitAny]
        _ = await get_running_loop().run_in_executor(None, put, self._url, dumps(request))

    def _is_vector_close(self, target: Vector4, current: Vector4) -> bool:
        return all(abs(axis) <= self.get_movement_tolerance() for axis in vector4_to_array(target - current)[:3])


