@@ -358,6 +358,7 @@ def annotate_episode_in_auto_mode(
annotated_episode = env.recorder_manager.get_episode(0)
subtask_term_signal_dict = annotated_episode.data["obs"]["datagen_info"]["subtask_term_signals"]
for signal_name, signal_flags in subtask_term_signal_dict.items():
signal_flags = torch.tensor(signal_flags, device=env.device)
if not torch.any(signal_flags):
is_episode_annotated_successfully = False
print(f'\tDid not detect completion for the subtask "{signal_name}".')
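For context, a minimal standalone sketch of why this conversion is needed: with the list-based episode buffers introduced in this PR, signals read back from an un-exported episode can be plain Python lists, and `torch.any()` requires a tensor (the flag values below are illustrative):

```python
import torch

# Illustrative per-step subtask termination flags, as a plain Python list
# (how un-exported episode buffers are now stored).
signal_flags = [0.0, 0.0, 1.0]

# torch.any() cannot reduce a Python list directly, so convert first.
flags = torch.tensor(signal_flags)
assert torch.any(flags)  # completion detected for this subtask
```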
2 changes: 1 addition & 1 deletion source/isaaclab/config/extension.toml
@@ -1,7 +1,7 @@
[package]

# Note: Semantic Versioning is used: https://semver.org/
version = "0.45.9"
version = "0.45.10"

# Description
title = "Isaac Lab framework for Robot Learning"
21 changes: 21 additions & 0 deletions source/isaaclab/docs/CHANGELOG.rst
@@ -1,6 +1,27 @@
Changelog
---------

0.45.10 (2025-08-28)
~~~~~~~~~~~~~~~~~~~~

Fixed
^^^^^

* Fixed high memory usage and a significant performance slowdown in episode data by removing the use of torch.cat
  when appending to the episode data at each timestep. torch.cat copied the entire episode buffer on every append,
  which caused high memory usage and slow recording of longer episodes.

* Patched the configclass validation to support dicts whose keys are not strings.

Added
^^^^^

* Added optional episode metadata (ep_meta) to be stored in the HDF5 dataset attributes.
* Added an option to record data before each physics step.
* Added joint_target data to episode data. Joint targets can optionally be recorded by the user and replayed
  to improve the determinism of replay.


0.45.9 (2025-08-27)
~~~~~~~~~~~~~~~~~~~

1 change: 1 addition & 0 deletions source/isaaclab/isaaclab/envs/manager_based_rl_env.py
@@ -188,6 +188,7 @@ def step(self, action: torch.Tensor) -> VecEnvStepReturn:
self.scene.write_data_to_sim()
# simulate
self.sim.step(render=False)
self.recorder_manager.record_pre_physics_step()
Contributor:
Isn't this happening now after the physics step?

Contributor Author:
Currently, recording captures the states after a physics step. This change adds functionality to record the states prior to every physics step in the decimation loop, which provides more fine-grained recording.

Contributor Author (@peterd-NV, Aug 29, 2025):
The motivation here is that full determinism during replay is not possible when running parallel envs, as the PhysX buffers cannot be flushed on every Isaac Lab reset. This can cause significant divergence when replaying data collected with parallel envs via env.step alone, since that has to run through the entire action pipeline, which may contain controllers that are sensitive to this nondeterminism. Recording at the decimation level every sim dt can help alleviate this by giving users the option of more fine-grained control.
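As a rough sketch of the replay path this enables, assuming joint targets were recorded as one tensor per sim dt (the env, robot, and episode handles here are illustrative, not an actual replay script):

```python
# Write recorded joint targets straight to the articulation each sim step,
# bypassing the action pipeline that env.step() would re-run.
robot = env.scene["robot"]
while (joint_target := episode.get_next_joint_target()) is not None:
    robot.set_joint_position_target(joint_target)
    env.scene.write_data_to_sim()
    env.sim.step(render=False)
    env.scene.update(dt=env.physics_dt)
```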

Contributor Author:
Note that recording pre-physics steps is fully optional and will not be enabled by our existing default recorder workflows, to limit dataset size. Users can take advantage of it via their own recorder terms if needed (as Lightwheel is currently doing).
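For reference, a minimal sketch of what such a user-defined recorder term could look like, following the RecorderTerm pattern in this diff (the class names and the recorded attribute are assumptions for illustration):

```python
from isaaclab.managers import RecorderTerm, RecorderTermCfg
from isaaclab.utils import configclass


class PrePhysicsJointTargetRecorder(RecorderTerm):
    """Records the robot's joint position targets before every physics step."""

    def record_pre_physics_step(self):
        # Return a (key, value) pair per the base-class contract; the
        # RecorderManager appends it to each env's episode buffer.
        robot = self._env.scene["robot"]
        return "joint_targets", robot.data.joint_pos_target


@configclass
class PrePhysicsJointTargetRecorderCfg(RecorderTermCfg):
    class_type: type = PrePhysicsJointTargetRecorder
```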

Collaborator:
But should this be called record_post_physics_step() or record_post_physics_decimated_step(), since it happens after the physics step?

Contributor Author:
Sorry, I misunderstood Mayank's original comment. Yes, I agree the naming here is confusing. I have updated it to be called "record_post_physics_decimation_step".

# render between steps only if the GUI or an RTX sensor needs it
# note: we assume the render interval to be the shortest accepted rendering interval.
# If a camera needs rendering at a faster frequency, this will lead to unexpected behavior.
60 changes: 60 additions & 0 deletions source/isaaclab/isaaclab/managers/recorder_manager.py
@@ -123,6 +123,15 @@ def record_post_step(self) -> tuple[str | None, torch.Tensor | dict | None]:
"""
return None, None

def record_pre_physics_step(self) -> tuple[str | None, torch.Tensor | dict | None]:
"""Record data before the physics step is executed.

Returns:
A tuple of key and value to be recorded.
Please refer to the `record_pre_reset` function for more details.
"""
return None, None


class RecorderManager(ManagerBase):
"""Manager for recording data from recorder terms."""
@@ -362,6 +371,16 @@ def record_post_step(self) -> None:
key, value = term.record_post_step()
self.add_to_episodes(key, value)

def record_pre_physics_step(self) -> None:
"""Trigger recorder terms for pre-physics step functions."""
# Do nothing if no active recorder terms are provided
if len(self.active_terms) == 0:
return

for term in self._terms.values():
key, value = term.record_pre_physics_step()
self.add_to_episodes(key, value)

def record_pre_reset(self, env_ids: Sequence[int] | None, force_export_or_skip=None) -> None:
"""Trigger recorder terms for pre-reset functions.

Expand Down Expand Up @@ -406,6 +425,37 @@ def record_post_reset(self, env_ids: Sequence[int] | None) -> None:
key, value = term.record_post_reset(env_ids)
self.add_to_episodes(key, value, env_ids)

def get_ep_meta(self) -> dict:
"""Get the episode metadata."""
if not hasattr(self._env.cfg, "get_ep_meta"):
# Add basic episode metadata
ep_meta = dict()
ep_meta["sim_args"] = {
"dt": self._env.cfg.sim.dt,
"decimation": self._env.cfg.decimation,
"render_interval": self._env.cfg.sim.render_interval,
"num_envs": self._env.cfg.scene.num_envs,
}
return ep_meta

# Add custom episode metadata if available
ep_meta = self._env.cfg.get_ep_meta()

def convert_fixture_to_name(d) -> dict:
if not isinstance(d, dict):
# Check if it is a fixture type
if hasattr(d, "__class__") and "lwlab.core.models.fixtures" in d.__class__.__module__:
return d.name
return d
result = {}
for k, v in d.items():
result[k] = convert_fixture_to_name(v)
return result

for obj in ep_meta["object_cfgs"]:
obj["placement"] = convert_fixture_to_name(obj["placement"])
return ep_meta

def export_episodes(self, env_ids: Sequence[int] | None = None) -> None:
"""Concludes and exports the episodes for the given environment ids.

@@ -424,8 +474,18 @@ def export_episodes(self, env_ids: Sequence[int] | None = None) -> None:

# Export episode data through dataset exporter
need_to_flush = False

if any(env_id in self._episodes and not self._episodes[env_id].is_empty() for env_id in env_ids):
ep_meta = self.get_ep_meta()
if self._dataset_file_handler is not None:
self._dataset_file_handler.add_env_args(ep_meta)
if self._failed_episode_dataset_file_handler is not None:
self._failed_episode_dataset_file_handler.add_env_args(ep_meta)

for env_id in env_ids:
if env_id in self._episodes and not self._episodes[env_id].is_empty():
self._episodes[env_id].pre_export()

episode_succeeded = self._episodes[env_id].success
target_dataset_file_handler = None
if (self.cfg.dataset_export_mode == DatasetExportMode.EXPORT_ALL) or (
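For context, get_ep_meta() duck-types on the environment config: if the cfg defines its own get_ep_meta, that output replaces the basic sim_args metadata. A minimal sketch of opting in (the task name and metadata fields are illustrative):

```python
from isaaclab.envs import ManagerBasedRLEnvCfg
from isaaclab.utils import configclass


@configclass
class MyTaskEnvCfg(ManagerBasedRLEnvCfg):
    # ... scene, observations, actions, etc. ...

    def get_ep_meta(self) -> dict:
        # Picked up by RecorderManager.get_ep_meta() and written to the
        # HDF5 dataset attributes at export time.
        return {
            "task_name": "my_task",  # illustrative custom field
            # Expected by the fixture-name conversion above; leave empty
            # if the task has no object placements.
            "object_cfgs": [],
        }
```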
6 changes: 5 additions & 1 deletion source/isaaclab/isaaclab/utils/configclass.py
@@ -268,7 +268,11 @@ def _validate(obj: object, prefix: str = "") -> list[str]:
missing_fields.extend(_validate(item, prefix=current_path))
return missing_fields
elif isinstance(obj, dict):
obj_dict = obj
# Convert any non-string keys to strings to allow validation of dict with non-string keys
if any(not isinstance(key, str) for key in obj.keys()):
obj_dict = {str(key): value for key, value in obj.items()}
else:
obj_dict = obj
elif hasattr(obj, "__dict__"):
obj_dict = obj.__dict__
else:
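For illustration, a minimal example of the case this unblocks, assuming a configclass field holding a dict keyed by ints (the field and class names are hypothetical):

```python
from isaaclab.utils import configclass


@configclass
class CurriculumCfg:
    # Keys are step thresholds (ints), not strings; _validate() now
    # stringifies them instead of assuming str keys.
    difficulty_by_step: dict[int, float] = {0: 0.1, 1000: 0.5}


cfg = CurriculumCfg()
cfg.validate()  # passes with non-string dict keys
```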
54 changes: 49 additions & 5 deletions source/isaaclab/isaaclab/utils/datasets/episode_data.py
@@ -21,6 +21,7 @@ def __init__(self) -> None:
self._data = dict()
self._next_action_index = 0
self._next_state_index = 0
self._next_joint_target_index = 0
self._seed = None
self._env_id = None
self._success = None
@@ -110,12 +111,11 @@ def add(self, key: str, value: torch.Tensor | dict):
for sub_key_index in range(len(sub_keys)):
if sub_key_index == len(sub_keys) - 1:
# Add value to the final dict layer
# Use lists to prevent slow tensor copy during concatenation
if sub_keys[sub_key_index] not in current_dataset_pointer:
current_dataset_pointer[sub_keys[sub_key_index]] = value.unsqueeze(0).clone()
current_dataset_pointer[sub_keys[sub_key_index]] = [value.clone()]
else:
current_dataset_pointer[sub_keys[sub_key_index]] = torch.cat(
(current_dataset_pointer[sub_keys[sub_key_index]], value.unsqueeze(0))
)
current_dataset_pointer[sub_keys[sub_key_index]].append(value.clone())
break
# key index
if sub_keys[sub_key_index] not in current_dataset_pointer:
@@ -160,7 +160,7 @@ def get_state_helper(states, state_index) -> dict | torch.Tensor | None:
elif isinstance(states, torch.Tensor):
if state_index >= len(states):
return None
output_state = states[state_index]
output_state = states[state_index, None]
else:
raise ValueError(f"Invalid state type: {type(states)}")
return output_state
@@ -174,3 +174,47 @@ def get_next_state(self) -> dict | None:
if state is not None:
self._next_state_index += 1
return state

def get_joint_target(self, joint_target_index) -> dict | torch.Tensor | None:
"""Get the joint target of the specified index from the dataset."""
if "joint_targets" not in self._data:
return None

joint_targets = self._data["joint_targets"]

def get_joint_target_helper(joint_targets, joint_target_index) -> dict | torch.Tensor | None:
if isinstance(joint_targets, dict):
output_joint_targets = dict()
for key, value in joint_targets.items():
output_joint_targets[key] = get_joint_target_helper(value, joint_target_index)
if output_joint_targets[key] is None:
return None
elif isinstance(joint_targets, torch.Tensor):
if joint_target_index >= len(joint_targets):
return None
output_joint_targets = joint_targets[joint_target_index]
else:
raise ValueError(f"Invalid joint target type: {type(joint_targets)}")
return output_joint_targets

output_joint_targets = get_joint_target_helper(joint_targets, joint_target_index)
return output_joint_targets

def get_next_joint_target(self) -> dict | torch.Tensor | None:
"""Get the next joint target from the dataset."""
joint_target = self.get_joint_target(self._next_joint_target_index)
if joint_target is not None:
self._next_joint_target_index += 1
return joint_target

def pre_export(self):
"""Prepare data for export by converting lists to tensors."""

def pre_export_helper(data):
for key, value in data.items():
if isinstance(value, list):
data[key] = torch.stack(value)
elif isinstance(value, dict):
pre_export_helper(value)

pre_export_helper(self._data)
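To make the buffering change concrete, a standalone toy comparison of the two append strategies (not the EpisodeData API itself):

```python
import torch

steps, dim = 1000, 7

# Before: torch.cat reallocates and copies the whole buffer on every
# append, so per-step cost grows with episode length (O(n^2) overall).
buf = torch.empty(0, dim)
for _ in range(steps):
    buf = torch.cat((buf, torch.randn(1, dim)))

# After: O(1) list appends during recording, one torch.stack at export,
# mirroring what add() and pre_export() now do.
chunks = []
for _ in range(steps):
    chunks.append(torch.randn(dim))
buf = torch.stack(chunks)
```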