Skip to content

Signal manager

signal_manager

SignalManager dataclass

Python
SignalManager(
    export_path: Path,
    batch_size: int = 1000,
    record_decimation: int = 1,
    _buffer_row_idx: int = 0,
    _step_count: int = -1,
    _n_cols: int = 0,
)

export_path instance-attribute

Python
export_path: Path

Where the output file should be saved.

batch_size class-attribute instance-attribute

Python
batch_size: int = 1000

Number of recorded rows buffered in memory before they are flushed to disk.

record_decimation class-attribute instance-attribute

Python
record_decimation: int = 1

Number of simulation steps between successive recordings (1 records every step).

post

Python
post(
    value: float,
    category: SignalCategory | str,
    subgroups: tuple[str, ...] = (),
    *,
    attr: str | None = None,
)

Injects a value into the telemetry ledger using a hierarchical namespace.

This method constructs a structured key that the dashboard uses to build a navigable tree view. The naming convention follows a folder-like structure to group related signals (e.g., all axes of a body's position).

Format

Category/Subgroup:Attribute (e.g., "Bodies/Link_1:xpos_x")

Parameters:

Name Type Description Default
value float

The numeric data to record.

required
category SignalCategory | str

Top level category (e.g., "Bodies")

required
subgroups tuple[str, ...]

The second-level organizational folders. Defaults to an empty tuple.

()
attr str | None

The specific signal or component name (e.g., "qpos" or "x"). Defaults to None.

None

Examples:

Python Console Session
>>> # Becomes "Bodies/Hand/xpos:x"
>>> manager.post(1.2, SignalCategory.BODIES, ("Hand", "xpos"), attr="x")
Python Console Session
>>> # Becomes "Sensors/IMU/Accel:z"
>>> manager.post(9.81, "Sensors", ("IMU", "Accel"), attr="z")
Source code in src/mujoco_mojo/runtime/signal_manager.py
Python
def post(
    self,
    value: float,
    category: SignalCategory | str,
    subgroups: tuple[str, ...] = (),
    *,
    attr: str | None = None,
):
    """
    Injects a value into the telemetry ledger using a hierarchical namespace.

    The signal key is built from ``str(category)`` and every non-empty entry
    of ``subgroups`` joined with "/", followed by ":" and ``attr`` when
    ``attr`` is non-empty. The folder-like naming groups related signals
    (e.g., all axes of a body's position). A key seen for the first time is
    assigned the next free column of the buffer, which is widened when it
    runs out of columns. The value is written into the current buffer row and
    stays there until the next flush.

    Format:
        Category/Subgroup/...:Attribute
        (e.g., "Bodies/Link_1:xpos_x")

    Args:
        value (float): The numeric data to record.
        category (SignalCategory | str): Top level category (e.g., "Bodies").
        subgroups (tuple[str, ...], optional): The organizational folders below
            the category. Empty entries are skipped. Any iterable of strings
            (e.g. a list) is accepted. Defaults to an empty tuple.
        attr (str | None, optional): The specific signal or component name
            (e.g., "qpos" or "x"). Keyword-only. Defaults to None.

    Examples:
        >>> # Becomes "Bodies/Hand/xpos:x" (given str(SignalCategory.BODIES) == "Bodies")
        >>> manager.post(1.2, SignalCategory.BODIES, ("Hand", "xpos"), attr="x")

        >>> # Becomes "Sensors/IMU/Accel:z"
        >>> manager.post(9.81, "Sensors", ("IMU", "Accel"), attr="z")

    """
    category_name = str(category)

    # Normalise to a tuple so the cache key is hashable even when the caller
    # hands in a list; for an actual tuple this returns the same object.
    subgroup_path = tuple(subgroups)

    # use tuple as cache key to avoid string construction
    cache_lookup = (category_name, subgroup_path, attr if attr is not None else "")

    # fast path for cached signal (single dictionary lookup)
    full_key = self._key_cache.get(cache_lookup)
    if full_key is None:
        # slow path for a new signal
        path_parts = [category_name] + [str(s) for s in subgroup_path if s]
        full_key = "/".join(path_parts)
        if attr:
            full_key += f":{attr}"

        self._key_cache[cache_lookup] = full_key

    # get column index
    idx = self._key_to_idx.get(full_key)
    if idx is None:
        # register a new signal column
        idx = self._n_cols
        self._key_to_idx[full_key] = idx
        self._n_cols += 1

        logger.debug(f"New signal registered: {full_key} at index {idx}")

        # grow buffer if exceeding the initial guess
        n_rows, width = self._data_buffer.shape
        if self._n_cols > width:
            # Grow in chunks of 50 columns, but never by less than what is
            # actually missing.
            n_cols_to_add = max(50, self._n_cols - width)
            new_width = width + n_cols_to_add
            logger.debug(f"Growing telemetry buffer width to {new_width} columns.")

            # Size the extension from the buffer itself so the row counts
            # always match when stacking.
            growth = np.zeros((n_rows, n_cols_to_add), dtype=np.float64)
            self._data_buffer = np.hstack([self._data_buffer, growth])

    # write value to buffer for next flush
    self._data_buffer[self._buffer_row_idx, idx] = value

record

Python
record(mj_model: MjModel, mj_data: MjData)

Executes all samplers and advances the buffer index. Flushes if due.

Source code in src/mujoco_mojo/runtime/signal_manager.py
Python
def record(self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData):
    """Sample the current simulation step into the buffer, if it is due.

    Every call advances the step counter. Only steps whose counter is a
    multiple of ``record_decimation`` are stored: the simulation time goes
    into column 0 of the current row, every registered sampler is invoked
    with ``(mj_model, mj_data)``, and the row index moves forward. Once the
    row index reaches ``batch_size`` the buffer is flushed.
    """
    self._step_count += 1

    # steps that fall between two decimated samples are skipped entirely
    on_schedule = self._step_count % self.record_decimation == 0
    if not on_schedule:
        return

    # column 0 of the current row holds the simulation time
    self._data_buffer[self._buffer_row_idx, 0] = mj_data.time

    # let each sampler contribute its values to the current row
    for sampler in self._sample_tasks:
        sampler(mj_model, mj_data)

    # the row is complete: move on, and flush once the batch is full
    self._buffer_row_idx += 1
    if self.batch_size <= self._buffer_row_idx:
        self.flush()

flush

Python
flush()

Commits the memory buffer to the output file.

Source code in src/mujoco_mojo/runtime/signal_manager.py
Python
def flush(self):
    """
    Commits the memory buffer to the output file.

    Does nothing when no rows are buffered. Otherwise the used part of the
    buffer (the rows recorded so far, registered columns only) is turned into
    a DataFrame whose columns are the signal keys in column-index order.

    If the export file already exists, its contents are read back,
    concatenated with the new rows and rewritten with zstd compression. The
    rewrite is staged in a sibling ``<name>.tmp`` file that replaces the
    export file only after it has been written completely, so a write that
    fails part-way cannot damage telemetry that is already on disk.

    A failure while appending is logged and the batch is dropped. A failure
    while creating the file for the first time propagates to the caller. In
    every non-raising case the buffer is reset so recording can continue.
    """
    if self._buffer_row_idx == 0:
        return

    # build column names from mapping, ordered by their column index
    sorted_keys = sorted(self._key_to_idx, key=self._key_to_idx.__getitem__)

    # slice only the used portion of the buffer
    new_df = pl.from_numpy(
        data=self._data_buffer[: self._buffer_row_idx, : self._n_cols],
        schema=sorted_keys,
    )

    logger.debug(
        f"Flushing {self._buffer_row_idx} steps to {self.export_path.name}"
    )

    if self.export_path.exists():
        staging_path = None
        try:
            # Use diagonal concat to safely handle signals added mid-simulation
            existing_df = pl.read_parquet(self.export_path)
            combined_df = pl.concat([existing_df, new_df], how="diagonal")

            # Never overwrite the only copy of the existing data in place:
            # write next to it first, then swap the finished file in.
            staging_path = self.export_path.with_name(
                f"{self.export_path.name}.tmp"
            )
            combined_df.write_parquet(staging_path, compression="zstd")
            staging_path.replace(self.export_path)
        except Exception as e:
            # Deliberate best-effort: a failed append must not stop the
            # simulation, so the error is logged and the batch is dropped.
            logger.error(f"Failed to append telemetry: {e}")
            if staging_path is not None:
                try:
                    staging_path.unlink(missing_ok=True)
                except OSError:
                    # cleanup of the partial staging file is best-effort too
                    pass
    else:
        new_df.write_parquet(self.export_path, compression="zstd")

    # reset buffer for next batch
    self._buffer_row_idx = 0
    self._data_buffer.fill(0.0)