HDF5数据集的模型训练
本文档介绍了如何使用平台导出的HDF5数据集进行各种机器人学习模型的训练,包括模仿学习、强化学习和视觉-语言-动作(VLA)模型等。
数据预处理
在开始训练之前,通常需要对HDF5数据集进行预处理以适应不同的模型架构和训练框架。
基础数据加载器
import h5py
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import io
class RobotDataset(Dataset):
    """PyTorch dataset over robot episodes stored in platform-exported HDF5 files.

    Each file is expected to contain a ``/data`` group whose children are
    episode groups holding ``action``, ``observation.state``,
    ``observation.gripper`` and ``observation.images.<camera>`` datasets
    (JPEG-encoded frames), plus ``task`` / ``task_zh`` / ``score`` attributes.
    (Layout inferred from the reads below — confirm against the exporter.)
    """

    # Key prefix that marks JPEG-compressed camera streams inside an episode.
    _IMAGE_PREFIX = 'observation.images.'

    def __init__(self, hdf5_files, transform=None):
        """Index every episode found in *hdf5_files*.

        Args:
            hdf5_files: iterable of paths to HDF5 files.
            transform: optional image transform, stored for callers to apply.
        """
        self.hdf5_files = hdf5_files
        self.transform = transform
        self.episodes = []
        # Build a flat (file, episode) index once so __getitem__ is O(1).
        for file_path in hdf5_files:
            with h5py.File(file_path, 'r') as f:
                data_group = f['/data']
                for episode_name in data_group.keys():
                    self.episodes.append((file_path, episode_name))

    def __len__(self):
        return len(self.episodes)

    def __getitem__(self, idx):
        """Load one full episode (all timesteps) as tensors plus PIL images."""
        file_path, episode_name = self.episodes[idx]
        with h5py.File(file_path, 'r') as f:
            episode = f[f'/data/{episode_name}']
            actions = episode['action'][:]
            states = episode['observation.state'][:]
            gripper = episode['observation.gripper'][:]
            # Decode every JPEG frame of every camera stream.
            images = {}
            for key in episode.keys():
                if key.startswith(self._IMAGE_PREFIX):
                    # BUGFIX: slice off the full prefix instead of taking the
                    # last '.'-separated token, so camera names that contain a
                    # dot themselves (e.g. 'wrist.left') are preserved intact.
                    camera_name = key[len(self._IMAGE_PREFIX):]
                    img_data = episode[key][:]
                    images[camera_name] = [
                        Image.open(io.BytesIO(frame)) for frame in img_data
                    ]
            task = episode.attrs.get('task', '')
            task_zh = episode.attrs.get('task_zh', '')
            # Cast to a plain float so default collation handles it uniformly
            # (h5py attrs may come back as numpy scalars).
            score = float(episode.attrs.get('score', 0.0))
        return {
            # torch.as_tensor with an explicit dtype replaces the legacy
            # torch.FloatTensor constructor.
            'actions': torch.as_tensor(actions, dtype=torch.float32),
            'states': torch.as_tensor(states, dtype=torch.float32),
            'gripper': torch.as_tensor(gripper, dtype=torch.float32),
            'images': images,
            'task': task,
            'task_zh': task_zh,
            'score': score,
        }
图像预处理
import torchvision.transforms as transforms

# Image preprocessing pipeline: resize to the fixed 224x224 input size,
# convert the PIL image to a CHW float tensor in [0, 1], then normalize.
# The mean/std values are the widely used ImageNet channel statistics
# (appropriate when fine-tuning ImageNet-pretrained backbones).
image_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])
def preprocess_images(images_dict, transform):
    """Apply *transform* to every frame of every camera view.

    Args:
        images_dict: mapping of camera name -> list of frames.
        transform: callable applied to each frame; must return a tensor.

    Returns:
        Mapping of camera name -> stacked tensor of transformed frames.
    """
    return {
        camera: torch.stack([transform(frame) for frame in frames])
        for camera, frames in images_dict.items()
    }
模仿学习(Imitation Learning)
模仿学习通过学习专家演示来训练机器人策略。HDF5数据集中的高质量标注数据非常适合这种训练方式。
行为克隆(Behavior Cloning)
import torch.nn as nn
import torch.optim as optim
class BehaviorCloningModel(nn.Module):
def __init__(self, state_dim, action_dim, image_channels=3):
super().__init__()
# 图像编码器
self.image_encoder = nn.Sequential(
nn.Conv2d(image_channels, 32, 3, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(32, 64, 3, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(64, 128, 3, stride=2, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool2d((4, 4)),
nn.Flatten(),
nn.Linear(128 * 4 * 4, 256)
)
# 状态编码器
self.state_encoder = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, 128)
)
# 融合层
self.fusion = nn.Sequential(
nn.Linear(256 + 128, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, action_dim)
)
def forward(self, images, states):
# 处理多视角图像(这里简化为使用第一个相机)
img_features = self.image_encoder(images)
state_features = self.state_encoder(states)
# 特征融合
combined = torch.cat([img_features, state_features], dim=1)
actions = self.fusion(combined)
return actions
# Training loop
def train_bc_model(model, dataloader, num_epochs=100):
    """Train *model* by behavior cloning with an MSE loss.

    Mirrors the simplified setup above: only the first camera view and the
    first timestep of each episode are used per batch.

    Args:
        model: policy with ``forward(images, states) -> actions``.
        dataloader: iterable of batches with 'images', 'states', 'actions'.
        num_epochs: number of passes over the data.
    """
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    for epoch in range(num_epochs):
        total_loss = 0
        for batch in dataloader:
            # First camera view, first timestep: [B, H, W, C] -> [B, C, H, W].
            frames = next(iter(batch['images'].values()))[:, 0]
            frames = frames.permute(0, 3, 1, 2)
            state_0 = batch['states'][:, 0]
            action_0 = batch['actions'][:, 0]

            optimizer.zero_grad()
            loss = criterion(model(frames, state_0), action_0)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}')