runtime

GeneralLoad

Bases: VectorForce, VectorTorque

A 6-DOF force/torque applier.

name instance-attribute

Python
name: str

Name of the forcing function. Used in data output column naming.

active class-attribute instance-attribute

Python
active: bool = True

Whether or not this force should be active.

action_site instance-attribute

Python
action_site: AnySite

Site on which the forcing function acts.

rel_to_site class-attribute instance-attribute

Python
rel_to_site: AnySite | None = None

Frame of reference for the calculated force. If None, uses worldbody.

xtion_body class-attribute instance-attribute

Python
xtion_body: Body | None = None

Body on which the reaction load acts. If None, the world is used.

get_visuals

Python
get_visuals(
    mj_model: MjModel, mj_data: MjData
) -> list[ArrowConfig]

Returns a list of arrow configurations for the renderer.

Source code in src/mujoco_mojo/runtime/load.py
Python
def get_visuals(
    self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData
) -> list[ArrowConfig]:
    """Returns a list of arrow configurations for the renderer."""
    if not self.active:
        return []

    visuals: list[ArrowConfig] = []
    action_pos = self.action_site.rt_pos(mj_model, mj_data)

    # force arrow
    f_vec = self._last_f[:3]
    if np.linalg.norm(f_vec) > 1e-4:
        visuals.append(
            ArrowConfig(
                pos=action_pos,
                vec=f_vec,
                color=Color.EMERALD_500.rgba,
                is_torque=False,
            )
        )

    t_vec = self._last_t[:3]
    if np.linalg.norm(t_vec) > 1e-4:
        visuals.append(
            ArrowConfig(
                pos=action_pos,
                vec=t_vec,
                color=Color.AMBER_500.rgba,
                is_torque=True,
            )
        )

    return visuals

Load

Bases: MojoBaseModel, ABC

Base class for building forcing functions.

name instance-attribute

Python
name: str

Name of the forcing function. Used in data output column naming.

active class-attribute instance-attribute

Python
active: bool = True

Whether or not this force should be active.

action_site instance-attribute

Python
action_site: AnySite

Site on which the forcing function acts.

rel_to_site class-attribute instance-attribute

Python
rel_to_site: AnySite | None = None

Frame of reference for the calculated force. If None, uses worldbody.

resolve_ids

Python
resolve_ids(mj_model: MjModel, mj_data: MjData)

Caches the integer IDs from the compiled MuJoCo model.

Source code in src/mujoco_mojo/runtime/load.py
Python
def resolve_ids(self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData):
    """Caches the integer IDs from the compiled MuJoCo model."""
    self.action_site.get_id(mj_model)

    if self.rel_to_site:
        self.rel_to_site.get_id(mj_model)

calculate abstractmethod

Python
calculate(
    mj_model: MjModel, mj_data: MjData
) -> tuple[ndarray, ndarray]

Calculate the force for the timestep.

Parameters:

Name Type Description Default
mj_model MjModel

The compiled MuJoCo model.

required
mj_data MjData

The simulation state associated with the model.

required

Returns:

Type Description
tuple[ndarray, ndarray]

tuple[np.ndarray, np.ndarray]: The force and torque vector output.

Source code in src/mujoco_mojo/runtime/load.py
Python
@abstractmethod
def calculate(
    self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData
) -> tuple[np.ndarray, np.ndarray]:
    """
    Calculate the force for the timestep.

    Args:
        mj_model (mujoco.MjModel): The compiled MuJoCo model.
        mj_data (mujoco.MjData): The simulation state associated with the model.

    Returns:
        tuple[np.ndarray, np.ndarray]: The force and torque vector output.

    """

get_visuals

Python
get_visuals(
    mj_model: MjModel, mj_data: MjData
) -> list[ArrowConfig]

Returns a list of arrow configurations for the renderer.

Source code in src/mujoco_mojo/runtime/load.py
Python
def get_visuals(
    self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData
) -> list[ArrowConfig]:
    """Returns a list of arrow configurations for the renderer."""
    if not self.active:
        return []

    visuals: list[ArrowConfig] = []
    action_pos = self.action_site.rt_pos(mj_model, mj_data)

    # force arrow
    f_vec = self._last_f[:3]
    if np.linalg.norm(f_vec) > 1e-4:
        visuals.append(
            ArrowConfig(
                pos=action_pos,
                vec=f_vec,
                color=Color.EMERALD_500.rgba,
                is_torque=False,
            )
        )

    t_vec = self._last_t[:3]
    if np.linalg.norm(t_vec) > 1e-4:
        visuals.append(
            ArrowConfig(
                pos=action_pos,
                vec=t_vec,
                color=Color.AMBER_500.rgba,
                is_torque=True,
            )
        )

    return visuals

PointToPointForce

Bases: Load

Acts along the line-of-sight between two sites.

name instance-attribute

Python
name: str

Name of the forcing function. Used in data output column naming.

active class-attribute instance-attribute

Python
active: bool = True

Whether or not this force should be active.

action_site instance-attribute

Python
action_site: AnySite

Site on which the forcing function acts.

rel_to_site class-attribute instance-attribute

Python
rel_to_site: AnySite | None = None

Frame of reference for the calculated force. If None, uses worldbody.

xtion_site instance-attribute

Python
xtion_site: AnySite

Site on which the forcing function will apply a reaction force. Leave as None to use the worldbody.

This is called xtion to limit confusion between "reaction" and "relative".

magnitude_func instance-attribute

Python
magnitude_func: Callable[
    [float, float, float, MjModel, MjData], float
]

Func(distance, velocity, initial distance, MjModel, MjData) -> scalar_force. Can be a regular function, lambda, etc.
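As a minimal sketch of a callable with this five-argument shape, the example below builds a one-sided "bungee" law. The names `make_bungee`, `stiffness`, and `slack_length` are hypothetical, the model and data arguments are left unused, and the sign convention (negative meaning "pull the sites together") is an assumption that is not confirmed by this page.

```python
from typing import Any


def make_bungee(stiffness: float, slack_length: float):
    """Build a magnitude function with the five-argument signature described above."""

    def magnitude(
        distance: float,
        velocity: float,
        initial_distance: float,
        mj_model: Any,  # an MjModel in real use; unused in this sketch
        mj_data: Any,  # an MjData in real use; unused in this sketch
    ) -> float:
        # No force while the line is slack.
        stretch = distance - slack_length
        if stretch <= 0.0:
            return 0.0
        # Assumed sign convention: negative pulls the two sites together.
        return -stiffness * stretch

    return magnitude


bungee = make_bungee(stiffness=200.0, slack_length=0.5)
slack_force = bungee(0.4, 0.0, 0.4, None, None)
taut_force = bungee(0.75, 0.0, 0.4, None, None)
```

A closure like this keeps the tuning constants out of the call signature, which matches how the classmethod constructors on this class wrap their own `logic` functions.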

resolve_ids

Python
resolve_ids(mj_model: MjModel, mj_data: MjData)

Caches the integer IDs from the compiled MuJoCo model.

Source code in src/mujoco_mojo/runtime/load.py
Python
def resolve_ids(self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData):
    """Caches the integer IDs from the compiled MuJoCo model."""
    super().resolve_ids(mj_model, mj_data)
    self.xtion_site.get_id(mj_model)
    self._r0_mag = self.action_site.rt_dm(self.xtion_site, mj_model, mj_data)

get_visuals

Python
get_visuals(
    mj_model: MjModel, mj_data: MjData
) -> list[ArrowConfig]

Returns a list of arrow configurations for the renderer.

Source code in src/mujoco_mojo/runtime/load.py
Python
def get_visuals(
    self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData
) -> list[ArrowConfig]:
    """Returns a list of arrow configurations for the renderer."""
    visuals = super().get_visuals(mj_model, mj_data)

    if not self.active:
        return []

    # Add the reaction force arrow at the xtion site
    xtion_pos = self.xtion_site.rt_pos(mj_model, mj_data)
    f_vec = self._last_f[:3]

    if np.linalg.norm(f_vec) > 1e-4:
        visuals.append(
            ArrowConfig(
                pos=xtion_pos,
                vec=-f_vec,  # opposite direction
                color=Color.ROSE_500.rgba,  # Red for Reaction
                is_torque=False,
            )
        )

    return visuals

ideal_spring classmethod

Python
ideal_spring(
    name: str,
    action_site: AnySite,
    xtion_site: AnySite,
    stiffness: float | NamedValue[float] = 0.0,
    damping: float | NamedValue[float] = 0.0,
    rest_length: float = 0.0,
) -> Self

Standard linear spring-damper (works in both tension and compression).

Source code in src/mujoco_mojo/runtime/load.py
Python
@classmethod
def ideal_spring(
    cls,
    name: str,
    action_site: AnySite,
    xtion_site: AnySite,
    stiffness: float | NamedValue[float] = 0.0,
    damping: float | NamedValue[float] = 0.0,
    rest_length: float = 0.0,
) -> Self:
    """Standard linear spring-damper (works in both tension and compression)."""

    def logic(
        d: float,
        v: float,
        r0: float,
        mj_model: mujoco.MjModel,
        mj_data: mujoco.MjData,
    ) -> float:
        return _ideal_force_logic(d, v, stiffness, damping, rest_length)

    return cls(
        name=name,
        action_site=action_site,
        xtion_site=xtion_site,
        magnitude_func=logic,
    )

stroke_compression_spring classmethod

Python
stroke_compression_spring(
    name: str,
    action_site: AnySite,
    xtion_site: AnySite,
    stiffness: float | NamedValue[float] = 0.0,
    damping: float | NamedValue[float] = 0.0,
    preload: float | NamedValue[float] = 0.0,
    max_stroke: float | NamedValue[float] = 0.1,
) -> Self

Creates a spring-damper that only acts when the runtime length is between the initial length and (initial length + max_stroke).

Source code in src/mujoco_mojo/runtime/load.py
Python
@classmethod
def stroke_compression_spring(
    cls,
    name: str,
    action_site: AnySite,
    xtion_site: AnySite,
    stiffness: float | NamedValue[float] = 0.0,
    damping: float | NamedValue[float] = 0.0,
    preload: float | NamedValue[float] = 0.0,
    max_stroke: float | NamedValue[float] = 0.1,
) -> Self:
    """Creates a spring-damper that only acts when the runtime length is between the initial length and (initial length + max_stroke)."""

    def logic(
        d: float,
        v: float,
        r0: float,
        mj_model: mujoco.MjModel,
        mj_data: mujoco.MjData,
    ) -> float:
        k = stiffness.value if isinstance(stiffness, NamedValue) else stiffness
        c = damping.value if isinstance(damping, NamedValue) else damping
        f_0 = preload.value if isinstance(preload, NamedValue) else preload
        d_f = max_stroke.value if isinstance(max_stroke, NamedValue) else max_stroke

        delta_d = d - r0

        if 0 <= delta_d <= d_f:
            f_mag = f_0 - (k * delta_d) - (c * v)
            return max(0.0, f_mag)
        return 0.0

    return cls(
        name=name,
        action_site=action_site,
        xtion_site=xtion_site,
        magnitude_func=logic,
    )
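The stroke window can be checked by hand with a standalone copy of the `logic` body above. This is only a sketch: the function name `stroke_spring_force` is hypothetical, and plain floats stand in for the `NamedValue` unwrapping.

```python
def stroke_spring_force(
    d: float,
    v: float,
    r0: float,
    k: float,
    c: float,
    preload: float,
    max_stroke: float,
) -> float:
    """Mirror of the stroke logic: active only while 0 <= d - r0 <= max_stroke."""
    delta_d = d - r0
    if 0 <= delta_d <= max_stroke:
        # Preload decays with extension and damping; never goes negative.
        return max(0.0, preload - (k * delta_d) - (c * v))
    return 0.0


at_start = stroke_spring_force(1.0, 0.0, 1.0, 64.0, 0.0, 10.0, 0.125)
mid_stroke = stroke_spring_force(1.0625, 0.0, 1.0, 64.0, 0.0, 10.0, 0.125)
past_stroke = stroke_spring_force(1.25, 0.0, 1.0, 64.0, 0.0, 10.0, 0.125)
shorter_than_start = stroke_spring_force(0.9, 0.0, 1.0, 64.0, 0.0, 10.0, 0.125)
```

With these numbers the force starts at the full preload, drops linearly across the stroke, and is zero both beyond `max_stroke` and whenever the sites are closer than their initial distance.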

compression_spring classmethod

Python
compression_spring(
    name: str,
    action_site: AnySite,
    xtion_site: AnySite,
    stiffness: float | NamedValue[float] = 0.0,
    damping: float | NamedValue[float] = 0.0,
    rest_length: float = 0.0,
) -> Self

Creates a spring-damper that only acts when compressed (dist < rest_length). Useful for bumpers, feet, push-off springs, or end-stops.

Source code in src/mujoco_mojo/runtime/load.py
Python
@classmethod
def compression_spring(
    cls,
    name: str,
    action_site: AnySite,
    xtion_site: AnySite,
    stiffness: float | NamedValue[float] = 0.0,
    damping: float | NamedValue[float] = 0.0,
    rest_length: float = 0.0,
) -> Self:
    """
    Creates a spring-damper that only acts when compressed (dist < rest_length). Useful for bumpers, feet, push-off springs, or end-stops.
    """

    def logic(
        d: float,
        v: float,
        r0: float,
        mj_model: mujoco.MjModel,
        mj_data: mujoco.MjData,
    ) -> float:
        if d < rest_length:
            return _ideal_force_logic(d, v, stiffness, damping, rest_length)
        return 0.0

    return cls(
        name=name,
        action_site=action_site,
        xtion_site=xtion_site,
        magnitude_func=logic,
    )

tension_spring classmethod

Python
tension_spring(
    name: str,
    action_site: AnySite,
    xtion_site: AnySite,
    stiffness: float | NamedValue[float] = 0.0,
    damping: float | NamedValue[float] = 0.0,
    rest_length: float = 0.0,
) -> Self

Creates a spring-damper that only acts when extended (dist > rest_length). Useful for cables, bungees, or tendons.

Source code in src/mujoco_mojo/runtime/load.py
Python
@classmethod
def tension_spring(
    cls,
    name: str,
    action_site: AnySite,
    xtion_site: AnySite,
    stiffness: float | NamedValue[float] = 0.0,
    damping: float | NamedValue[float] = 0.0,
    rest_length: float = 0.0,
) -> Self:
    """
    Creates a spring-damper that only acts when extended (dist > rest_length). Useful for cables, bungees, or tendons.
    """

    def logic(
        d: float,
        v: float,
        r0: float,
        mj_model: mujoco.MjModel,
        mj_data: mujoco.MjData,
    ) -> float:
        if d > rest_length:
            return _ideal_force_logic(d, v, stiffness, damping, rest_length)
        return 0.0

    return cls(
        name=name,
        action_site=action_site,
        xtion_site=xtion_site,
        magnitude_func=logic,
    )
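The compression and tension variants differ only in the gate placed in front of the shared force law. The sketch below illustrates that gating with a stand-in `linear_law`; the body of `_ideal_force_logic` is not shown on this page, so the formula and its sign convention here are assumptions, not the library's implementation.

```python
def linear_law(d: float, v: float, k: float, c: float, rest_length: float) -> float:
    # Assumed stand-in for _ideal_force_logic (its real body is not shown here).
    return -k * (d - rest_length) - c * v


def compression_only(d: float, v: float, k: float, c: float, rest_length: float) -> float:
    # Active only while the sites are closer than the rest length.
    if d < rest_length:
        return linear_law(d, v, k, c, rest_length)
    return 0.0


def tension_only(d: float, v: float, k: float, c: float, rest_length: float) -> float:
    # Active only while the sites are farther apart than the rest length.
    if d > rest_length:
        return linear_law(d, v, k, c, rest_length)
    return 0.0


squeezed = compression_only(0.25, 0.0, 100.0, 0.0, 0.5)
squeezed_but_tension_only = tension_only(0.25, 0.0, 100.0, 0.0, 0.5)
stretched = tension_only(0.75, 0.0, 100.0, 0.0, 0.5)
stretched_but_compression_only = compression_only(0.75, 0.0, 100.0, 0.0, 0.5)
```

Because both gates use strict comparisons, neither variant produces a force when the distance equals `rest_length` exactly.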

ScalarForce

Bases: BodyReactionForce

Applies a scalar force along the local X-axis of the action_site.

name instance-attribute

Python
name: str

Name of the forcing function. Used in data output column naming.

active class-attribute instance-attribute

Python
active: bool = True

Whether or not this force should be active.

action_site instance-attribute

Python
action_site: AnySite

Site on which the forcing function acts.

rel_to_site class-attribute instance-attribute

Python
rel_to_site: AnySite | None = None

Frame of reference for the calculated force. If None, uses worldbody.

xtion_body class-attribute instance-attribute

Python
xtion_body: Body | None = None

Body on which the reaction load acts. If None, the world is used.

scalar_func class-attribute instance-attribute

Python
scalar_func: Callable[
    [float, ndarray, MjModel, MjData], float
] = lambda t, unit_vec, m, d: 0.0

Func(time, action_site x axis unit vector, MjModel, MjData) -> scalar force value.
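A callable matching this four-argument shape might look like the following sketch. The name `sine_thrust` and its amplitude, frequency, and cutoff are hypothetical; the unit vector, model, and data arguments are accepted but unused.

```python
import math
from typing import Any


def sine_thrust(t: float, unit_vec: Any, m: Any, d: Any) -> float:
    """Hypothetical scalar_func: 5.0 peak, 2 Hz sine, switched off after 1 s."""
    if t > 1.0:
        return 0.0
    return 5.0 * math.sin(2.0 * math.pi * 2.0 * t)


x_axis = (1.0, 0.0, 0.0)  # placeholder for the action_site x-axis unit vector
at_zero = sine_thrust(0.0, x_axis, None, None)
at_peak = sine_thrust(0.125, x_axis, None, None)
after_cutoff = sine_thrust(2.0, x_axis, None, None)
```

The default shown above, `lambda t, unit_vec, m, d: 0.0`, follows the same pattern and simply returns zero at every timestep.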

get_visuals

Python
get_visuals(
    mj_model: MjModel, mj_data: MjData
) -> list[ArrowConfig]

Returns a list of arrow configurations for the renderer.

Source code in src/mujoco_mojo/runtime/load.py
Python
def get_visuals(
    self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData
) -> list[ArrowConfig]:
    """Returns a list of arrow configurations for the renderer."""
    if not self.active:
        return []

    visuals: list[ArrowConfig] = []
    action_pos = self.action_site.rt_pos(mj_model, mj_data)

    # force arrow
    f_vec = self._last_f[:3]
    if np.linalg.norm(f_vec) > 1e-4:
        visuals.append(
            ArrowConfig(
                pos=action_pos,
                vec=f_vec,
                color=Color.EMERALD_500.rgba,
                is_torque=False,
            )
        )

    t_vec = self._last_t[:3]
    if np.linalg.norm(t_vec) > 1e-4:
        visuals.append(
            ArrowConfig(
                pos=action_pos,
                vec=t_vec,
                color=Color.AMBER_500.rgba,
                is_torque=True,
            )
        )

    return visuals

ScalarTorque

Bases: BodyReactionForce

Applies a scalar torque along the local X-axis of the action_site.

name instance-attribute

Python
name: str

Name of the forcing function. Used in data output column naming.

active class-attribute instance-attribute

Python
active: bool = True

Whether or not this force should be active.

action_site instance-attribute

Python
action_site: AnySite

Site on which the forcing function acts.

rel_to_site class-attribute instance-attribute

Python
rel_to_site: AnySite | None = None

Frame of reference for the calculated force. If None, uses worldbody.

xtion_body class-attribute instance-attribute

Python
xtion_body: Body | None = None

Body on which the reaction load acts. If None, the world is used.

scalar_func class-attribute instance-attribute

Python
scalar_func: Callable[
    [float, ndarray, MjModel, MjData], float
] = lambda t, unit_vec, m, d: 0.0

Func(time, action_site x-axis unit vector, MjModel, MjData) -> scalar torque value.

get_visuals

Python
get_visuals(
    mj_model: MjModel, mj_data: MjData
) -> list[ArrowConfig]

Returns a list of arrow configurations for the renderer.

Source code in src/mujoco_mojo/runtime/load.py
Python
def get_visuals(
    self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData
) -> list[ArrowConfig]:
    """Returns a list of arrow configurations for the renderer."""
    if not self.active:
        return []

    visuals: list[ArrowConfig] = []
    action_pos = self.action_site.rt_pos(mj_model, mj_data)

    # force arrow
    f_vec = self._last_f[:3]
    if np.linalg.norm(f_vec) > 1e-4:
        visuals.append(
            ArrowConfig(
                pos=action_pos,
                vec=f_vec,
                color=Color.EMERALD_500.rgba,
                is_torque=False,
            )
        )

    t_vec = self._last_t[:3]
    if np.linalg.norm(t_vec) > 1e-4:
        visuals.append(
            ArrowConfig(
                pos=action_pos,
                vec=t_vec,
                color=Color.AMBER_500.rgba,
                is_torque=True,
            )
        )

    return visuals

RuntimeManager dataclass

Python
RuntimeManager(
    signal_manager: SignalManager | None = None,
    loads: list[Load] = list(),
    proximities: list[Proximity] = list(),
    video_recorders: list[VideoRecorder] = list(),
    playback_speed: float = 1.0,
    _sync_hook: SyncHook | None = None,
    _skip_recording: bool = False,
    _resolved: bool = False,
)

__enter__

Python
__enter__() -> Self

Prime the model and prepare results.

Source code in src/mujoco_mojo/runtime/runtime_manager.py
Python
def __enter__(self) -> Self:
    """Prime the model and prepare results."""
    return self

__exit__

Python
__exit__(exc_type, exc, tb)

Ensure all telemetry is flushed even if the simulation crashed. Also saves recordings.

Source code in src/mujoco_mojo/runtime/runtime_manager.py
Python
def __exit__(self, exc_type, exc, tb):
    """Ensure all telemetry is flushed even if the simulation crashed. Also saves recordings."""
    if self.signal_manager:
        self.signal_manager.close()

    if self.video_recorders:
        self.save_recordings()

resolve

Python
resolve(mj_model: MjModel, mj_data: MjData)

Call this once after mj_loadXML to prime the caches.

Source code in src/mujoco_mojo/runtime/runtime_manager.py
Python
def resolve(self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData):
    """Call this once after mj_loadXML to prime the caches."""
    for load in self.loads:
        load.resolve_ids(mj_model, mj_data)
    self._resolved = True

step

Python
step(mj_model: MjModel, mj_data: MjData)

Calculates forces, integrates physics, and handles telemetry.

Source code in src/mujoco_mojo/runtime/runtime_manager.py
Python
def step(self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData):
    """
    Calculates forces, integrates physics, and handles telemetry.
    """
    # clear buffers for next timestep
    mj_data.xfrc_applied.fill(0)  # external forces
    mj_data.qfrc_applied.fill(0)  # user-defined forces
    mj_data.ctrl.fill(0)  # actuator forces

    # sync state variables and clear render buffer
    mujoco.mj_forward(mj_model, mj_data)

    if mj_data.time == 0.0 or self._start_sim_time == 0.0:
        self._start_sim_time = mj_data.time
        self._start_wall_time = time.time()

    # resolve IDs and initial distances
    # it is critical this is done after mj_forward to update site positions
    if not self._resolved:
        self.resolve(mj_model, mj_data)

    # apply user forcing functions
    for load in self.loads:
        load.apply_load(mj_model, mj_data)

    # record data
    if self.signal_manager and not self._skip_recording:
        mujoco.mj_forward(mj_model, mj_data)
        self.signal_manager.record(mj_model, mj_data)

    # record any frames which are due
    all_arrows = None
    all_lines = None
    if self.video_recorders or self._sync_hook:
        # gather arrows for forcing functions
        all_arrows: list[ArrowConfig] | None = []
        all_lines: list[LineConfig] | None = []

        for load in self.loads:
            all_arrows.extend(load.get_visuals(mj_model, mj_data))

        for proximity in self.proximities:
            visual = proximity.get_visuals(mj_model, mj_data)
            if visual is not None:
                all_lines.append(visual)

    if self.video_recorders:
        assert all_arrows is not None and all_lines is not None
        for recorder in self.video_recorders:
            recorder.capture_frame(
                mj_model=mj_model,
                mj_data=mj_data,
                custom_arrows=all_arrows,
                custom_lines=all_lines,
            )

    # integrate physics and advance the time
    mujoco.mj_step(mj_model, mj_data)

    if self._sync_hook:
        assert all_arrows is not None and all_lines is not None
        self._sync_hook(mj_model, mj_data, all_arrows, all_lines)

    if self.playback_speed > 0:
        sim_elapsed = mj_data.time - self._start_sim_time

        # how much time we want to have passed
        target_wall_elapsed = sim_elapsed / self.playback_speed

        # how much time has actually passed
        actual_wall_elapsed = time.time() - self._start_wall_time

        sleep_time = target_wall_elapsed - actual_wall_elapsed
        if sleep_time > 0:
            time.sleep(sleep_time)
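The pacing arithmetic at the end of `step` can be isolated into a pure function for inspection. This is a sketch only; `pacing_sleep` is a hypothetical name, and it returns the sleep duration instead of calling `time.sleep`.

```python
def pacing_sleep(sim_elapsed: float, wall_elapsed: float, playback_speed: float) -> float:
    """Seconds to sleep so that wall time tracks sim time at the given speed."""
    if playback_speed <= 0:
        # Pacing is skipped entirely for non-positive speeds.
        return 0.0
    # How much wall time should have passed by now.
    target_wall_elapsed = sim_elapsed / playback_speed
    # Never sleep a negative amount when the simulation is running behind.
    return max(0.0, target_wall_elapsed - wall_elapsed)


ahead_of_schedule = pacing_sleep(sim_elapsed=1.0, wall_elapsed=0.25, playback_speed=2.0)
behind_schedule = pacing_sleep(sim_elapsed=1.0, wall_elapsed=2.0, playback_speed=1.0)
unpaced = pacing_sleep(sim_elapsed=1.0, wall_elapsed=0.0, playback_speed=0.0)
```

At `playback_speed=2.0`, one simulated second should take half a wall-clock second, so a step that has used 0.25 s so far sleeps for the remaining 0.25 s.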

SignalManager dataclass

Python
SignalManager(
    export_path: Path,
    batch_size: int = 1000,
    record_decimation: int = 1,
    _buffer_row_idx: int = 0,
    _step_count: int = -1,
    _n_cols: int = 0,
)

export_path instance-attribute

Python
export_path: Path

Where the output file should be saved.

batch_size class-attribute instance-attribute

Python
batch_size: int = 1000

Number of steps before flushing to disk.

record_decimation class-attribute instance-attribute

Python
record_decimation: int = 1

Record one sample every record_decimation steps (1 records every step).

post

Python
post(
    value: float,
    category: SignalCategory | str,
    subgroups: tuple[str, ...] = (),
    *,
    attr: str | None = None,
)

Injects a value into the telemetry ledger using a hierarchical namespace.

This method constructs a structured key that the dashboard uses to build a navigable tree view. The naming convention follows a folder-like structure to group related signals (e.g., all axes of a body's position).

Format

Category/Subgroup:Attribute (e.g., "Bodies/Link_1:xpos_x")

Parameters:

Name Type Description Default
value float

The numeric data to record.

required
category SignalCategory | str

Top level category (e.g., "Bodies")

required
subgroups tuple[str, ...]

The second-level organizational folders. Defaults to an empty tuple.

()
attr str | None

The specific signal or component name (e.g., "qpos" or "x"). Defaults to None.

None

Examples:

Python Console Session
>>> # Becomes "Bodies/Hand/xpos:x"
>>> manager.post(1.2, SignalCategory.BODIES, ("Hand", "xpos"), attr="x")
Python Console Session
>>> # Becomes "Sensors/IMU/Accel:z"
>>> manager.post(9.81, "Sensors", ("IMU", "Accel"), attr="z")
Source code in src/mujoco_mojo/runtime/signal_manager.py
Python
def post(
    self,
    value: float,
    category: SignalCategory | str,
    subgroups: tuple[str, ...] = (),
    *,
    attr: str | None = None,
):
    """
    Injects a value into the telemetry ledger using a hierarchical namespace.

    This method constructs a structured key that the dashboard uses to build a navigable tree view. The naming convention follows a folder-like structure to group related signals (e.g., all axes of a body's position).

    Format:
        Category/Subgroup:Attribute
        (e.g., "Bodies/Link_1:xpos_x")

    Args:
        value (float): The numeric data to record.
        category (SignalCategory | str): Top level category (e.g., "Bodies")
        subgroups (tuple[str, ...], optional): The second-level organizational folders. Defaults to an empty tuple.
        attr (str | None, optional): The specific signal or component name (e.g., "qpos" or "x"). Defaults to None.

    Examples:
        >>> # Becomes "Bodies/Hand/xpos:x"
        >>> manager.post(1.2, SignalCategory.BODIES, ("Hand", "xpos"), attr="x")

        >>> # Becomes "Sensors/IMU/Accel:z"
        >>> manager.post(9.81, "Sensors", ("IMU", "Accel"), attr="z")

    """
    # use tuple as cache key to avoid string construction
    cache_lookup = (str(category), subgroups, attr if attr is not None else "")

    if cache_lookup in self._key_cache:
        # fast path for cached signal
        full_key = self._key_cache[cache_lookup]
    else:
        # slow path for a new signal
        path_parts = [str(category)] + [str(s) for s in subgroups if s]
        full_key = "/".join(path_parts)
        if attr:
            full_key += f":{attr}"

        self._key_cache[cache_lookup] = full_key

    # get column index
    if full_key in self._key_to_idx:
        idx = self._key_to_idx[full_key]
    else:
        # register a new signal column
        idx = self._n_cols
        self._key_to_idx[full_key] = idx
        self._n_cols += 1

        logger.debug(f"New signal registered: {full_key} at index {idx}")

        # grow buffer if exceeding the initial guess
        if self._n_cols > self._data_buffer.shape[1]:
            n_cols_to_add = 50
            new_width = self._data_buffer.shape[1] + n_cols_to_add
            logger.debug(f"Growing telemetry buffer width to {new_width} columns.")

            growth = np.zeros((self.batch_size, n_cols_to_add), dtype=np.float64)
            self._data_buffer = np.hstack([self._data_buffer, growth])

    # write value to buffer for next flush
    self._data_buffer[self._buffer_row_idx, idx] = value

record

Python
record(mj_model: MjModel, mj_data: MjData)

Executes all samplers and advances the buffer index. Flushes if due.

Source code in src/mujoco_mojo/runtime/signal_manager.py
Python
def record(self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData):
    """Executes all samplers and advances the buffer index. Flushes if due."""
    self._step_count += 1
    if self._step_count % self.record_decimation != 0:
        return

    # record simulation time
    self._data_buffer[self._buffer_row_idx, 0] = mj_data.time

    # run samplers
    for task in self._sample_tasks:
        task(mj_model, mj_data)

    self._buffer_row_idx += 1

    if self._buffer_row_idx >= self.batch_size:
        self.flush()
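To see which calls actually produce a row, the decimation check can be simulated on its own. This sketch assumes the counter starts at -1, as in the dataclass signature above; `recorded_steps` is a hypothetical helper.

```python
def recorded_steps(n_calls: int, record_decimation: int) -> list[int]:
    """Return the step counts at which record() would write a row."""
    step_count = -1  # matches the _step_count default shown above
    recorded = []
    for _ in range(n_calls):
        step_count += 1
        if step_count % record_decimation != 0:
            continue  # skipped: not a multiple of the decimation factor
        recorded.append(step_count)
    return recorded


every_third = recorded_steps(n_calls=7, record_decimation=3)
every_step = recorded_steps(n_calls=3, record_decimation=1)
```

Because the counter is incremented before the check, the very first call always records, and later rows land on multiples of `record_decimation`.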

flush

Python
flush()

Commits the memory buffer to the output file.

Source code in src/mujoco_mojo/runtime/signal_manager.py
Python
def flush(self):
    """Commits the memory buffer to the output file."""
    if self._buffer_row_idx == 0:
        return

    # build column names from mapping
    sorted_keys = sorted(self._key_to_idx.keys(), key=lambda x: self._key_to_idx[x])

    # slice only the used portion of the buffer
    new_df = pl.from_numpy(
        data=self._data_buffer[: self._buffer_row_idx, : self._n_cols],
        schema=sorted_keys,
    )

    logger.debug(
        f"Flushing {self._buffer_row_idx} steps to {self.export_path.name}"
    )

    if self.export_path.exists():
        try:
            # Use diagonal concat to safely handle signals added mid-simulation
            existing_df = pl.read_parquet(self.export_path)
            combined_df = pl.concat([existing_df, new_df], how="diagonal")
            combined_df.write_parquet(self.export_path, compression="zstd")
        except Exception as e:
            logger.error(f"Failed to append telemetry: {e}")
    else:
        new_df.write_parquet(self.export_path, compression="zstd")

    # reset buffer for next batch
    self._buffer_row_idx = 0
    self._data_buffer.fill(0.0)

VideoRecorder dataclass

Python
VideoRecorder(
    path: Path,
    camera_name: CameraName,
    show_loads: bool = False,
    show_net_force: bool = False,
    show_contacts: bool = False,
    show_proximities: bool = False,
    fps: int = 30,
    width: int = 640,
    height: int = 480,
    _frames: list = list(),
)

capture_frame

Python
capture_frame(
    mj_model: MjModel,
    mj_data: MjData,
    custom_arrows: list[ArrowConfig],
    custom_lines: list[LineConfig],
)

Captures the current state as a video frame.

Source code in src/mujoco_mojo/runtime/video_recorder.py
Python
def capture_frame(
    self,
    mj_model: mujoco.MjModel,
    mj_data: mujoco.MjData,
    custom_arrows: list[ArrowConfig],
    custom_lines: list[LineConfig],
):
    """Captures the current state as a video frame."""
    if mj_data.time < self._next_record_time:
        return

    # update standard mujoco objects in scene
    self._renderer.update_scene(
        data=mj_data,
        camera=self.camera_name,
        scene_option=self._vopt,
    )

    if custom_arrows and self.show_loads:
        for arrow in custom_arrows:
            arrow.draw_in_scene(mj_model, self._renderer.scene)

    if custom_lines and self.show_proximities:
        for line in custom_lines:
            line.draw_in_scene(self._renderer.scene)

    # capture and increment the clock for the next frame
    self._frames.append(self._renderer.render())
    self._next_record_time += 1 / self.fps
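The frame schedule can be traced without a renderer. The sketch below mirrors the time check and the `1 / fps` increment; `captured_times` is a hypothetical helper, and starting `_next_record_time` at 0.0 is an assumption, since its initial value is not shown on this page.

```python
def captured_times(
    sim_times: list[float], fps: int, first_record_time: float = 0.0
) -> list[float]:
    """Return the simulation times at which a frame would be captured."""
    next_record_time = first_record_time  # assumed initial value
    captured = []
    for t in sim_times:
        if t < next_record_time:
            continue  # not yet due
        captured.append(t)
        next_record_time += 1 / fps
    return captured


fine_steps = captured_times([0.0, 0.25, 0.5, 0.75, 1.0], fps=2)
coarse_steps = captured_times([0.0, 1.0, 2.0], fps=4)
```

When the simulation timestep is longer than `1 / fps`, every step captures a frame and the schedule falls behind simulation time, so the saved video would play back faster than the simulation ran.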

save

Python
save()

Writes the captured frames to a video file.

Source code in src/mujoco_mojo/runtime/video_recorder.py
Python
def save(self):
    """Writes the captured frames to a video file."""
    if not self._frames:
        return
    import mediapy as media

    if self.path.suffix.lower() == ".gif":
        from PIL import Image

        # convert arrays to PIL images
        pil_images = [Image.fromarray(frame) for frame in self._frames]

        # save gif
        pil_images[0].save(
            self.path,
            save_all=True,
            append_images=pil_images[1:],
            duration=int(1000 / self.fps),  # ms per frame
            loop=0,  # loop forever
        )
    else:
        media.write_video(path=self.path, images=self._frames, fps=self.fps)
    logger.info(f"Video saved to {self.path}")