Skip to content

Runtime manager

runtime_manager

RuntimeManager dataclass

Python
RuntimeManager(
    signal_manager: SignalManager | None = None,
    loads: list[Load] = list(),
    proximities: list[Proximity] = list(),
    video_recorders: list[VideoRecorder] = list(),
    playback_speed: float = 1.0,
    _sync_hook: SyncHook | None = None,
    _skip_recording: bool = False,
    _resolved: bool = False,
)

__enter__

Python
__enter__() -> Self

Prime the model and prepare results.

Source code in src/mujoco_mojo/runtime/runtime_manager.py
Python
def __enter__(self) -> Self:
    """Enter the runtime context and return this manager.

    No setup is performed here; the instance is returned unchanged so that it
    can be bound by a ``with ... as manager:`` statement.

    Returns:
        This same instance.
    """
    return self

__exit__

Python
__exit__(exc_type, exc, tb)

Ensure all telemetry is flushed even if the simulation crashed. Also saves recordings.

Source code in src/mujoco_mojo/runtime/runtime_manager.py
Python
def __exit__(self, exc_type, exc, tb):
    """Flush telemetry and save recordings when the ``with`` block exits.

    Runs on both normal and exceptional exit. The two cleanup steps are
    independent: recordings are still saved when closing the signal manager
    raises, so one failing step does not discard the other's data.

    Args:
        exc_type: Exception class raised inside the ``with`` block, or ``None``.
        exc: Exception instance raised inside the ``with`` block, or ``None``.
        tb: Traceback of that exception, or ``None``.

    Returns:
        ``None``, so an exception raised inside the ``with`` block is never
        suppressed.

    Raises:
        Whatever ``signal_manager.close()`` or ``save_recordings()`` raises.
        If both fail, the error from ``save_recordings()`` propagates with the
        ``close()`` error attached as its context.
    """
    try:
        if self.signal_manager:
            self.signal_manager.close()
    finally:
        # Attempt this even if close() raised: captured frames should not be
        # lost just because the telemetry flush failed.
        if self.video_recorders:
            self.save_recordings()

resolve

Python
resolve(mj_model: MjModel, mj_data: MjData)

Call this once after mj_loadXML to prime the caches.

Source code in src/mujoco_mojo/runtime/runtime_manager.py
Python
def resolve(self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData):
    """Resolve every registered load against the model and mark the manager resolved.

    Calls ``resolve_ids(mj_model, mj_data)`` on each entry of ``self.loads``
    in order, then sets ``self._resolved``. ``step`` invokes this itself on
    the first step when it has not been called yet.

    Args:
        mj_model: The MuJoCo model passed through to each load.
        mj_data: The MuJoCo data passed through to each load.
    """
    registered_loads = self.loads
    for entry in registered_loads:
        entry.resolve_ids(mj_model, mj_data)

    # Reached only when no load raised, so a failed resolve is retried later.
    self._resolved = True

step

Python
step(mj_model: MjModel, mj_data: MjData)

Calculates forces, integrates physics, and handles telemetry.

Source code in src/mujoco_mojo/runtime/runtime_manager.py
Python
def step(self, mj_model: mujoco.MjModel, mj_data: mujoco.MjData):
    """Advance the simulation by one timestep.

    In order: zeroes the applied-force and control buffers, runs
    ``mj_forward``, applies every load, records signals, captures video
    frames, integrates with ``mj_step``, calls the sync hook, and finally
    sleeps as needed to honour ``playback_speed``.

    Args:
        mj_model: The MuJoCo model being simulated.
        mj_data: The MuJoCo data for that model; modified in place.
    """
    # Start from clean inputs: external forces, user-defined forces, and
    # actuator controls, in that order.
    for buffer in (mj_data.xfrc_applied, mj_data.qfrc_applied, mj_data.ctrl):
        buffer.fill(0)

    # Bring derived state in line with the current positions and velocities.
    mujoco.mj_forward(mj_model, mj_data)

    # (Re)anchor the pacing clocks while either time reading is still 0.0.
    needs_anchor = mj_data.time == 0.0 or self._start_sim_time == 0.0
    if needs_anchor:
        self._start_sim_time = mj_data.time
        self._start_wall_time = time.time()

    # Resolve IDs and initial distances lazily. This must come after
    # mj_forward so that site positions are up to date.
    if not self._resolved:
        self.resolve(mj_model, mj_data)

    # Let each user forcing function write into the freshly cleared buffers.
    for forcing in self.loads:
        forcing.apply_load(mj_model, mj_data)

    # Record signals; mj_forward is rerun first, now that loads are applied.
    if self.signal_manager and not self._skip_recording:
        mujoco.mj_forward(mj_model, mj_data)
        self.signal_manager.record(mj_model, mj_data)

    # Overlay geometry is only gathered when something will consume it.
    arrows: list[ArrowConfig] | None = None
    lines: list[LineConfig] | None = None
    if self.video_recorders or self._sync_hook:
        arrows = [
            arrow
            for forcing in self.loads
            for arrow in forcing.get_visuals(mj_model, mj_data)
        ]
        lines = [
            segment
            for proximity in self.proximities
            if (segment := proximity.get_visuals(mj_model, mj_data)) is not None
        ]

    # Hand the pre-integration state to every recorder.
    if self.video_recorders:
        assert arrows is not None and lines is not None
        for recorder in self.video_recorders:
            recorder.capture_frame(
                mj_model=mj_model,
                mj_data=mj_data,
                custom_arrows=arrows,
                custom_lines=lines,
            )

    # Integrate physics and advance the simulation time.
    mujoco.mj_step(mj_model, mj_data)

    # The hook sees post-step data with the overlays gathered before the step.
    if self._sync_hook:
        assert arrows is not None and lines is not None
        self._sync_hook(mj_model, mj_data, arrows, lines)

    # Pacing is skipped unless playback_speed is strictly positive.
    if not self.playback_speed > 0:
        return

    # Wall time that should have elapsed for the simulated time covered so
    # far, versus the wall time that actually has.
    simulated_span = mj_data.time - self._start_sim_time
    wanted_wall_span = simulated_span / self.playback_speed
    spent_wall_span = time.time() - self._start_wall_time

    # Sleep only when running ahead of the target.
    remaining = wanted_wall_span - spent_wall_span
    if remaining > 0:
        time.sleep(remaining)