Skip to main content

How to Integrate a New Manipulator Arm

This guide walks through integrating a new robot arm with DimOS, from writing the hardware adapter to creating blueprints for planning and control.

Architecture Overview

DimOS uses a Protocol-based adapter pattern — no base class inheritance required. Your adapter wraps the vendor SDK and exposes a standard interface that the rest of the system consumes:
┌──────────────────────────────────────────────────────────────┐
│              ManipulationModule (Planning)                    │
│  - Plans collision-free trajectories using Drake             │
│  - Sends trajectories to coordinator via RPC                 │
└───────────────────────┬──────────────────────────────────────┘
                        │ RPC: execute trajectory
┌───────────────────────▼──────────────────────────────────────┐
│              ControlCoordinator (100Hz control loop)          │
│  - Reads state from all adapters                             │
│  - Runs tasks (trajectory, servo, velocity)                  │
│  - Arbitrates per-joint conflicts (priority-based)           │
│  - Routes commands to the correct adapter                    │
│  - Publishes aggregated joint state                          │
└───────────────────────┬──────────────────────────────────────┘
                        │ uses
┌───────────────────────▼──────────────────────────────────────┐
│              Your Adapter (implements Protocol)               │
│  - Wraps vendor SDK (TCP/IP, CAN, serial, etc.)             │
│  - Converts between vendor units and SI units                │
│  - Handles connection lifecycle                              │
└──────────────────────────────────────────────────────────────┘
See also: dimos/hardware/manipulators/README.md for a quick reference.

Prerequisites

  1. Vendor SDK — The Python SDK for your robot arm (e.g., xarm-python-sdk, piper-sdk)
  2. URDF/xacro — A robot description file (only needed if you want motion planning)
  3. Connection info — IP address, CAN port, serial device, etc.

Step 1: Create the Adapter

Create a new directory for your arm under dimos/hardware/manipulators/:
dimos/hardware/manipulators/
├── spec.py              # ManipulatorAdapter Protocol (don't modify)
├── registry.py          # Manifest-based adapter registry (don't modify)
├── mock/
├── xarm/
├── piper/
└── yourarm/             # ← New directory
    ├── __init__.py
    ├── __registry__.py
    └── adapter.py

adapter.py — Full Skeleton

Below is a complete annotated adapter. Implement each method by wrapping your vendor SDK calls. All values crossing the adapter boundary must use SI units.
QuantitySI Unit
Anglesradians
Angular velocityrad/s
TorqueNm
Positionmeters
ForceNewtons
skip
"""YourArm adapter — implements ManipulatorAdapter protocol.

SDK Units: <describe your SDK's native units here>
DimOS Units: angles=radians, distance=meters, velocity=rad/s
"""

from __future__ import annotations

import math

# Import your vendor SDK
from yourarm_sdk import YourArmSDK

from dimos.hardware.manipulators.spec import (
    ControlMode,
    JointLimits,
    ManipulatorInfo,
)

# Unit conversion constants (if your SDK doesn't use SI units)
MM_TO_M = 0.001
M_TO_MM = 1000.0


class YourArmAdapter:
    """YourArm hardware adapter.

    Implements ManipulatorAdapter protocol via duck typing.
    No inheritance required — just match the method signatures in spec.py.
    """

    def __init__(self, address: str, dof: int = 6) -> None:
        """Initialize the adapter.

        Args:
            address: Connection address (IP, CAN port, serial device, etc.)
            dof: Degrees of freedom.
        """
        if not address:
            raise ValueError("address is required for YourArmAdapter")
        self._address = address
        self._dof = dof
        self._sdk: YourArmSDK | None = None
        self._control_mode: ControlMode = ControlMode.POSITION


    def connect(self) -> bool:
        """Connect to hardware. Returns True on success."""
        try:
            self._sdk = YourArmSDK(self._address)
            self._sdk.connect()
            # Verify connection succeeded
            if not self._sdk.is_alive():
                print(f"ERROR: Arm at {self._address} not reachable")
                return False
            return True
        except Exception as e:
            print(f"ERROR: Failed to connect to arm at {self._address}: {e}")
            return False

    def disconnect(self) -> None:
        """Disconnect from hardware."""
        if self._sdk:
            self._sdk.disconnect()
            self._sdk = None

    def is_connected(self) -> bool:
        """Check if connected."""
        return self._sdk is not None and self._sdk.is_alive()

    def activate(self) -> bool:
        """Prepare hardware for commanded motion after connect()."""
        return self.write_enable(True)

    def deactivate(self) -> bool:
        """Gracefully stop commanded motion before disconnect()."""
        return self.write_stop()


    def get_info(self) -> ManipulatorInfo:
        """Get manipulator info (vendor, model, DOF)."""
        return ManipulatorInfo(
            vendor="YourVendor",
            model="YourModel",
            dof=self._dof,
            firmware_version=None,  # Optional: query from SDK if available
            serial_number=None,     # Optional: query from SDK if available
        )

    def get_dof(self) -> int:
        """Get degrees of freedom."""
        return self._dof

    def get_limits(self) -> JointLimits:
        """Get joint position and velocity limits in SI units.

        Either hardcode known limits or query them from the SDK.
        """
        return JointLimits(
            position_lower=[-math.pi] * self._dof,     # radians
            position_upper=[math.pi] * self._dof,       # radians
            velocity_max=[math.pi] * self._dof,          # rad/s
        )


    def set_control_mode(self, mode: ControlMode) -> bool:
        """Set control mode.

        Map DimOS ControlMode enum values to your SDK's mode codes.
        Return False for modes your arm doesn't support.
        """
        if not self._sdk:
            return False

        mode_map = {
            ControlMode.POSITION: 0,        # Your SDK's position mode code
            ControlMode.SERVO_POSITION: 1,   # High-frequency servo mode
            ControlMode.VELOCITY: 4,         # Velocity mode
            # Add other supported modes...
        }

        sdk_mode = mode_map.get(mode)
        if sdk_mode is None:
            return False  # Unsupported mode

        success = self._sdk.set_mode(sdk_mode)
        if success:
            self._control_mode = mode
        return success

    def get_control_mode(self) -> ControlMode:
        """Get current control mode."""
        return self._control_mode


    def read_joint_positions(self) -> list[float]:
        """Read current joint positions in radians.

        Convert from SDK units to radians.
        """
        if not self._sdk:
            raise RuntimeError("Not connected")
        raw_positions = self._sdk.get_joint_positions()
        return [math.radians(p) for p in raw_positions[:self._dof]]

    def read_joint_velocities(self) -> list[float]:
        """Read current joint velocities in rad/s.

        If your SDK doesn't provide velocity feedback, return zeros.
        The coordinator can estimate velocity via finite differences.
        """
        if not self._sdk:
            return [0.0] * self._dof
        # If SDK supports velocity reading:
        # raw_velocities = self._sdk.get_joint_velocities()
        # return [math.radians(v) for v in raw_velocities[:self._dof]]
        return [0.0] * self._dof

    def read_joint_efforts(self) -> list[float]:
        """Read current joint torques in Nm.

        If your SDK doesn't provide torque feedback, return zeros.
        """
        if not self._sdk:
            return [0.0] * self._dof
        # If SDK supports torque reading:
        # return list(self._sdk.get_joint_torques()[:self._dof])
        return [0.0] * self._dof

    def read_state(self) -> dict[str, int]:
        """Read robot state (mode, state code, etc)."""
        if not self._sdk:
            return {"state": 0, "mode": 0}
        return {
            "state": self._sdk.get_state(),
            "mode": self._sdk.get_mode(),
        }

    def read_error(self) -> tuple[int, str]:
        """Read error code and message. (0, '') means no error."""
        if not self._sdk:
            return 0, ""
        code = self._sdk.get_error_code()
        if code == 0:
            return 0, ""
        return code, f"YourArm error {code}"


    def write_joint_positions(
        self,
        positions: list[float],
        velocity: float = 1.0,
    ) -> bool:
        """Command joint positions in radians.

        Args:
            positions: Target positions in radians.
            velocity: Speed as fraction of max (0-1).

        Convert from radians to SDK units before sending.
        """
        if not self._sdk:
            return False
        sdk_positions = [math.degrees(p) for p in positions]
        return self._sdk.set_joint_positions(sdk_positions)

    def write_joint_velocities(self, velocities: list[float]) -> bool:
        """Command joint velocities in rad/s.

        Return False if velocity control is not supported.
        """
        if not self._sdk:
            return False
        sdk_velocities = [math.degrees(v) for v in velocities]
        return self._sdk.set_joint_velocities(sdk_velocities)

    def write_stop(self) -> bool:
        """Stop all motion immediately."""
        if not self._sdk:
            return False
        return self._sdk.emergency_stop()


    def write_enable(self, enable: bool) -> bool:
        """Enable or disable servos."""
        if not self._sdk:
            return False
        return self._sdk.enable_motors(enable)

    def read_enabled(self) -> bool:
        """Check if servos are enabled."""
        if not self._sdk:
            return False
        return self._sdk.motors_enabled()

    def write_clear_errors(self) -> bool:
        """Clear error state."""
        if not self._sdk:
            return False
        return self._sdk.clear_errors()


    def read_cartesian_position(self) -> dict[str, float] | None:
        """Read end-effector pose.

        Returns dict with keys: x, y, z (meters), roll, pitch, yaw (radians).
        Return None if not supported.
        """
        return None  # Or implement if your SDK supports it

    def write_cartesian_position(
        self,
        pose: dict[str, float],
        velocity: float = 1.0,
    ) -> bool:
        """Command end-effector pose. Return False if not supported."""
        return False


    def read_gripper_position(self) -> float | None:
        """Read gripper position in meters. Return None if no gripper."""
        return None

    def write_gripper_position(self, position: float) -> bool:
        """Command gripper position in meters. Return False if no gripper."""
        return False


    def read_force_torque(self) -> list[float] | None:
        """Read F/T sensor data [fx, fy, fz, tx, ty, tz]. None if no sensor."""
        return None

__all__ = ["YourArmAdapter"]

Key implementation notes

  • Unsupported features — Return None for reads and False for writes. Never raise exceptions for optional features.
  • Velocity/effort feedback — If your SDK doesn’t provide these, return zeros. The coordinator handles this gracefully.
  • Lazy SDK import — If the vendor SDK is an optional dependency, you can import it inside connect() instead of at module level (see Piper adapter for this pattern):
    def connect(self) -> bool:
        try:
            from yourarm_sdk import YourArmSDK
            self._sdk = YourArmSDK(self._address)
            ...
        except ImportError:
            print("ERROR: yourarm-sdk not installed. Run: pip install yourarm-sdk")
            return False
    

Step 2: Create Package Files

__init__.py

skip
"""YourArm manipulator hardware adapter.

Usage:
    >>> from dimos.hardware.manipulators.yourarm import YourArmAdapter
    >>> adapter = YourArmAdapter(address="192.168.1.100", dof=6)
    >>> adapter.connect()
    >>> positions = adapter.read_joint_positions()
"""

from dimos.hardware.manipulators.yourarm.adapter import YourArmAdapter

__all__ = ["YourArmAdapter"]

__registry__.py

Create a lightweight manifest next to adapter.py:
skip
ADAPTER_FACTORIES = {
    "yourarm": "dimos.hardware.manipulators.yourarm.adapter:YourArmAdapter",
}

__all__ = ["ADAPTER_FACTORIES"]

How auto-discovery works

The AdapterRegistry in dimos/hardware/manipulators/registry.py automatically discovers your adapter metadata at import time:
  1. It iterates over all subpackages under dimos/hardware/manipulators/
  2. For each subpackage, it imports <subpackage>.__registry__
  3. It reads ADAPTER_FACTORIES entries such as "yourarm": "dimos.hardware.manipulators.yourarm.adapter:YourArmAdapter"
This means no central registry edit is needed. The adapter implementation module is imported lazily only when the adapter is selected through adapter_registry.create("yourarm", ...), so unrelated adapter listing does not import your vendor SDK. You can verify discovery works:
skip
from dimos.hardware.manipulators.registry import adapter_registry
print(adapter_registry.available())  # Should include "yourarm"

Step 3: Create Your Robot Folder and Blueprints

Each robot in DimOS gets its own folder under dimos/robot/. This is where you define all blueprints for your arm — coordinator, planning, perception, etc. This follows the same pattern as Unitree robots (dimos/robot/unitree/).

3a. Create the robot directory

dimos/robot/
├── unitree/                 # Unitree robots (reference example)
│   ├── go2/
│   │   └── blueprints/
│   └── g1/
│       └── blueprints/
└── yourarm/                 # ← New directory for your robot
    ├── __init__.py
    └── blueprints.py

3b. Define your blueprints

Create dimos/robot/yourarm/blueprints.py with your coordinator and (optionally) planning blueprints:
skip
"""Blueprints for YourArm robot.

Usage:
    # Run via CLI:
    dimos run coordinator-yourarm          # Start coordinator with real hardware
    dimos run yourarm-planner              # Start planner (optional, for motion planning)

    # Or programmatically:
    from dimos.robot.yourarm.blueprints import coordinator_yourarm
    coordinator = coordinator_yourarm.build()
    coordinator.loop()
"""

from __future__ import annotations

from pathlib import Path

from dimos.control.components import HardwareComponent, HardwareType, make_joints
from dimos.control.coordinator import ControlCoordinator, TaskConfig


# YourArm (6-DOF) — real hardware
coordinator_yourarm = ControlCoordinator.blueprint(
    tick_rate=100.0,                    # Control loop frequency (Hz)
    publish_joint_state=True,           # Publish aggregated joint state
    joint_state_frame_id="coordinator",
    hardware=[
        HardwareComponent(
            hardware_id="arm",                        # Unique ID for this hardware
            hardware_type=HardwareType.MANIPULATOR,
            joints=make_joints("arm", 6),             # Creates ["arm_joint1", ..., "arm_joint6"]
            adapter_type="yourarm",                   # Must match registry name
            address="192.168.1.100",                  # Passed to adapter __init__
            auto_enable=True,                         # Auto-enable servos on start
        ),
    ],
    tasks=[
        TaskConfig(
            name="traj_arm",                          # Task name (used by ManipulationModule RPC)
            type="trajectory",                        # Trajectory execution task
            joint_names=[f"arm_joint{i+1}" for i in range(6)],
            priority=10,                              # Higher priority wins arbitration
        ),
    ],
)


Blueprint field reference

FieldDescription
hardware_idUnique name for this hardware component. Used to route commands.
adapter_typeName registered with adapter_registry (e.g., "yourarm").
addressConnection info passed to adapter’s __init__ as address kwarg.
jointsList of joint names. make_joints("arm", 6) creates ["arm_joint1", ..., "arm_joint6"].
auto_enableIf True, servos are enabled automatically when the coordinator starts.
task.nameName used by the ManipulationModule to invoke trajectory execution via RPC.
task.typeTask type: "trajectory", "servo", "velocity", or "cartesian_ik".
task.priorityPriority for per-joint arbitration. Higher number wins.

Step 4: Add URDF and Planning Integration (Optional)

If you want motion planning (collision-free trajectories via Drake), you need a URDF and a planning blueprint. Add these to your robot’s own blueprints.py.

4a. Add your URDF

Place your URDF/xacro files under LFS data so they can be resolved via LfsPath. LfsPath is a Path subclass that lazily downloads LFS data on first access — this avoids downloading at import time when the blueprint module is loaded.
skip
from dimos.utils.data import LfsPath
from dimos.manipulation.manipulation_module import manipulation_module
from dimos.manipulation.planning.spec import RobotModelConfig
from dimos.manipulation.planning.spec.models import PlanningGroupDefinition
from dimos.msgs.geometry_msgs.PoseStamped import PoseStamped
from dimos.msgs.geometry_msgs.Quaternion import Quaternion
from dimos.msgs.geometry_msgs.Vector3 import Vector3

# LfsPath defers download until the path is actually accessed
_YOURARM_URDF_PATH = LfsPath("yourarm_description/urdf/yourarm.urdf")
_YOURARM_PACKAGE_PATH = LfsPath("yourarm_description")


def _make_base_pose(x=0.0, y=0.0, z=0.0) -> PoseStamped:
    return PoseStamped(
        frame_id="map",
        position=Vector3(x=x, y=y, z=z),
        orientation=Quaternion(0.0, 0.0, 0.0, 1.0),
    )

4b. Create a robot model config helper

skip
def _make_yourarm_config(
    name: str = "arm",
    y_offset: float = 0.0,
    coordinator_task: str | None = None,
) -> RobotModelConfig:
    """Create YourArm robot config for planning.

    Args:
        name: Robot name in the Drake planning world.
        y_offset: Y-axis offset for multi-arm setups.
        coordinator_task: Coordinator task name for trajectory execution via RPC.
    """
    # These must match the joint names in your URDF
    joint_names = ["joint1", "joint2", "joint3", "joint4", "joint5", "joint6"]

    return RobotModelConfig(
        name=name,
        model_path=_YOURARM_URDF_PATH,
        joint_names=joint_names,
        planning_groups=[
            PlanningGroupDefinition(
                name="manipulator",
                joint_names=tuple(joint_names),
                base_link="base_link",
                tip_link="link6",
                source="fallback",
            )
        ],
        base_pose=_make_base_pose(y=y_offset),  # Compatibility; prefer model placement
        base_link="base_link",                 # Compatibility robot-scoped base
        package_paths={"yourarm_description": _YOURARM_PACKAGE_PATH},
        xacro_args={},                  # Xacro arguments if using .xacro files
        collision_exclusion_pairs=[],   # Pairs of links that can touch (e.g., gripper fingers)
        auto_convert_meshes=True,       # Convert DAE/STL meshes for Drake
        max_velocity=1.0,               # Max velocity scaling factor
        max_acceleration=2.0,           # Max acceleration scaling factor
        coordinator_task_name=coordinator_task,
    )

4c. Create a planning blueprint

Add this to your dimos/robot/yourarm/blueprints.py alongside the coordinator blueprint:
skip

yourarm_planner = manipulation_module(
    robots=[_make_yourarm_config("arm", coordinator_task="traj_arm")],
    planning_timeout=10.0,
    visualization={"backend": "meshcat"},
)
# The planner's `coordinator_joint_state` input auto-connects to the
# ControlCoordinator's output on the default `/coordinator_joint_state`
# topic, so no `.transports(...)` override is needed.

Key config fields

FieldDescription
model_pathPath to .urdf or .xacro file
joint_namesOrdered controllable local model joint set (must match URDF); not itself a planning group
planning_groups / srdf_pathExplicit planning groups or SRDF source; fallback can generate {robot_name}/manipulator for an unambiguous single chain
package_pathsMaps package:// URIs to filesystem paths (for xacro)
coordinator_task_nameMust match the TaskConfig.name in your coordinator blueprint
collision_exclusion_pairsList of (link_a, link_b) tuples for links that may legitimately touch (e.g., gripper fingers)
Coordinator-facing joint states and trajectories use global joint names derived mechanically as {robot_name}/{local_joint_name} (for example, arm/joint1). Keep hardware-native name translation inside the hardware adapter; manipulation planning config uses local model joint names. base_link, base_pose, and end_effector_link are compatibility fields used by current placement and robot-scoped helper paths. New planning code should use SRDF/planning-group chain base/tip links and encode robot placement in the model. See Planning Groups.

Step 5: Register Blueprints

The blueprint registry in dimos/robot/all_blueprints.py is auto-generated by scanning the codebase for blueprint declarations. After adding your blueprints:
  1. Run the generation test to update the registry:
    pytest dimos/robot/test_all_blueprints_generation.py
    
  2. Now you can run your arm via CLI:
    dimos run coordinator-yourarm
    dimos run yourarm-planner        # If you added a planning blueprint
    

Step 6: Testing

Verify adapter registration

skip
from dimos.hardware.manipulators.registry import adapter_registry

# Check your adapter shows up
assert "yourarm" in adapter_registry.available()

# Create an instance via registry (same path the coordinator uses)
adapter = adapter_registry.create("yourarm", address="192.168.1.100", dof=6)

Unit test with mock

You can test coordinator logic without hardware by using unittest.mock:
skip
import pytest
from unittest.mock import MagicMock
from dimos.hardware.manipulators.spec import ManipulatorAdapter

@pytest.fixture
def mock_adapter():
    adapter = MagicMock(spec=ManipulatorAdapter)
    adapter.get_dof.return_value = 6
    adapter.read_joint_positions.return_value = [0.0] * 6
    adapter.read_joint_velocities.return_value = [0.0] * 6
    adapter.read_joint_efforts.return_value = [0.0] * 6
    adapter.write_joint_positions.return_value = True
    adapter.activate.return_value = True
    adapter.deactivate.return_value = True
    adapter.read_enabled.return_value = True
    adapter.is_connected.return_value = True
    return adapter

def test_read_positions(mock_adapter):
    assert mock_adapter.read_joint_positions() == [0.0] * 6

def test_write_positions(mock_adapter):
    target = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6]
    assert mock_adapter.write_joint_positions(target) is True

Integration test with coordinator

skip
from dimos.robot.manipulators.common.mock import coordinator_mock
from dimos.core.coordination.module_coordinator import ModuleCoordinator

# Build and start coordinator with mock hardware
coordinator = ModuleCoordinator.build(coordinator_mock)
coordinator.start()

# Your adapter is tested through the same coordinator interface
# Just swap adapter_type="mock" to adapter_type="yourarm" in a blueprint

coordinator.stop()

Test the real adapter standalone

skip
from dimos.hardware.manipulators.yourarm import YourArmAdapter

adapter = YourArmAdapter(address="192.168.1.100", dof=6)
assert adapter.connect() is True
assert adapter.is_connected() is True

# Read state
positions = adapter.read_joint_positions()
assert len(positions) == 6
print(f"Joint positions (rad): {positions}")

# Activate and move
adapter.activate()
adapter.write_joint_positions([0.0] * 6)

# Cleanup
adapter.deactivate()
adapter.disconnect()

Quick Reference Checklist

Files to create:
  • dimos/hardware/manipulators/yourarm/__init__.py
  • dimos/hardware/manipulators/yourarm/__registry__.py (maps adapter key to implementation path)
  • dimos/hardware/manipulators/yourarm/adapter.py (implements Protocol)
  • dimos/robot/yourarm/__init__.py
  • dimos/robot/yourarm/blueprints.py (coordinator + planning blueprints)
Files to modify:
  • pyproject.toml — Add vendor SDK to optional dependencies (if applicable)
Verification:
  • adapter_registry.available() includes "yourarm"
  • adapter_registry.create("yourarm", address="192.168.1.100", dof=6) returns your adapter
  • pytest dimos/robot/test_all_blueprints_generation.py passes (regenerates all_blueprints.py)
  • dimos run coordinator-yourarm starts successfully