> ## Documentation Index
> Fetch the complete documentation index at: https://dimensional-cc-frontier.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# Adding a custom arm

# How to Integrate a New Manipulator Arm

This guide walks through integrating a new robot arm with DimOS, from writing the hardware adapter to creating blueprints for planning and control.

## Architecture Overview

DimOS uses a **Protocol-based adapter pattern** — no base class inheritance required. Your adapter wraps the vendor SDK and exposes a standard interface that the rest of the system consumes:

```
┌──────────────────────────────────────────────────────────────┐
│              ManipulationModule (Planning)                    │
│  - Plans collision-free trajectories using Drake             │
│  - Sends trajectories to coordinator via RPC                 │
└───────────────────────┬──────────────────────────────────────┘
                        │ RPC: execute trajectory
┌───────────────────────▼──────────────────────────────────────┐
│              ControlCoordinator (100Hz control loop)          │
│  - Reads state from all adapters                             │
│  - Runs tasks (trajectory, servo, velocity)                  │
│  - Arbitrates per-joint conflicts (priority-based)           │
│  - Routes commands to the correct adapter                    │
│  - Publishes aggregated joint state                          │
└───────────────────────┬──────────────────────────────────────┘
                        │ uses
┌───────────────────────▼──────────────────────────────────────┐
│              Your Adapter (implements Protocol)               │
│  - Wraps vendor SDK (TCP/IP, CAN, serial, etc.)             │
│  - Converts between vendor units and SI units                │
│  - Handles connection lifecycle                              │
└──────────────────────────────────────────────────────────────┘
```

> See also: `dimos/hardware/manipulators/README.md` for a quick reference.

## Prerequisites

1. **Vendor SDK** — The Python SDK for your robot arm (e.g., `xarm-python-sdk`, `piper-sdk`)
2. **URDF/xacro** — A robot description file (only needed if you want motion planning)
3. **Connection info** — IP address, CAN port, serial device, etc.

## Step 1: Create the Adapter

Create a new directory for your arm under `dimos/hardware/manipulators/`:

```
dimos/hardware/manipulators/
├── spec.py              # ManipulatorAdapter Protocol (don't modify)
├── registry.py          # Manifest-based adapter registry (don't modify)
├── mock/
├── xarm/
├── piper/
└── yourarm/             # ← New directory
    ├── __init__.py
    ├── __registry__.py
    └── adapter.py
```

### adapter.py — Full Skeleton

Below is a complete annotated adapter. Implement each method by wrapping your vendor SDK calls. All values crossing the adapter boundary **must use SI units**.

| Quantity         | SI Unit |
| ---------------- | ------- |
| Angles           | radians |
| Angular velocity | rad/s   |
| Torque           | Nm      |
| Position         | meters  |
| Force            | Newtons |

```python skip theme={null}
"""YourArm adapter — implements ManipulatorAdapter protocol.

SDK Units: <describe your SDK's native units here>
DimOS Units: angles=radians, distance=meters, velocity=rad/s
"""

from __future__ import annotations

import math

# Import your vendor SDK
from yourarm_sdk import YourArmSDK

from dimos.hardware.manipulators.spec import (
    ControlMode,
    JointLimits,
    ManipulatorInfo,
)

# Unit conversion constants (if your SDK doesn't use SI units)
MM_TO_M = 0.001
M_TO_MM = 1000.0


class YourArmAdapter:
    """YourArm hardware adapter.

    Implements ManipulatorAdapter protocol via duck typing.
    No inheritance required — just match the method signatures in spec.py.
    """

    def __init__(self, address: str, dof: int = 6) -> None:
        """Initialize the adapter.

        Args:
            address: Connection address (IP, CAN port, serial device, etc.)
            dof: Degrees of freedom.
        """
        if not address:
            raise ValueError("address is required for YourArmAdapter")
        self._address = address
        self._dof = dof
        self._sdk: YourArmSDK | None = None
        self._control_mode: ControlMode = ControlMode.POSITION


    def connect(self) -> bool:
        """Connect to hardware. Returns True on success."""
        try:
            self._sdk = YourArmSDK(self._address)
            self._sdk.connect()
            # Verify connection succeeded
            if not self._sdk.is_alive():
                print(f"ERROR: Arm at {self._address} not reachable")
                return False
            return True
        except Exception as e:
            print(f"ERROR: Failed to connect to arm at {self._address}: {e}")
            return False

    def disconnect(self) -> None:
        """Disconnect from hardware."""
        if self._sdk:
            self._sdk.disconnect()
            self._sdk = None

    def is_connected(self) -> bool:
        """Check if connected."""
        return self._sdk is not None and self._sdk.is_alive()

    def activate(self) -> bool:
        """Prepare hardware for commanded motion after connect()."""
        return self.write_enable(True)

    def deactivate(self) -> bool:
        """Gracefully stop commanded motion before disconnect()."""
        return self.write_stop()


    def get_info(self) -> ManipulatorInfo:
        """Get manipulator info (vendor, model, DOF)."""
        return ManipulatorInfo(
            vendor="YourVendor",
            model="YourModel",
            dof=self._dof,
            firmware_version=None,  # Optional: query from SDK if available
            serial_number=None,     # Optional: query from SDK if available
        )

    def get_dof(self) -> int:
        """Get degrees of freedom."""
        return self._dof

    def get_limits(self) -> JointLimits:
        """Get joint position and velocity limits in SI units.

        Either hardcode known limits or query them from the SDK.
        """
        return JointLimits(
            position_lower=[-math.pi] * self._dof,     # radians
            position_upper=[math.pi] * self._dof,       # radians
            velocity_max=[math.pi] * self._dof,          # rad/s
        )


    def set_control_mode(self, mode: ControlMode) -> bool:
        """Set control mode.

        Map DimOS ControlMode enum values to your SDK's mode codes.
        Return False for modes your arm doesn't support.
        """
        if not self._sdk:
            return False

        mode_map = {
            ControlMode.POSITION: 0,        # Your SDK's position mode code
            ControlMode.SERVO_POSITION: 1,   # High-frequency servo mode
            ControlMode.VELOCITY: 4,         # Velocity mode
            # Add other supported modes...
        }

        sdk_mode = mode_map.get(mode)
        if sdk_mode is None:
            return False  # Unsupported mode

        success = self._sdk.set_mode(sdk_mode)
        if success:
            self._control_mode = mode
        return success

    def get_control_mode(self) -> ControlMode:
        """Get current control mode."""
        return self._control_mode


    def read_joint_positions(self) -> list[float]:
        """Read current joint positions in radians.

        Convert from SDK units to radians.
        """
        if not self._sdk:
            raise RuntimeError("Not connected")
        raw_positions = self._sdk.get_joint_positions()
        return [math.radians(p) for p in raw_positions[:self._dof]]

    def read_joint_velocities(self) -> list[float]:
        """Read current joint velocities in rad/s.

        If your SDK doesn't provide velocity feedback, return zeros.
        The coordinator can estimate velocity via finite differences.
        """
        if not self._sdk:
            return [0.0] * self._dof
        # If SDK supports velocity reading:
        # raw_velocities = self._sdk.get_joint_velocities()
        # return [math.radians(v) for v in raw_velocities[:self._dof]]
        return [0.0] * self._dof

    def read_joint_efforts(self) -> list[float]:
        """Read current joint torques in Nm.

        If your SDK doesn't provide torque feedback, return zeros.
        """
        if not self._sdk:
            return [0.0] * self._dof
        # If SDK supports torque reading:
        # return list(self._sdk.get_joint_torques()[:self._dof])
        return [0.0] * self._dof

    def read_state(self) -> dict[str, int]:
        """Read robot state (mode, state code, etc)."""
        if not self._sdk:
            return {"state": 0, "mode": 0}
        return {
            "state": self._sdk.get_state(),
            "mode": self._sdk.get_mode(),
        }

    def read_error(self) -> tuple[int, str]:
        """Read error code and message. (0, '') means no error."""
        if not self._sdk:
            return 0, ""
        code = self._sdk.get_error_code()
        if code == 0:
            return 0, ""
        return code, f"YourArm error {code}"


    def write_joint_positions(
        self,
        positions: list[float],
        velocity: float = 1.0,
    ) -> bool:
        """Command joint positions in radians.

        Args:
            positions: Target positions in radians.
            velocity: Speed as fraction of max (0-1).

        Convert from radians to SDK units before sending.
        """
        if not self._sdk:
            return False
        sdk_positions = [math.degrees(p) for p in positions]
        return self._sdk.set_joint_positions(sdk_positions)

    def write_joint_velocities(self, velocities: list[float]) -> bool:
        """Command joint velocities in rad/s.

        Return False if velocity control is not supported.
        """
        if not self._sdk:
            return False
        sdk_velocities = [math.degrees(v) for v in velocities]
        return self._sdk.set_joint_velocities(sdk_velocities)

    def write_stop(self) -> bool:
        """Stop all motion immediately."""
        if not self._sdk:
            return False
        return self._sdk.emergency_stop()


    def write_enable(self, enable: bool) -> bool:
        """Enable or disable servos."""
        if not self._sdk:
            return False
        return self._sdk.enable_motors(enable)

    def read_enabled(self) -> bool:
        """Check if servos are enabled."""
        if not self._sdk:
            return False
        return self._sdk.motors_enabled()

    def write_clear_errors(self) -> bool:
        """Clear error state."""
        if not self._sdk:
            return False
        return self._sdk.clear_errors()


    def read_cartesian_position(self) -> dict[str, float] | None:
        """Read end-effector pose.

        Returns dict with keys: x, y, z (meters), roll, pitch, yaw (radians).
        Return None if not supported.
        """
        return None  # Or implement if your SDK supports it

    def write_cartesian_position(
        self,
        pose: dict[str, float],
        velocity: float = 1.0,
    ) -> bool:
        """Command end-effector pose. Return False if not supported."""
        return False


    def read_gripper_position(self) -> float | None:
        """Read gripper position in meters. Return None if no gripper."""
        return None

    def write_gripper_position(self, position: float) -> bool:
        """Command gripper position in meters. Return False if no gripper."""
        return False


    def read_force_torque(self) -> list[float] | None:
        """Read F/T sensor data [fx, fy, fz, tx, ty, tz]. None if no sensor."""
        return None

__all__ = ["YourArmAdapter"]
```

### Key implementation notes

* **Unsupported features** — Return `None` for reads and `False` for writes. Never raise exceptions for optional features.
* **Velocity/effort feedback** — If your SDK doesn't provide these, return zeros. The coordinator handles this gracefully.
* **Lazy SDK import** — If the vendor SDK is an optional dependency, you can import it inside `connect()` instead of at module level (see Piper adapter for this pattern):

  ```py theme={null}
  def connect(self) -> bool:
      try:
          from yourarm_sdk import YourArmSDK
          self._sdk = YourArmSDK(self._address)
          ...
      except ImportError:
          print("ERROR: yourarm-sdk not installed. Run: pip install yourarm-sdk")
          return False
  ```

## Step 2: Create Package Files

### \_\_init\_\_.py

```python skip theme={null}
"""YourArm manipulator hardware adapter.

Usage:
    >>> from dimos.hardware.manipulators.yourarm import YourArmAdapter
    >>> adapter = YourArmAdapter(address="192.168.1.100", dof=6)
    >>> adapter.connect()
    >>> positions = adapter.read_joint_positions()
"""

from dimos.hardware.manipulators.yourarm.adapter import YourArmAdapter

__all__ = ["YourArmAdapter"]
```

### `__registry__.py`

Create a lightweight manifest next to `adapter.py`:

```python skip theme={null}
ADAPTER_FACTORIES = {
    "yourarm": "dimos.hardware.manipulators.yourarm.adapter:YourArmAdapter",
}

__all__ = ["ADAPTER_FACTORIES"]
```

### How auto-discovery works

The `AdapterRegistry` in `dimos/hardware/manipulators/registry.py` automatically discovers your adapter metadata at import time:

1. It iterates over all subpackages under `dimos/hardware/manipulators/`
2. For each subpackage, it imports `<subpackage>.__registry__`
3. It reads `ADAPTER_FACTORIES` entries such as `"yourarm": "dimos.hardware.manipulators.yourarm.adapter:YourArmAdapter"`

This means **no central registry edit is needed**. The adapter implementation module is imported lazily only when the adapter is selected through `adapter_registry.create("yourarm", ...)`, so unrelated adapter listing does not import your vendor SDK.

You can verify discovery works:

```python skip theme={null}
from dimos.hardware.manipulators.registry import adapter_registry
print(adapter_registry.available())  # Should include "yourarm"
```

## Step 3: Create Your Robot Folder and Blueprints

Each robot in DimOS gets its own folder under `dimos/robot/`. This is where you define all blueprints for your arm — coordinator, planning, perception, etc. This follows the same pattern as Unitree robots (`dimos/robot/unitree/`).

### 3a. Create the robot directory

```
dimos/robot/
├── unitree/                 # Unitree robots (reference example)
│   ├── go2/
│   │   └── blueprints/
│   └── g1/
│       └── blueprints/
└── yourarm/                 # ← New directory for your robot
    ├── __init__.py
    └── blueprints.py
```

### 3b. Define your blueprints

Create `dimos/robot/yourarm/blueprints.py` with your coordinator and (optionally) planning blueprints:

```python skip theme={null}
"""Blueprints for YourArm robot.

Usage:
    # Run via CLI:
    dimos run coordinator-yourarm          # Start coordinator with real hardware
    dimos run yourarm-planner              # Start planner (optional, for motion planning)

    # Or programmatically:
    from dimos.robot.yourarm.blueprints import coordinator_yourarm
    coordinator = coordinator_yourarm.build()
    coordinator.loop()
"""

from __future__ import annotations

from pathlib import Path

from dimos.control.components import HardwareComponent, HardwareType, make_joints
from dimos.control.coordinator import ControlCoordinator, TaskConfig


# YourArm (6-DOF) — real hardware
coordinator_yourarm = ControlCoordinator.blueprint(
    tick_rate=100.0,                    # Control loop frequency (Hz)
    publish_joint_state=True,           # Publish aggregated joint state
    joint_state_frame_id="coordinator",
    hardware=[
        HardwareComponent(
            hardware_id="arm",                        # Unique ID for this hardware
            hardware_type=HardwareType.MANIPULATOR,
            joints=make_joints("arm", 6),             # Creates ["arm_joint1", ..., "arm_joint6"]
            adapter_type="yourarm",                   # Must match registry name
            address="192.168.1.100",                  # Passed to adapter __init__
            auto_enable=True,                         # Auto-enable servos on start
        ),
    ],
    tasks=[
        TaskConfig(
            name="traj_arm",                          # Task name (used by ManipulationModule RPC)
            type="trajectory",                        # Trajectory execution task
            joint_names=[f"arm_joint{i+1}" for i in range(6)],
            priority=10,                              # Higher priority wins arbitration
        ),
    ],
)


```

### Blueprint field reference

| Field           | Description                                                                               |
| --------------- | ----------------------------------------------------------------------------------------- |
| `hardware_id`   | Unique name for this hardware component. Used to route commands.                          |
| `adapter_type`  | Name registered with `adapter_registry` (e.g., `"yourarm"`).                              |
| `address`       | Connection info passed to adapter's `__init__` as `address` kwarg.                        |
| `joints`        | List of joint names. `make_joints("arm", 6)` creates `["arm_joint1", ..., "arm_joint6"]`. |
| `auto_enable`   | If `True`, servos are enabled automatically when the coordinator starts.                  |
| `task.name`     | Name used by the ManipulationModule to invoke trajectory execution via RPC.               |
| `task.type`     | Task type: `"trajectory"`, `"servo"`, `"velocity"`, or `"cartesian_ik"`.                  |
| `task.priority` | Priority for per-joint arbitration. Higher number wins.                                   |

## Step 4: Add URDF and Planning Integration (Optional)

If you want motion planning (collision-free trajectories via Drake), you need a URDF and a planning blueprint. Add these to your robot's own `blueprints.py`.

### 4a. Add your URDF

Place your URDF/xacro files under LFS data so they can be resolved via `LfsPath`. `LfsPath` is a `Path` subclass that lazily downloads LFS data on first access — this avoids downloading at import time when the blueprint module is loaded.

```python skip theme={null}
from dimos.utils.data import LfsPath
from dimos.manipulation.manipulation_module import manipulation_module
from dimos.manipulation.planning.spec import RobotModelConfig
from dimos.manipulation.planning.spec.models import PlanningGroupDefinition
from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Vector3 import Vector3

# LfsPath defers download until the path is actually accessed
_YOURARM_URDF_PATH = LfsPath("yourarm_description/urdf/yourarm.urdf")
_YOURARM_PACKAGE_PATH = LfsPath("yourarm_description")


def _make_base_pose(x=0.0, y=0.0, z=0.0) -> PoseStamped:
    return PoseStamped(
        frame_id="map",
        position=Vector3(x=x, y=y, z=z),
        orientation=Quaternion(0.0, 0.0, 0.0, 1.0),
    )
```

### 4b. Create a robot model config helper

```python skip theme={null}
def _make_yourarm_config(
    name: str = "arm",
    y_offset: float = 0.0,
    coordinator_task: str | None = None,
) -> RobotModelConfig:
    """Create YourArm robot config for planning.

    Args:
        name: Robot name in the Drake planning world.
        y_offset: Y-axis offset for multi-arm setups.
        coordinator_task: Coordinator task name for trajectory execution via RPC.
    """
    # These must match the joint names in your URDF
    joint_names = ["joint1", "joint2", "joint3", "joint4", "joint5", "joint6"]

    return RobotModelConfig(
        name=name,
        model_path=_YOURARM_URDF_PATH,
        joint_names=joint_names,
        planning_groups=[
            PlanningGroupDefinition(
                name="manipulator",
                joint_names=tuple(joint_names),
                base_link="base_link",
                tip_link="link6",
                source="fallback",
            )
        ],
        base_pose=_make_base_pose(y=y_offset),  # Compatibility; prefer model placement
        base_link="base_link",                 # Compatibility robot-scoped base
        package_paths={"yourarm_description": _YOURARM_PACKAGE_PATH},
        xacro_args={},                  # Xacro arguments if using .xacro files
        collision_exclusion_pairs=[],   # Pairs of links that can touch (e.g., gripper fingers)
        auto_convert_meshes=True,       # Convert DAE/STL meshes for Drake
        max_velocity=1.0,               # Max velocity scaling factor
        max_acceleration=2.0,           # Max acceleration scaling factor
        coordinator_task_name=coordinator_task,
    )
```

### 4c. Create a planning blueprint

Add this to your `dimos/robot/yourarm/blueprints.py` alongside the coordinator blueprint:

```python skip theme={null}

yourarm_planner = manipulation_module(
    robots=[_make_yourarm_config("arm", coordinator_task="traj_arm")],
    planning_timeout=10.0,
    visualization={"backend": "meshcat"},
)
# The planner's `coordinator_joint_state` input auto-connects to the
# ControlCoordinator's output on the default `/coordinator_joint_state`
# topic, so no `.transports(...)` override is needed.
```

### Key config fields

| Field                           | Description                                                                                                               |
| ------------------------------- | ------------------------------------------------------------------------------------------------------------------------- |
| `model_path`                    | Path to `.urdf` or `.xacro` file                                                                                          |
| `joint_names`                   | Ordered controllable local model joint set (must match URDF); not itself a planning group                                 |
| `planning_groups` / `srdf_path` | Explicit planning groups or SRDF source; fallback can generate `{robot_name}/manipulator` for an unambiguous single chain |
| `package_paths`                 | Maps `package://` URIs to filesystem paths (for xacro)                                                                    |
| `coordinator_task_name`         | Must match the `TaskConfig.name` in your coordinator blueprint                                                            |
| `collision_exclusion_pairs`     | List of `(link_a, link_b)` tuples for links that may legitimately touch (e.g., gripper fingers)                           |

Coordinator-facing joint states and trajectories use global joint names derived
mechanically as `{robot_name}/{local_joint_name}` (for example, `arm/joint1`).
Keep hardware-native name translation inside the hardware adapter; manipulation
planning config uses local model joint names.

`base_link`, `base_pose`, and `end_effector_link` are compatibility fields used
by current placement and robot-scoped helper paths. New planning code should use
SRDF/planning-group chain base/tip links and encode robot placement in the model. See
[Planning Groups](/docs/capabilities/manipulation/planning_groups.md).

## Step 5: Register Blueprints

The blueprint registry in `dimos/robot/all_blueprints.py` is **auto-generated** by scanning the codebase for blueprint declarations. After adding your blueprints:

1. Run the generation test to update the registry:
   ```bash theme={null}
   pytest dimos/robot/test_all_blueprints_generation.py
   ```
2. Now you can run your arm via CLI:
   ```bash theme={null}
   dimos run coordinator-yourarm
   dimos run yourarm-planner        # If you added a planning blueprint
   ```

## Step 6: Testing

### Verify adapter registration

```python skip theme={null}
from dimos.hardware.manipulators.registry import adapter_registry

# Check your adapter shows up
assert "yourarm" in adapter_registry.available()

# Create an instance via registry (same path the coordinator uses)
adapter = adapter_registry.create("yourarm", address="192.168.1.100", dof=6)
```

### Unit test with mock

You can test coordinator logic without hardware by using `unittest.mock`:

```python skip theme={null}
import pytest
from unittest.mock import MagicMock
from dimos.hardware.manipulators.spec import ManipulatorAdapter

@pytest.fixture
def mock_adapter():
    adapter = MagicMock(spec=ManipulatorAdapter)
    adapter.get_dof.return_value = 6
    adapter.read_joint_positions.return_value = [0.0] * 6
    adapter.read_joint_velocities.return_value = [0.0] * 6
    adapter.read_joint_efforts.return_value = [0.0] * 6
    adapter.write_joint_positions.return_value = True
    adapter.activate.return_value = True
    adapter.deactivate.return_value = True
    adapter.read_enabled.return_value = True
    adapter.is_connected.return_value = True
    return adapter

def test_read_positions(mock_adapter):
    assert mock_adapter.read_joint_positions() == [0.0] * 6

def test_write_positions(mock_adapter):
    target = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
    assert mock_adapter.write_joint_positions(target) is True
```

### Integration test with coordinator

```python skip theme={null}
from dimos.robot.manipulators.common.mock import coordinator_mock
from dimos.core.coordination.module_coordinator import ModuleCoordinator

# Build and start coordinator with mock hardware
coordinator = ModuleCoordinator.build(coordinator_mock)
coordinator.start()

# Your adapter is tested through the same coordinator interface
# Just swap adapter_type="mock" to adapter_type="yourarm" in a blueprint

coordinator.stop()
```

### Test the real adapter standalone

```python skip theme={null}
from dimos.hardware.manipulators.yourarm import YourArmAdapter

adapter = YourArmAdapter(address="192.168.1.100", dof=6)
assert adapter.connect() is True
assert adapter.is_connected() is True

# Read state
positions = adapter.read_joint_positions()
assert len(positions) == 6
print(f"Joint positions (rad): {positions}")

# Activate and move
adapter.activate()
adapter.write_joint_positions([0.0] * 6)

# Cleanup
adapter.deactivate()
adapter.disconnect()
```

## Quick Reference Checklist

Files to create:

* [ ] `dimos/hardware/manipulators/yourarm/__init__.py`
* [ ] `dimos/hardware/manipulators/yourarm/__registry__.py` (maps adapter key to implementation path)
* [ ] `dimos/hardware/manipulators/yourarm/adapter.py` (implements Protocol)
* [ ] `dimos/robot/yourarm/__init__.py`
* [ ] `dimos/robot/yourarm/blueprints.py` (coordinator + planning blueprints)

Files to modify:

* [ ] `pyproject.toml` — Add vendor SDK to optional dependencies *(if applicable)*

Verification:

* [ ] `adapter_registry.available()` includes `"yourarm"`
* [ ] `adapter_registry.create("yourarm", address="192.168.1.100", dof=6)` returns your adapter
* [ ] `pytest dimos/robot/test_all_blueprints_generation.py` passes (regenerates `all_blueprints.py`)
* [ ] `dimos run coordinator-yourarm` starts successfully
