LeIsaac × Cosmos: Video-to-Action Data Generation Pipeline
This tutorial extends LeIsaac by integrating Cosmos-Predict2.5 and the GR00T-Dreams IDM into a LeIsaac-native data generation loop. LeIsaac is used to collect teleoperated demonstrations (HDF5) and convert them into LeRobot datasets. Cosmos-Predict2.5 is post-trained on these videos to synthesize additional rollout videos at scale, and IDM is fine-tuned on the same dataset to infer robot actions from the generated videos. Together, this produces a scalable pipeline for constructing synthetic, complete LeRobot datasets, which can be replayed and evaluated directly in LeIsaac.
🎥 From a Single Demonstration to Large-Scale Synthetic Rollouts via Cosmos
Overview
- Use LeIsaac to collect an HDF5 dataset and convert it into a LeRobot dataset
- Post-train Cosmos-Predict2.5, then run inference to generate synthetic videos
- Fine-tune IDM, then run inference to generate synthetic LeRobot trajectories (parquet)
- Convert the original HDF5 dataset and the IDM-generated LeRobot trajectories (parquet) to a replayable LeIsaac HDF5 dataset and use LeIsaac to replay and evaluate
Step 1: Data Collection with LeIsaac
This step produces:
- A real demonstration dataset in HDF5 format (recorded in Isaac Sim through LeIsaac teleoperation)
- A converted dataset in LeRobot format (videos + metadata + parquet)
1.1 Collect the HDF5 Dataset via LeIsaac Teleoperation
First, collect demonstrations using LeIsaac teleoperation.
- Reference: Teleoperation | LeIsaac Document
- Output: an HDF5 file (for example, dataset.hdf5)
The HDF5 dataset serves as the primary source of truth for replay and evaluation.
It is also the source used to construct the initial LeRobot dataset for both Cosmos and IDM.
1.2 Convert HDF5 to a LeRobot Dataset
Convert the recorded HDF5 dataset into LeRobot format.
- Reference: Data Convention | LeIsaac Document
- Output: a LeRobot dataset used for post-training Cosmos-Predict2.5 and fine-tuning IDM
💡Important (Video Encoding)
Ensure that all output videos are encoded using H.264 (h264).
Avoid AV1 encoding, as Cosmos and IDM may fail during decoding or processing.
You can modify leisaac/scripts/convert/isaaclab2lerobot.py directly to ensure correct video encoding:
- "video.codec": "av1"
+ "video.codec": "h264"
Step 2: Video Generation with Cosmos-Predict2.5
This step produces:
- A Cosmos-Predict2.5 checkpoint post-trained on your task-specific LeRobot videos
- A set of synthetic rollout videos generated by Cosmos
In this pipeline, Cosmos-Predict2.5 is used purely as a video generator.
It learns the visual rollout distribution from LeRobot videos collected via LeIsaac, and generates new rollout videos conditioned on:
- a text prompt (task description),
- and the first few frames of an example video.
These generated videos will later be converted into executable robot actions using IDM.
2.1 Install Cosmos-Predict2.5
Set up the Cosmos-Predict2.5 environment by following the official installation guide: Set up the Cosmos-Predict2.5
2.2 Prepare the Cosmos-Predict2.5 Post-Training Dataset (Video + Prompt)
Dataset folder format should be:
cosmos-predict2.5/datasets/benchmark_train/<task_name>/
├── metas/
│ ├── *.txt
├── videos/
│ ├── *.mp4
Construct it from your LeRobot dataset:
- Copy MP4 videos from your LeRobot dataset from <path_to_lerobot_dataset>/videos/ to cosmos-predict2.5/datasets/benchmark_train/<task_name>/videos/
- Rename the copied videos as a clean numeric sequence: 1.mp4, 2.mp4, 3.mp4, ...
- Create the same number of prompt files under cosmos-predict2.5/datasets/benchmark_train/<task_name>/metas/
- Fill each prompt file using the task text in <path_to_lerobot_dataset>/meta/tasks.jsonl. For example, if one line in tasks.jsonl is {"task_index": 0, "task": "Lift the red cube up."}, then 1.txt should contain Lift_the_red_cube_up. In many single-task cases, you will write the same prompt into all *.txt files, but the format supports per-video prompts if needed. A scripted version of these steps is sketched after this list.
2.3 Post-train Cosmos-Predict2.5
Post-train Cosmos-Predict2.5 on the prepared dataset by following the official post-training instructions.
This step produces a Cosmos-Predict2.5 checkpoint specialized for your robot embodiment and task distribution.
2.4 Run Inference to Generate Videos
After post-training, the Cosmos-Predict2.5 model checkpoints are typically saved in Distributed Checkpoint (DCP) format. Before running inference, these checkpoints need to be converted into a consolidated PyTorch format that can be loaded by the inference scripts.
Follow the converting-dcp-checkpoint-to-consolidated-pytorch-format guide to convert DCP checkpoints.
Once the checkpoint has been converted, run video generation inference following the official running-inference instructions.
🔬 Batch Inference (used to generate videos at scale for the IDM stage)
For large-scale video generation, batch inference is supported.
Create the batch inference helper script generate_batch_config.py under cosmos-predict2.5/scripts/.
cosmos-predict2.5/scripts/generate_batch_config.py
#!/usr/bin/env python3
"""
Generate batch inference configuration files (JSONL).
This script scans a directory of input videos and generates a JSONL file
for batch video2world inference. Each line corresponds to one video.
Optionally, task prompts can be loaded from episodes.jsonl.
"""
import json
import os
from pathlib import Path
# ---------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------
# Input video directory used to generate inference configs
VIDEO_DIR = "<path_to_lerobot_dataset>/videos/chunk-000/observation.images.front"
# Metadata directory (used when loading text prompts)
META_DIR = "<path_to_lerobot_dataset>/meta"
# Output JSONL file
OUTPUT_JSONL = "batch_inference_config.jsonl"
# Task name prefix
TASK_NAME = "liftcube"
# ---------------------------------------------------------------------
# Base configuration template shared by all videos
# ---------------------------------------------------------------------
BASE_CONFIG = {
"inference_type": "video2world",
"seed": 21,
"guidance": 7,
"resolution": "480,640",
"enable_autoregressive": True,
# Number of output frames (e.g. 110 ≈ 6s, 140 ≈ 8s, 210 ≈ 16s)
"num_output_frames": 210,
"chunk_size": 77,
"chunk_overlap": 1,
# Default prompt (will be overridden if episode-specific prompt is found)
"prompt": "The robot arm is performing a task",
"negative_prompt": (
"The video captures a series of frames showing ugly scenes, static with no motion, "
"motion blur, over-saturation, shaky footage, low resolution, grainy texture, "
"pixelated images, poorly lit areas, underexposed and overexposed scenes, "
"poor color balance, washed out colors, choppy sequences, jerky movements, "
"low frame rate, artifacting, color banding, unnatural transitions, "
"outdated special effects, fake elements, unconvincing visuals, "
"poorly edited content, jump cuts, visual noise, and flickering. "
"Overall, the video is of poor quality."
),
}
# ---------------------------------------------------------------------
# Main logic
# ---------------------------------------------------------------------
def main(use_prompt=True):
video_dir = Path(VIDEO_DIR)
meta_dir = Path(META_DIR)
# Collect all mp4 video files
video_files = sorted(video_dir.glob("*.mp4"))
print(f"Found {len(video_files)} video files")
# Load prompts from episodes.jsonl if enabled
episode_prompts = {}
if use_prompt:
episodes_file = meta_dir / "episodes.jsonl"
if episodes_file.exists():
with open(episodes_file, 'r') as f:
for line in f:
episode_data = json.loads(line)
episode_index = episode_data.get("episode_index")
tasks = episode_data.get("tasks", [])
if episode_index is not None and tasks:
# Use the first task as the episode prompt
prompt = f"The robot arm is performing a task. {tasks[0]}"
episode_prompts[episode_index] = prompt
print(f"✓ Loaded {len(episode_prompts)} prompts from episodes.jsonl")
# Generate JSONL configuration file
with open(OUTPUT_JSONL, 'w') as f:
for video_file in video_files:
# Video filename without extension (e.g. episode_000001)
video_name = video_file.stem
# Create per-video config based on the base template
config = BASE_CONFIG.copy()
config["name"] = f"{TASK_NAME}_{video_name}"
config["input_path"] = str(video_file)
# Assign episode-specific prompt if available
if use_prompt:
# Extract episode index from video filename
episode_index = int(video_name.split('_')[-1])
if episode_index in episode_prompts:
config["prompt"] = episode_prompts[episode_index]
# Write one JSON object per line
f.write(json.dumps(config) + '\n')
# Summary information
print(f"✓ Config file generated: {OUTPUT_JSONL}")
print(f" - Total videos: {len(video_files)}")
print(f" - Use prompts: {'Yes' if use_prompt else 'No'}")
if use_prompt:
print(
f" - Videos with matched prompts: "
f"{sum(1 for vf in video_files if int(vf.stem.split('_')[-1]) in episode_prompts)}"
)
# ---------------------------------------------------------------------
# Script entry point
# ---------------------------------------------------------------------
if __name__ == "__main__":
import sys
# Text prompts are enabled only when '--use-prompt' is provided
use_prompt = "--use-prompt" in sys.argv
main(use_prompt=use_prompt)
You can generate a batch inference configuration jsonl file using:
# Run this command from the cosmos-predict2.5 project root
# cd <path_to_cosmos-predict2.5>
python scripts/generate_batch_config.py --use-prompt
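Before launching large-scale generation, it can be worth sanity-checking the generated file. A minimal sketch, run from the same directory and assuming the default OUTPUT_JSONL name above:
# Sketch: verify batch_inference_config.jsonl entries point at existing input videos.
import json
from pathlib import Path

with open("batch_inference_config.jsonl") as f:
    configs = [json.loads(line) for line in f if line.strip()]

missing = [c["input_path"] for c in configs if not Path(c["input_path"]).exists()]
print(f"{len(configs)} entries, {len(missing)} missing input videos")
for c in configs[:3]:  # spot-check a few prompts
    print(c["name"], "->", c["prompt"][:60])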
Step 3: Action Inference with IDM
This step produces:
- An IDM checkpoint fine-tuned on your LeRobot dataset and robot embodiment
- A set of LeRobot-format trajectories (e.g. parquet files) inferred from Cosmos-generated videos
In this pipeline, IDM (Inverse Dynamics Model) from GR00T-Dreams is used to convert Cosmos-generated rollout videos into executable robot actions.
IDM is first fine-tuned on the original LeRobot dataset collected via LeIsaac, and then applied to infer actions from synthetic videos, producing a new, fully compatible LeRobot dataset.
3.1 Install IDM Environment
IDM requires the Cosmos-Predict2 environment (not 2.5).
Follow the official prerequisites guide: Cosmos-Predict2 – Prerequisites
💡 Notes on Dependency Installation
- When installing dependencies such as openai and tyro, use: uv pip install openai tyro numpydantic albumentations tianshou
- For pytorch3d, install with no build isolation: uv pip install --no-build-isolation git+https://github.com/facebookresearch/pytorch3d.git
- If you encounter issues related to APEX, it can be safely removed from the uv environment. APEX is not required for IDM training or inference.
3.2 Fine-tune IDM
For IDM fine-tuning, please refer to the training-custom-idm-model guide.
3.2.1 Preparation: Modality Metadata and DataConfig
1. Add modality.json
Create modality.json under GR00T-Dreams/IDM_dump/global_metadata/{embodiment_name}/ and copy the same file to <path_to_lerobot_dataset>/meta/.
Example: SO101 modality.json
{
"state": {
"shoulder_pan": { "start": 0, "end": 1 },
"shoulder_lift": { "start": 1, "end": 2 },
"elbow_flex": { "start": 2, "end": 3 },
"wrist_flex": { "start": 3, "end": 4 },
"wrist_roll": { "start": 4, "end": 5 },
"gripper": { "start": 5, "end": 6 }
},
"action": {
"shoulder_pan": { "start": 0, "end": 1, "absolute": false },
"shoulder_lift": { "start": 1, "end": 2, "absolute": false },
"elbow_flex": { "start": 2, "end": 3, "absolute": false },
"wrist_flex": { "start": 3, "end": 4, "absolute": false },
"wrist_roll": { "start": 4, "end": 5, "absolute": false },
"gripper": { "start": 5, "end": 6, "absolute": false }
},
"video": {
"front": { "original_key": "observation.images.front" }
},
"annotation": {
"human.task_description": { "original_key": "task_index" }
}
}
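As a quick consistency check, the start/end slices above should cover the state/action dimensionality of your LeRobot dataset. A sketch that compares them against meta/info.json, assuming the usual LeRobot v2 features layout with a shape field per key:
# Sketch: check modality.json slices against the LeRobot dataset's state/action dims.
import json
from pathlib import Path

meta = Path("<path_to_lerobot_dataset>/meta")
modality = json.loads((meta / "modality.json").read_text())
info = json.loads((meta / "info.json").read_text())

for group, feature_key in [("state", "observation.state"), ("action", "action")]:
    dim = info["features"][feature_key]["shape"][0]           # e.g. 6 for SO101
    covered = max(v["end"] for v in modality[group].values())
    assert covered == dim, f"{group}: modality.json covers {covered}, dataset has {dim}"
    print(f"{group}: OK ({dim} dims)")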
2. Add a New DataConfig (So101DataConfig)
Add a new So101DataConfig class definition in GR00T-Dreams/gr00t/experiment/data_config_idm.py:
GR00T-Dreams/gr00t/experiment/data_config_idm.py
class So101DataConfig(BaseDataConfig):
video_keys = ["video.front"]
state_keys = ["state.shoulder_pan", "state.shoulder_lift", "state.elbow_flex", "state.wrist_flex", "state.wrist_roll", "state.gripper"]
action_keys = ["action.shoulder_pan", "action.shoulder_lift", "action.elbow_flex", "action.wrist_flex", "action.wrist_roll", "action.gripper"]
language_keys = ["annotation.human.task_description"]
observation_indices = [0, 16]
action_indices = list(range(16))
def modality_config(self) -> dict[str, ModalityConfig]:
video_modality = ModalityConfig(
delta_indices=self.observation_indices,
modality_keys=self.video_keys,
)
state_modality = ModalityConfig(
delta_indices=self.observation_indices,
modality_keys=self.state_keys,
)
action_modality = ModalityConfig(
delta_indices=self.action_indices,
modality_keys=self.action_keys,
)
language_modality = ModalityConfig(
delta_indices=self.observation_indices,
modality_keys=self.language_keys,
)
modality_configs = {
"video": video_modality,
"state": state_modality,
"action": action_modality,
"language": language_modality,
}
return modality_configs
def transform(self) -> ModalityTransform:
transforms = [
# video transforms
VideoToTensor(apply_to=self.video_keys),
VideoCrop(apply_to=self.video_keys, scale=0.95),
VideoResize(apply_to=self.video_keys, height=224, width=224, interpolation="linear"),
VideoColorJitter(
apply_to=self.video_keys,
brightness=0.3,
contrast=0.4,
saturation=0.5,
hue=0.08,
),
VideoToNumpy(apply_to=self.video_keys),
# state transforms
StateActionToTensor(apply_to=self.state_keys),
StateActionTransform(
apply_to=self.state_keys,
normalization_modes={key: "min_max" for key in self.state_keys},
),
# action transforms
StateActionToTensor(apply_to=self.action_keys),
StateActionTransform(
apply_to=self.action_keys,
normalization_modes={key: "min_max" for key in self.action_keys},
),
# concat transforms
ConcatTransform(
video_concat_order=self.video_keys,
state_concat_order=self.state_keys,
action_concat_order=self.action_keys,
),
# model-specific transform
GR00TIDMTransform(
state_horizon=len(self.observation_indices),
action_horizon=len(self.action_indices),
max_state_dim=64,
max_action_dim=32,
),
]
return ComposedModalityTransform(transforms=transforms)
Register the new config in DATA_CONFIG_MAP:
DATA_CONFIG_MAP = {
"gr1_arms_waist": Gr1ArmsWaistDataConfig(),
"gr1_arms_only": Gr1ArmsOnlyDataConfig(),
"gr1_full_upper_body": Gr1FullUpperBodyDataConfig(),
"bimanual_panda_gripper": BimanualPandaGripperDataConfig(),
"bimanual_panda_hand": BimanualPandaHandDataConfig(),
"single_panda_gripper": SinglePandaGripperDataConfig(),
"so100": So100DataConfig(),
"franka": FrankaDataConfig(),
"so101": So101DataConfig(),#add
}
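A quick way to confirm the registration took effect (run from the GR00T-Dreams root, mirroring the PYTHONPATH=. convention used below):
# Sketch: confirm the so101 config is importable and registered.
from gr00t.experiment.data_config_idm import DATA_CONFIG_MAP

cfg = DATA_CONFIG_MAP["so101"]
print(type(cfg).__name__)            # So101DataConfig
print(cfg.modality_config().keys())  # expect: video, state, action, language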
3.2.2 Run IDM Post-training
Post-train IDM (GR00T-Dreams) using the LeRobot dataset and the newly registered data configuration.
PYTHONPATH=. torchrun scripts/idm_training.py \
--dataset-path <path_to_lerobot_dataset> \
--data-config <key_from_DATA_CONFIG_MAP> \
--output_dir <path_to_output_dir>
After loading the dataset, stats.json will be automatically generated under <path_to_lerobot_dataset>/meta/.
Copy this file to GR00T-Dreams/IDM_dump/global_metadata/{embodiment_name}/.
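For example, a minimal copy step (paths are placeholders; {embodiment_name} here is so101):
# Sketch: copy the generated stats.json next to modality.json in the global metadata folder.
import shutil

shutil.copy(
    "<path_to_lerobot_dataset>/meta/stats.json",
    "GR00T-Dreams/IDM_dump/global_metadata/so101/stats.json",
)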
3.3 Extract Robot Actions to LeRobot Format
After IDM post-training, use the trained model to infer actions from Cosmos-generated videos.
3.3.1 Prepare Inference Configuration
Select a checkpoint directory (e.g. checkpoint-10000/) and create checkpoint-10000/experiment_cfg/conf.yaml:
checkpoint-10000/experiment_cfg/conf.yaml
# Configuration for so101 IDM (LiftCube dataset)
# SO101 robot with 6 DOF: shoulder_pan, shoulder_lift, elbow_flex, wrist_flex, wrist_roll, gripper
modality_configs:
so101:
video:
_target_: gr00t.data.dataset.ModalityConfig
delta_indices:
- 0
- 16
modality_keys:
- video.front
state:
_target_: gr00t.data.dataset.ModalityConfig
delta_indices:
- 0
- 16
modality_keys:
- state.shoulder_pan
- state.shoulder_lift
- state.elbow_flex
- state.wrist_flex
- state.wrist_roll
- state.gripper
action:
_target_: gr00t.data.dataset.ModalityConfig
delta_indices:
- 0
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
modality_keys:
- action.shoulder_pan
- action.shoulder_lift
- action.elbow_flex
- action.wrist_flex
- action.wrist_roll
- action.gripper
language:
_target_: gr00t.data.dataset.ModalityConfig
delta_indices:
- 0
modality_keys:
- annotation.human.task_description
all_transforms:
so101:
_target_: gr00t.data.transform.base.ComposedModalityTransform
transforms:
- _target_: gr00t.data.transform.video.VideoToTensor
apply_to:
- video.front
- _target_: gr00t.data.transform.video.VideoCrop
apply_to:
- video.front
scale: 0.95
- _target_: gr00t.data.transform.video.VideoResize
apply_to:
- video.front
height: 224
width: 224
interpolation: linear
- _target_: gr00t.data.transform.video.VideoColorJitter
apply_to:
- video.front
brightness: 0.3
contrast: 0.4
saturation: 0.5
hue: 0.08
- _target_: gr00t.data.transform.video.VideoToNumpy
apply_to:
- video.front
- _target_: gr00t.data.transform.state_action.StateActionToTensor
apply_to:
- state.shoulder_pan
- state.shoulder_lift
- state.elbow_flex
- state.wrist_flex
- state.wrist_roll
- state.gripper
- _target_: gr00t.data.transform.state_action.StateActionTransform
apply_to:
- state.shoulder_pan
- state.shoulder_lift
- state.elbow_flex
- state.wrist_flex
- state.wrist_roll
- state.gripper
normalization_modes:
state.shoulder_pan: min_max
state.shoulder_lift: min_max
state.elbow_flex: min_max
state.wrist_flex: min_max
state.wrist_roll: min_max
state.gripper: min_max
- _target_: gr00t.data.transform.state_action.StateActionToTensor
apply_to:
- action.shoulder_pan
- action.shoulder_lift
- action.elbow_flex
- action.wrist_flex
- action.wrist_roll
- action.gripper
- _target_: gr00t.data.transform.state_action.StateActionTransform
apply_to:
- action.shoulder_pan
- action.shoulder_lift
- action.elbow_flex
- action.wrist_flex
- action.wrist_roll
- action.gripper
normalization_modes:
action.shoulder_pan: min_max
action.shoulder_lift: min_max
action.elbow_flex: min_max
action.wrist_flex: min_max
action.wrist_roll: min_max
action.gripper: min_max
- _target_: gr00t.data.transform.concat.ConcatTransform
video_concat_order:
- video.front
state_concat_order:
- state.shoulder_pan
- state.shoulder_lift
- state.elbow_flex
- state.wrist_flex
- state.wrist_roll
- state.gripper
action_concat_order:
- action.shoulder_pan
- action.shoulder_lift
- action.elbow_flex
- action.wrist_flex
- action.wrist_roll
- action.gripper
- _target_: gr00t.model.transforms_idm.GR00TIDMTransform
state_horizon: 2
action_horizon: 16
max_state_dim: 64
max_action_dim: 32
metadata_versions:
so101: v2.1
Use a configuration matching the SO101 modality definition and transforms (as shown above).
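Before running inference, it is worth confirming that the hand-written YAML parses and exposes the expected modalities. A minimal sketch using PyYAML:
# Sketch: parse conf.yaml and list the configured modalities for so101.
import yaml

with open("checkpoint-10000/experiment_cfg/conf.yaml") as f:
    cfg = yaml.safe_load(f)

print(list(cfg["modality_configs"]["so101"].keys()))  # expect: video, state, action, language
print(cfg["metadata_versions"]["so101"])              # expect: v2.1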
3.3.2 Run IDM Inference
1. Modify conversion scripts to add a specific embodiment (SO101)
To support the SO101 embodiment, we extend the conversion utilities under
GR00T-Dreams/IDM_dump/ to handle SO101-specific video streams and metadata.
GR00T-Dreams/IDM_dump/preprocess_video.py
+ # === [1/3] Video frame writing: add SO101 output key (front view) ===
elif dataset == 'so100':
image = resize_with_padding(frame, ratio)
output_videos['observation.images.webcam'].append_data(image)
+ elif dataset == 'so101':
+ image = resize_with_padding(frame, ratio)
+ output_videos['observation.images.front'].append_data(image)
else:
raise ValueError(f"Unknown task: {src_path}")
----------------------------------------------------------------------
+ # === [2/3] Output directory mapping: add videos/observation.images.front for SO101 ===
elif dataset == 'so100':
output_dirs = {
'observation.images.webcam': os.path.join(dst_dir, 'videos', 'observation.images.webcam'),
}
+ elif dataset == 'so101':
+ output_dirs = {
+ 'observation.images.front': os.path.join(dst_dir, 'videos', 'observation.images.front'),
+ }
for dir_path in output_dirs.values():
os.makedirs(dir_path, exist_ok=True)
----------------------------------------------------------------------
+ # === [3/3] CLI argument: add so101 to dataset choices ===
parser.add_argument('--max_videos', type=int, default=None,
help='Maximum number of videos to process per subdirectory (for debugging)')
- parser.add_argument('--dataset', type=str, default='robocasa',
- help='Dataset name', choices=['robocasa', 'gr1', 'franka', 'so100'])
+ parser.add_argument('--dataset', type=str, default='robocasa',
+ help='Dataset name', choices=['robocasa', 'gr1', 'franka', 'so100', 'so101'])
parser.add_argument("--recursive", action="store_true", help="Process subdirectories recursively, maintaining directory structure")
GR00T-Dreams/IDM_dump/raw_to_lerobot.py
+ # === [1/2] Embodiment inference & annotation source: add SO101 ===
if args.embodiment is None:
if 'robocasa' in args.output_dir:
args.embodiment = "robocasa_panda_omron"
elif 'gr1' in args.output_dir:
args.embodiment = "gr1_unified"
elif 'franka' in args.output_dir:
args.embodiment = "franka"
elif 'so100' in args.output_dir:
args.embodiment = "so100"
+ elif 'so101' in args.output_dir:
+ args.embodiment = "so101"
else:
raise ValueError(f"Unknown embodiment for {args.output_dir}")
if args.embodiment == "robocasa_panda_omron":
args.annotation_source = "human.action.task_description"
elif args.embodiment == "gr1_unified":
args.annotation_source = "human.coarse_action"
elif args.embodiment == "franka":
args.annotation_source = "language.language_instruction"
elif args.embodiment == "so100":
args.annotation_source = "human.task_description"
+ elif args.embodiment == "so101":
+ args.annotation_source = "human.task_description"
----------------------------------------------------------------------
+ # === [2/2] Global metadata source: add SO101 modality definition ===
elif args.embodiment == "so100":
source_dir = "IDM_dump/global_metadata/so100"
+ elif args.embodiment == "so101":
+ source_dir = "IDM_dump/global_metadata/so101"
# copy modality.json
shutil.copy(
source_dir + "/modality.json",
args.output_dir + "/meta/modality.json"
)
GR00T-Dreams/IDM_dump/dump_idm_actions.py
+ # === [1/1] Embodiment tag mapping: add SO101 ===
if "gr1" in embodiment:
embodiment_tag = EmbodimentTag.GR1_unified
elif "franka" in embodiment:
embodiment_tag = EmbodimentTag.FRANKA
elif "so100" in embodiment:
embodiment_tag = EmbodimentTag.SO100
+ elif "so101" in embodiment:
+ embodiment_tag = EmbodimentTag.NEW_EMBODIMENT
elif "robocasa" in embodiment:
embodiment_tag = EmbodimentTag.ROBOCASA
else:
raise ValueError(f"Unknown embodiment: {embodiment}")
2. Create a format conversion interface from Cosmos to IDM
Create the convert helper script cosmos2.5_to_step2_format.py under GR00T-Dreams/IDM_dump/.
GR00T-Dreams/IDM_dump/cosmos2.5_to_step2_format.py
#!/usr/bin/env python3
"""
Convert cosmos2.5 outputs (+ optional LeRobot meta) to convert_directory output format.
Input:
cosmos_dir/
├── *.mp4
├── *.json(same stem as mp4)
└── ...
Optional:
lerobot_dir/
└── meta/
├── tasks.jsonl
└── episodes.jsonl
Output:
output_dir/
├── <TaskName>/
│ ├── 0.mp4
│ ├── 1.mp4
│ └── ...
└── ...
Rule:
- If json has "prompt": use it as task name (sanitize to dir)
- Else: fallback to LeRobot meta (requires --lerobot_dir)
- If lerobot_dir not provided: print warning and skip that sample
"""
import argparse
import json
import shutil
from pathlib import Path
from collections import defaultdict
from typing import Dict, List, Optional, Tuple
def sanitize_task_name(task: str) -> str:
"""
Convert task description to valid directory name.
Example: "Lift the red cube up." -> "Lift_the_red_cube_up"
"""
task = task.strip()
for ch in [".", ",", "!", "?", ":", ";", "\"", "'"]:
task = task.replace(ch, "")
task = task.replace(" ", "_")
task = "_".join(filter(None, task.split("_")))
return task
def load_tasks(tasks_file: Path) -> Dict[int, str]:
"""Load task definitions. Returns {task_index: task_string}"""
tasks: Dict[int, str] = {}
with open(tasks_file, "r") as f:
for line in f:
if not line.strip():
continue
data = json.loads(line)
tasks[int(data["task_index"])] = data["task"]
return tasks
def load_episodes(episodes_file: Path) -> List[dict]:
"""Load episode information. Returns list of dict (episode_index, tasks, length, ...)"""
episodes: List[dict] = []
with open(episodes_file, "r") as f:
for line in f:
if not line.strip():
continue
episodes.append(json.loads(line))
return episodes
def build_episode_to_task_map(lerobot_dir: Path) -> Dict[int, str]:
"""
Build mapping: episode_index -> task_string (first task in episode['tasks'])
"""
meta_dir = lerobot_dir / "meta"
tasks_path = meta_dir / "tasks.jsonl"
episodes_path = meta_dir / "episodes.jsonl"
if not tasks_path.exists() or not episodes_path.exists():
raise FileNotFoundError(f"LeRobot meta files not found under: {meta_dir}")
_tasks = load_tasks(tasks_path) # not strictly needed, but useful if episodes store indices in some setups
episodes = load_episodes(episodes_path)
ep2task: Dict[int, str] = {}
for ep in episodes:
ep_idx = int(ep["episode_index"])
ep_tasks = ep.get("tasks", [])
if not ep_tasks:
continue
# In many LeRobot datasets, ep["tasks"] stores task strings already.
# If it stores indices, you can extend here. We'll support both:
t0 = ep_tasks[0]
if isinstance(t0, int):
task_str = _tasks.get(int(t0), str(t0))
else:
task_str = str(t0)
ep2task[ep_idx] = task_str
return ep2task
def read_json(path: Path) -> Optional[dict]:
try:
with open(path, "r") as f:
return json.load(f)
except Exception as e:
print(f"[WARN] Failed to read json: {path} ({e})")
return None
def parse_episode_index_from_stem(stem: str) -> Optional[int]:
"""
Try to extract episode index from filename stem.
Examples it can handle:
- episode_000123
- ..._000123
- 000123
If cannot parse, return None.
"""
# Most common: "episode_000123"
if stem.startswith("episode_"):
tail = stem[len("episode_") :]
if tail.isdigit():
return int(tail)
# Try last underscore chunk
parts = stem.split("_")
for candidate in reversed(parts):
if candidate.isdigit():
return int(candidate)
# Entire stem digits?
if stem.isdigit():
return int(stem)
return None
def convert_cosmos_to_step2(
cosmos_dir: Path,
output_dir: Path,
lerobot_dir: Optional[Path] = None,
chunk_missing_prompt_policy: str = "skip",
):
"""
Convert cosmos2.5 outputs to step2 format.
chunk_missing_prompt_policy:
- "skip": if no prompt and no lerobot_dir mapping, skip the sample
"""
cosmos_dir = cosmos_dir.resolve()
output_dir = output_dir.resolve()
output_dir.mkdir(parents=True, exist_ok=True)
ep2task: Dict[int, str] = {}
if lerobot_dir is not None:
ep2task = build_episode_to_task_map(lerobot_dir.resolve())
# Collect pairs (json, mp4) by stem
json_files = sorted(cosmos_dir.glob("*.json"))
if not json_files:
raise FileNotFoundError(f"No .json files found in cosmos_dir: {cosmos_dir}")
# task_name -> list of source mp4 paths (ordered)
task_to_videos: Dict[str, List[Path]] = defaultdict(list)
skipped: List[Tuple[Path, str]] = []
for jpath in json_files:
stem = jpath.stem
mpath = cosmos_dir / f"{stem}.mp4"
if not mpath.exists():
print(f"[WARN] Missing mp4 for json: {jpath.name} -> expected {mpath.name}, skip.")
skipped.append((jpath, "missing_mp4"))
continue
data = read_json(jpath)
if data is None:
skipped.append((jpath, "bad_json"))
continue
prompt = data.get("prompt", None)
if isinstance(prompt, str) and prompt.strip():
task_str = prompt.strip()
task_dir_name = sanitize_task_name(task_str)
task_to_videos[task_dir_name].append(mpath)
continue
# No prompt -> fallback to lerobot meta mapping
if not ep2task:
print(
f"[WARN] {jpath.name} has no 'prompt'. "
f"You didn't provide --lerobot_dir (or mapping is empty), cannot infer task. Skipping."
)
skipped.append((jpath, "no_prompt_no_lerobot"))
continue
ep_idx = parse_episode_index_from_stem(stem)
if ep_idx is None:
print(
f"[WARN] {jpath.name} has no 'prompt' and episode index cannot be parsed from name '{stem}'. Skipping."
)
skipped.append((jpath, "no_prompt_cannot_parse_episode"))
continue
task_str = ep2task.get(ep_idx)
if not task_str:
print(
f"[WARN] {jpath.name} has no 'prompt'. Parsed episode_index={ep_idx}, "
f"but it's not found in lerobot meta. Skipping."
)
skipped.append((jpath, "episode_not_in_meta"))
continue
task_dir_name = sanitize_task_name(task_str)
task_to_videos[task_dir_name].append(mpath)
# Copy into output folders with sequential numbering per task
total_copied = 0
for task_dir_name, vids in sorted(task_to_videos.items(), key=lambda x: x[0]):
dst_task_dir = output_dir / task_dir_name
dst_task_dir.mkdir(parents=True, exist_ok=True)
# keep deterministic order
vids_sorted = sorted(vids, key=lambda p: p.name)
for i, src in enumerate(vids_sorted):
dst = dst_task_dir / f"{i}.mp4"
shutil.copy2(src, dst)
total_copied += 1
print(f"[OK] Task '{task_dir_name}': {len(vids_sorted)} videos")
print("\nConversion complete!")
print(f" Total videos copied: {total_copied}")
print(f" Output directory: {output_dir}")
if skipped:
print(f"\n[SUMMARY] Skipped {len(skipped)} samples:")
# print a few for readability
for p, reason in skipped[:20]:
print(f" - {p.name}: {reason}")
if len(skipped) > 20:
print(f" ... and {len(skipped) - 20} more")
def main():
parser = argparse.ArgumentParser(
description="Convert cosmos2.5 outputs (+ optional LeRobot meta) to convert_directory output format"
)
parser.add_argument("--cosmos_dir", type=str, required=True, help="Cosmos output directory containing *.mp4 and *.json")
parser.add_argument("--output_dir", type=str, required=True, help="Output directory")
parser.add_argument(
"--lerobot_dir",
type=str,
default=None,
help="Optional LeRobot dataset directory (only needed when json has no 'prompt')",
)
args = parser.parse_args()
cosmos_dir = Path(args.cosmos_dir)
output_dir = Path(args.output_dir)
lerobot_dir = Path(args.lerobot_dir) if args.lerobot_dir else None
convert_cosmos_to_step2(
cosmos_dir=cosmos_dir,
output_dir=output_dir,
lerobot_dir=lerobot_dir,
)
if __name__ == "__main__":
main()
3. Create a preprocessing script
Create the preprocessing helper script so101.sh under GR00T-Dreams/IDM_dump/scripts/preprocess/.
GR00T-Dreams/IDM_dump/scripts/preprocess/so101.sh
#!/bin/bash
set -e
# =============================================================================
# Configuration
# =============================================================================
# Directory containing Cosmos Predict 2.5 inference outputs.
# This directory should include generated videos produced by the Cosmos model.
LEROBOT_INPUT="<path_to_cosmos-predict2.5/outputs/>"
# Working directory used to store all intermediate and final IDM outputs.
# It is recommended to place this on a fast local disk.
WORK_DIR="<path_to_IDM_workdir>"
# Robot embodiment type (used by IDM and LeRobot).
ROBOT_TYPE="so101"
# Key name used to store videos in the LeRobot dataset structure.
# This should match the observation key expected by downstream IDM scripts.
VIDEO_KEY="observation.images.front"
# =============================================================================
# Intermediate directories (auto-generated)
# =============================================================================
# Step 1 output: task-named directories converted from Cosmos outputs
STEP1_DIR="${WORK_DIR}/step1"
# Step 2 output: split videos and text instructions
STEP2_DIR="${WORK_DIR}/step2"
# Step 3 output: preprocessed videos (e.g., resized, normalized)
STEP3_DIR="${WORK_DIR}/step3"
# Step 4 output: final LeRobot-format dataset
STEP4_DIR="${WORK_DIR}/${ROBOT_TYPE}.data"
# =============================================================================
# Step 1: Convert Cosmos outputs to task-based directory structure
# =============================================================================
# - Reads Cosmos Predict outputs
# - Groups videos by task name
# - Prepares data for downstream preprocessing
python3 IDM_dump/scripts/preprocess_leisaac/cosmos2.5_to_step2_format.py \
--cosmos_dir "${LEROBOT_INPUT}" \
--output_dir "${STEP1_DIR}"
# =============================================================================
# Step 2: Split videos and instructions
# =============================================================================
# - Separates raw videos and text instructions into:
# - videos/
# - labels/
# - The --recursive flag allows processing nested task directories
python3 IDM_dump/scripts/preprocess_leisaac/split_video_instruction.py \
--source_dir "${STEP1_DIR}" \
--output_dir "${STEP2_DIR}" \
--recursive
# =============================================================================
# Step 3: Preprocess videos
# =============================================================================
# - Resizes videos to the resolution expected by IDM
# - Converts video format if necessary
# - Preserves directory structure across tasks
python3 IDM_dump/scripts/preprocess_leisaac/preprocess_video.py \
--src_dir "${STEP2_DIR}" \
--dst_dir "${STEP3_DIR}" \
--dataset "${ROBOT_TYPE}" \
--original_width 640 \
--original_height 480 \
--recursive
# =============================================================================
# IMPORTANT USAGE NOTE
# =============================================================================
# It is STRONGLY RECOMMENDED to:
#
# 1. Run Step 1–3 first
# 2. Inspect the contents of ${STEP3_DIR}
# 3. Identify the generated task directory name(s)
# 4. Manually copy the desired task directory name into Step 4
#
# This avoids hard-coding task names before they are known and
# allows flexible reuse of this script across different tasks.
#
# Example:
# ls ${STEP3_DIR}
# → Lift_the_red_cube_up
#
# Then use:
# --input_dir "${STEP3_DIR}/Lift_the_red_cube_up"
#
# =============================================================================
# =============================================================================
# Step 4: Convert preprocessed data to LeRobot dataset reminder
# =============================================================================
# --input_dir:
# Path to a task-specific directory under STEP3_DIR.
# The directory name MUST match the task name generated in Step 3.
#
# --output_dir:
# Target directory for the LeRobot-format dataset.
#
# --fps:
# Target frames per second for the output dataset.
#
# --embodiment:
# Robot embodiment identifier used by LeRobot/IDM.
#
# --video_key:
# Observation key used to store video data.
python3 IDM_dump/scripts/preprocess_leisaac/raw_to_lerobot.py \
--input_dir "${STEP3_DIR}/Lift_the_red_cube_up" \
--output_dir "${STEP4_DIR}" \
--fps 16 \
--embodiment "${ROBOT_TYPE}" \
--video_key "${VIDEO_KEY}"
# =============================================================================
# Step 5: Dump IDM actions from the LeRobot dataset
# =============================================================================
# - Loads a pretrained IDM checkpoint
# - Runs IDM inference on the LeRobot dataset
# - Exports predicted action trajectories
#
# --checkpoint:
# Path to a trained IDM checkpoint.
#
# --dataset:
# Path to the LeRobot dataset generated in Step 4.
#
# --output_dir:
# Output directory where IDM predictions will be stored.
#
# --num_gpus:
# Number of GPUs used for IDM inference.
#
# --video_indices:
# Indices of videos to process (e.g., "0 16" processes videos 0–16).
python3 IDM_dump/scripts/preprocess_leisaac/dump_idm_actions.py \
--checkpoint "<path_to_the_trained_IDM_checkpoint>" \
--dataset "${STEP4_DIR}" \
--output_dir "${STEP4_DIR}_idm_cosmos" \
--num_gpus 1 \
--video_indices "0 16"
4. Run IDM inference
Run the inference script:
# Run this command from the GR00T-Dreams project root
# cd <path_to_GR00T-Dreams>
PYTHONPATH=. bash IDM_dump/scripts/preprocess/so101.sh
This step produces the complete LeRobot-format outputs based on the Cosmos-generated videos.
Step 4: Replay and Evaluate in LeIsaac
In this step, the original HDF5 dataset and the IDM-generated LeRobot trajectories (parquet) are first converted and merged into a replayable LeIsaac HDF5 dataset, and then replayed in Isaac Sim using LeIsaac. This replay process is used to validate the quality and physical plausibility of the inferred action trajectories.
4.1 Convert IDM Outputs to LeIsaac HDF5
IDM inference produces action trajectories in LeRobot parquet format. To process these in LeIsaac, they must first be converted into a LeIsaac-compatible HDF5 format.
Switch to the LeIsaac environment and run the conversion script from the LeIsaac project directory:
python scripts/convert/lerobot2isaaclab.py \
--lerobot_dir <path_to_idm_output_lerobot> \
--output_hdf5 <path_to_idm_output_hdf5> \
--column_keys action observation.state
4.2 Merge with Source HDF5 Dataset
The HDF5 file generated in Step 4.1 needs to be merged with the source LeIsaac HDF5 file (from Section 1.1) to restore the initial states required for replay.
python scripts/tutorials/cosmos_merge.py \
--lerobot_hdf5 <path_to_idm_output_hdf5> \
--source_hdf5 <path_to_source_leisaac_hdf5> \
--output_hdf5 <path_to_output_hdf5>
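Before replaying, a quick structural inspection of the merged file can catch conversion issues early. A minimal sketch using h5py; the internal group names depend on your LeIsaac version, so this simply prints the tree:
# Sketch: print the dataset tree of the merged HDF5 before replaying it in LeIsaac.
import h5py

with h5py.File("<path_to_output_hdf5>", "r") as f:
    f.visititems(lambda name, obj: print(name, getattr(obj, "shape", "")))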
4.3 Replay the Generated Dataset in LeIsaac
After conversion and merging, the final HDF5 dataset can be replayed using LeIsaac’s action replay mode. You can refer to dataset_replay for detailed instructions.