> ## Documentation Index
> Fetch the complete documentation index at: https://dimensional-cc-frontier.mintlify.site/llms.txt
> Use this file to discover all available pages before exploring further.

# Modules

# DimOS Modules

Modules are subsystems on a robot that operate autonomously and communicate with other subsystems using standardized messages.

Some examples of modules are:

* Webcam (outputs image)
* Navigation (inputs a map and a target, outputs a path)
* Detection (takes an image and a vision model like YOLO, outputs a stream of detections)

Below is an example of a structure for controlling a robot. Black blocks represent modules, and colored lines are connections and message types. It's okay if this doesn't make sense now. It will by the end of this document.

> **Prerequisite:** Blueprint visualization (both SVG export and the Rerun Graph tab) requires Graphviz:
>
> ```bash theme={null}
> sudo apt install graphviz   # Ubuntu/Debian
> brew install graphviz        # macOS
> ```

```python skip output=assets/go2_nav.svg theme={null}
from dimos.core.introspection.svg import to_svg
from dimos.robot.unitree.go2.blueprints.smart.unitree_go2 import unitree_go2

to_svg(unitree_go2, "assets/go2_nav.svg")
```

<img src="https://mintcdn.com/dimensional-cc-frontier/yrpnlbymEbdn9xV3/usage/assets/go2_nav.svg?fit=max&auto=format&n=yrpnlbymEbdn9xV3&q=85&s=e4871de3b5f4897a0f6a2dc2d84ee21d" alt="output" width="1319" height="251" data-path="usage/assets/go2_nav.svg" />

## Camera Module

Let's learn how to build stuff like the above, starting with a simple camera module.

```python skip session=camera_module_demo output=assets/camera_module.svg theme={null}
from dimos.hardware.sensors.camera.module import CameraModule
from dimos.core.introspection.svg import to_svg
to_svg(CameraModule.module_info(), "assets/camera_module.svg")
```

We can also print Module I/O quickly to the console via the `.io()` call. We will do this from now on.

```python session=camera_module_demo ansi=false theme={null}
from dimos.hardware.sensors.camera.module import CameraModule

print(CameraModule.io())
```

```results theme={null}
┌┴─────────────┐
│ CameraModule │
└┬─────────────┘
 ├─ color_image: Image
 ├─ camera_info: CameraInfo
 │
 ├─ RPC build() -> None
 ├─ RPC get_skills() -> list
 ├─ RPC set_module_ref(name: str, module_ref: RPCClient) -> None
 ├─ RPC set_transport(stream_name: str, transport: Transport) -> bool
 ├─ RPC start() -> None
 ├─ RPC stop() -> None
 ├─ RPC take_a_picture() -> Image
```

We can see that the camera module outputs two streams:

* `color_image` with [sensor\_msgs.Image](https://docs.ros.org/en/melodic/api/sensor_msgs/html/msg/Image.html) type
* `camera_info` with [sensor\_msgs.CameraInfo](https://docs.ros.org/en/melodic/api/sensor_msgs/html/msg/CameraInfo.html) type

It offers two RPC calls: `start()` and `stop()` (lifecycle methods).

It also exposes an agentic [skill](/docs/usage/blueprints.md#defining-skills) called `take_a_picture` (more on skills in the Blueprints guide).

We can start this module and explore the output of its streams in real time (this will use your webcam).

```python skip session=camera_module_demo ansi=false theme={null}
import time

camera = CameraModule()
camera.start()
# Now this module runs in our main loop in a thread. We can observe its outputs.

print(camera.color_image)

camera.color_image.subscribe(print)
time.sleep(0.5)
camera.stop()
```

```results theme={null}
Out color_image[Image] @ CameraModule
Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2025-12-31 15:54:16)
Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2025-12-31 15:54:16)
Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2025-12-31 15:54:17)
Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2025-12-31 15:54:17)
Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2025-12-31 15:54:17)
Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2025-12-31 15:54:17)
Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2025-12-31 15:54:17)
Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2025-12-31 15:54:17)
Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2025-12-31 15:54:17)
Image(shape=(480, 640, 3), format=RGB, dtype=uint8, dev=cpu, ts=2025-12-31 15:54:17)
```

## Connecting modules

Let's load a standard 2D detector module and hook it up to a camera.

```python skip ansi=false session=detection_module theme={null}
from dimos.perception.detection.module2D import Detection2DModule, Config
print(Detection2DModule.io())
```

```results theme={null}
 ├─ color_image: Image
┌┴──────────────────┐
│ Detection2DModule │
└┬──────────────────┘
 ├─ detections: Detection2DArray
 ├─ annotations: ImageAnnotations
 ├─ detected_image_0: Image
 ├─ detected_image_1: Image
 ├─ detected_image_2: Image
 │
 ├─ RPC build() -> None
 ├─ RPC get_skills() -> list
 ├─ RPC set_module_ref(name: str, module_ref: RPCClient) -> None
 ├─ RPC set_transport(stream_name: str, transport: Transport) -> bool
 ├─ RPC start() -> None
 ├─ RPC stop() -> None
```

Looks like the detector just needs an image input and outputs some sort of detection and annotation messages. Let's connect it to a camera.

```python skip ansi=false theme={null}
import time
from dimos.perception.detection.module2D import Detection2DModule, Config
from dimos.hardware.sensors.camera.module import CameraModule

camera = CameraModule()
detector = Detection2DModule()

detector.image.connect(camera.color_image)

camera.start()
detector.start()

detector.detections.subscribe(print)
time.sleep(3)
detector.stop()
camera.stop()
```

## Distributed Execution

As we build module structures, we'll quickly want to utilize all cores on the machine (which Python doesn't allow as a single process) and potentially distribute modules across machines or even the internet.

For this, we use `dimos.core` and DimOS transport protocols.

Defining message exchange protocols and message types also gives us the ability to write models in faster languages.

### Dedicated workers

By default the coordinator assigns modules to worker processes by least-load, so multiple modules share a worker. Heavy modules (robot connections, voxel mappers) should run alone so they don't contend with anything else for CPU or the GIL. Set `dedicated_worker = True` on the class and the coordinator will give that module a worker process to itself.

```python theme={null}
from dimos.core.module import Module


class HeavyModule(Module):
    dedicated_worker = True
```

If declaring dedicated modules would push the pool past half-dedicated, the coordinator auto-grows it so non-dedicated workers always at least match the dedicated count.

## Sync input handlers

If you don't need an asyncio loop, subscribe to your `In[T]` streams from `start()` and register the unsubscribe with `register_disposable` so cleanup happens automatically at `stop()`.

```python theme={null}
from reactivex.disposable import Disposable

from dimos.core.core import rpc
from dimos.core.module import Module
from dimos.core.stream import In
from dimos.msgs.std_msgs.Int32 import Int32


class Counter(Module):
    value: In[Int32]

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._total = 0

    @rpc
    def start(self) -> None:
        super().start()
        self.register_disposable(Disposable(self.value.subscribe(self._on_value)))

    def _on_value(self, msg: Int32) -> None:
        self._total += msg.data
```

`In.subscribe(cb)` returns an *unsubscribe function*, not a `DisposableBase`. Wrap it in `Disposable(...)` so `register_disposable` can dispose it on `stop()`. Without this, your handler keeps running after `stop()` and tests will fail thread-leak checks.

The callback runs on whatever thread emits the message, so guard mutable state with a lock if multiple inputs share it.

## Triggering side effects via Specs

A common pattern is "subscribe to a stream, react by calling another module". Declare the other module's protocol as a `Spec` field (single-underscore, private). The coordinator binds the proxy at deploy time, so handlers can call it directly with no extra wiring:

```python theme={null}
from typing import Protocol

from reactivex.disposable import Disposable

from dimos.core.core import rpc
from dimos.core.module import Module
from dimos.core.stream import In
from dimos.msgs.std_msgs.Int32 import Int32
from dimos.spec.utils import Spec


class NotifierSpec(Spec, Protocol):
    def notify(self, text: str) -> None: ...


class Watchdog(Module):
    value: In[Int32]

    _notifier: NotifierSpec

    @rpc
    def start(self) -> None:
        super().start()
        self.register_disposable(Disposable(self.value.subscribe(self._on_value)))

    def _on_value(self, msg: Int32) -> None:
        if msg.data > 100:
            self._notifier.notify(f"value={msg.data}")
```

The Spec must match the target module's `@rpc` signatures (sync/async are interchangeable — see [Async modules](#async-modules-lock-free-state)).

To deploy `Watchdog`, add `Watchdog.blueprint()` to an existing blueprint's `autoconnect(...)` chain. The coordinator matches `Out[T]` to `In[T]` by name across the union of modules, and resolves `_notifier: NotifierSpec` to whichever module in the blueprint implements `notify`. No manual wiring required.

## Testing modules

Mock spec dependencies (anything typed `: SomeSpec`) after construction, since the framework normally wires them at deploy time:

```python skip theme={null}
@pytest.fixture()
def module(mocker):
    m = MyModule(step=10)
    m._speak_skill = mocker.MagicMock()
    yield m
    m.stop()  # required: cleans up the per-instance asyncio loop and thread
```

The `m.stop()` in teardown matters. The test session-wide thread-leak detector will fail the test otherwise, even if your test body never started any threads.

## Restarting a module

While iterating on a module it's often convenient to edit its source file
and pick up the changes without tearing down the whole coordinator. The
`restart_module` call stops a single deployed module, reloads its source
via `importlib.reload`, then redeploys it onto a fresh worker process while
keeping its stream transports and reconnecting any other modules that held
a reference to it.

```python skip theme={null}
from dimos.core.coordination.module_coordinator import ModuleCoordinator
from dimos.core.global_config import GlobalConfig
from dimos.hardware.sensors.camera.module import CameraModule

coordinator = ModuleCoordinator(g=GlobalConfig(n_workers=0, viewer="none"))
coordinator.start()
coordinator.load_module(CameraModule)

# ... edit CameraModule source on disk ...

coordinator.restart_module(CameraModule)
```

## Async modules (lock-free state)

Modules contain a per-instance asyncio loop on a daemon thread (`self._loop`). It is possible to write modules using only `async def` methods so that everything runs on the same thread and you don't need to use locks. The module's auto-bound input handlers, async `@rpc` methods, and `process_observable` callbacks all run on `self._loop`, and each handler subscription is serialized through a dedicated dispatcher task.

### Auto-bound input handlers

For every declared `x: In[T]`, if the module defines `async def handle_x(self, msg: T)`, the handler is automatically subscribed at `start()` and dispatched onto `self._loop`. Subscriptions are cleaned up at `stop()`.

```python theme={null}
from dimos.core.module import Module
from dimos.core.stream import In, Out
from dimos.msgs.geometry_msgs.PointStamped import PointStamped
from dimos.msgs.geometry_msgs.Twist import Twist

class MovementManager(Module):
    clicked_point: In[PointStamped]
    nav_cmd_vel: In[Twist]
    tele_cmd_vel: In[Twist]

    cmd_vel: Out[Twist]
    goal: Out[PointStamped]

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # No lock needed. `_teleop_active` is only mutated on `self._loop`.
        self._teleop_active = False

    async def handle_clicked_point(self, msg: PointStamped) -> None:
        self.goal.publish(msg)

    async def handle_nav_cmd_vel(self, msg: Twist) -> None:
        if not self._teleop_active:
            self.cmd_vel.publish(msg)

    async def handle_tele_cmd_vel(self, msg: Twist) -> None:
        self._teleop_active = True
        self.cmd_vel.publish(msg)
```

Each handler runs in a per-handler dispatcher task on `self._loop`. Handlers are serialized: only one invocation of `handle_x` runs at a time. If messages arrive faster than the handler can process them, intermediate messages are dropped — only the most recent unprocessed message is kept (LATEST policy). The handler is guaranteed to eventually run with the most recently published value.

### Async `@rpc` methods

`@rpc` works on both sync and `async def` methods. When applied to an async method, the call site dispatches automatically:

* From another thread (the RPC dispatcher, sync test code, a sync `@rpc` on the same module), the call blocks until the coroutine completes on `self._loop`.
* From inside the loop (another async `@rpc`, a `handle_*`, or a `process_observable` callback), it returns the coroutine so the caller can `await` it.

```python theme={null}
from dimos.core.core import rpc
from dimos.core.module import Module

class NameModule(Module):
    @rpc
    async def say_hello(self, name: str) -> str:
        return f"Hello {name}, from {self._my_name}"

    @rpc
    async def set_my_name(self, new_name: str) -> None:
        self._my_name = new_name
```

Async and sync `@rpc` methods are interchangeable for cross-module linking. Both are discovered via `Module.rpcs` and served through the same RPC machinery. A module ref or RPC client doesn't care whether the underlying method is sync or async.

When the consumer types a module ref using a Spec that declares `async def`, the proxy automatically exposes those methods as awaitables: `await self._name_module.say_hello(name)`.

```python theme={null}
from typing import Protocol

from dimos.core.module import Module
from dimos.spec.utils import Spec

class NameSpec(Spec, Protocol):
    async def say_hello(self, name: str) -> str: ...
    async def set_my_name(self, new_name: str) -> None: ...

class StartModule(Module):
    _name_module: NameSpec

    async def code():
        await self._name_module.set_my_name("John")
        print(await self._name_module.say_hello("Bill"))
```

`NameModule` is async. But if you need to call it from a sync module, you just need to create a `SyncNameSpec`:

```python theme={null}
from typing import Protocol

from dimos.spec.utils import Spec

class SyncNameSpec(Spec, Protocol):
    def say_hello(self, name: str) -> str: ...
    def set_my_name(self, new_name: str) -> None: ...
```

This will match with `NameModule`. You can call it synchronously from your module, but it will run in the `self._loop` async loop in the `NameModule` module.

The reverse is also true: you can call a sync module from async code.

### `spawn`: schedule a long-running coroutine from sync code

When you need to start a long-running async task from `start()` (e.g., a timer loop), use `self.spawn(coro)` instead of `asyncio.run_coroutine_threadsafe(coro, self._loop)`. The helper wires up a done-callback that surfaces unhandled exceptions to the module logger. bare `run_coroutine_threadsafe` silently stores the exception on the returned Future, where it disappears unless the user remembers to read `.result()`.

```python theme={null}
import asyncio

from dimos.core.core import rpc
from dimos.core.module import Module

class TimerExample(Module):
    @rpc
    def start(self) -> None:
        super().start()
        self._timer_future = self.spawn(self._timer_loop())

    async def _timer_loop(self) -> None:
        while True:
            await asyncio.sleep(1.0)
            ...

    @rpc
    def stop(self) -> None:
        if self._timer_future is not None:
            self._timer_future.cancel()
        super().stop()
```

### `process_observable`: async subscriptions to arbitrary observables

Sometimes you have rxpy observables which you need to run inside `self._loop`. You can do this with `self.process_observable(observable, async_handler)` .

```python skip theme={null}
@rpc
def start(self) -> None:
    super().start()
    fast = self.foo.observable().pipe(ops.filter(lambda v: v > threshold))
    self.process_observable(fast, self._on_fast_foo)

async def _on_fast_foo(self, v: int) -> None:
    ...
```

### `main()`: combined setup/teardown

When a module owns a resource that needs construction at startup *and* explicit cleanup at shutdown, define `async def main(self)` as an **async generator with exactly one `yield`**. Code before `yield` runs at `start()`, code after `yield` runs at `stop()`.

```python theme={null}
from collections.abc import AsyncIterator
from typing import Any

from dimos.core.module import Module

def create(name: str) -> Any:
    del name
    class _Model:
        def stop(self) -> None:
            pass

    return _Model()

class PersonFollowSkillContainer(Module):
    async def main(self) -> AsyncIterator[None]:
        # setup
        self._vl_model = create("qwen")

        yield

        # teardown
        self._vl_model.stop()
```

Compared to splitting the same work across `__init__` / `start()` / `stop()`, `main()` keeps the construction-and-destruction of each resource visually adjacent.

## Blueprints

A blueprint is a predefined structure of interconnected modules. You can include blueprints or modules in new blueprints.

A basic Unitree Go2 blueprint looks like what we saw before.

```python skip session=blueprints output=assets/go2_agentic.svg theme={null}
from dimos.core.introspection.svg import to_svg
from dimos.robot.unitree_webrtc.unitree_go2_blueprints import agentic

to_svg(agentic, "assets/go2_agentic.svg")
```

<img src="https://mintcdn.com/dimensional-cc-frontier/yrpnlbymEbdn9xV3/usage/assets/go2_agentic.svg?fit=max&auto=format&n=yrpnlbymEbdn9xV3&q=85&s=2b78ccc60f9eb9cb57fd120215d0b23d" alt="output" width="1417" height="753" data-path="usage/assets/go2_agentic.svg" />

To see more information on how to use Blueprints, see [Blueprints](/docs/usage/blueprints.md).
