Перейти к основному содержимому

HDF5数据集的模型训练

本文档介绍了如何使用平台导出的HDF5数据集进行各种机器人学习模型的训练,包括模仿学习、强化学习和视觉-语言-动作(VLA)模型等。

数据预处理

在开始训练之前,通常需要对HDF5数据集进行预处理以适应不同的模型架构和训练框架。

基础数据加载器

import h5py
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import io

class RobotDataset(Dataset):
def __init__(self, hdf5_files, transform=None):
self.hdf5_files = hdf5_files
self.transform = transform
self.episodes = []

# 索引所有episode
for file_path in hdf5_files:
with h5py.File(file_path, 'r') as f:
data_group = f['/data']
for episode_name in data_group.keys():
self.episodes.append((file_path, episode_name))

def __len__(self):
return len(self.episodes)

def __getitem__(self, idx):
file_path, episode_name = self.episodes[idx]

with h5py.File(file_path, 'r') as f:
episode = f[f'/data/{episode_name}']

# 读取动作数据
actions = episode['action'][:]

# 读取状态数据
states = episode['observation.state'][:]

# 读取夹爪状态
gripper = episode['observation.gripper'][:]

# 读取图像数据
images = {}
for key in episode.keys():
if key.startswith('observation.images.'):
camera_name = key.split('.')[-1]
# 解压JPEG图像
img_data = episode[key][:]
images[camera_name] = [Image.open(io.BytesIO(frame)) for frame in img_data]

# 读取任务描述
task = episode.attrs.get('task', '')
task_zh = episode.attrs.get('task_zh', '')
score = episode.attrs.get('score', 0.0)

return {
'actions': torch.FloatTensor(actions),
'states': torch.FloatTensor(states),
'gripper': torch.FloatTensor(gripper),
'images': images,
'task': task,
'task_zh': task_zh,
'score': score
}

图像预处理

import torchvision.transforms as transforms

# 定义图像预处理管道
image_transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])

def preprocess_images(images_dict, transform):
"""预处理多视角图像"""
processed_images = {}
for camera_name, image_list in images_dict.items():
processed_images[camera_name] = torch.stack([
transform(img) for img in image_list
])
return processed_images

模仿学习(Imitation Learning)

模仿学习通过学习专家演示来训练机器人策略。HDF5数据集中的高质量标注数据非常适合这种训练方式。

行为克隆(Behavior Cloning)

import torch.nn as nn
import torch.optim as optim

class BehaviorCloningModel(nn.Module):
def __init__(self, state_dim, action_dim, image_channels=3):
super().__init__()

# 图像编码器
self.image_encoder = nn.Sequential(
nn.Conv2d(image_channels, 32, 3, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(32, 64, 3, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(64, 128, 3, stride=2, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool2d((4, 4)),
nn.Flatten(),
nn.Linear(128 * 4 * 4, 256)
)

# 状态编码器
self.state_encoder = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, 128)
)

# 融合层
self.fusion = nn.Sequential(
nn.Linear(256 + 128, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, action_dim)
)

def forward(self, images, states):
# 处理多视角图像(这里简化为使用第一个相机)
img_features = self.image_encoder(images)
state_features = self.state_encoder(states)

# 特征融合
combined = torch.cat([img_features, state_features], dim=1)
actions = self.fusion(combined)

return actions

# 训练循环
def train_bc_model(model, dataloader, num_epochs=100):
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(num_epochs):
total_loss = 0
for batch in dataloader:
# 获取第一个相机的图像(简化处理)
images = list(batch['images'].values())[0][:, 0] # [batch, H, W, C]
images = images.permute(0, 3, 1, 2) # [batch, C, H, W]

states = batch['states'][:, 0] # 第一个时间步的状态
actions = batch['actions'][:, 0] # 第一个时间步的动作

optimizer.zero_grad()
predicted_actions = model(images, states)
loss = criterion(predicted_actions, actions)
loss.backward()
optimizer.step()

total_loss += loss.item()

print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}')

序列建模与Transformer

对于需要考虑时序信息的任务,可以使用Transformer架构来建模动作序列。

Transformer策略网络

class TransformerPolicy(nn.Module):
def __init__(self, state_dim, action_dim, seq_len=50, d_model=256):
super().__init__()
self.seq_len = seq_len
self.d_model = d_model

# 输入投影
self.state_proj = nn.Linear(state_dim, d_model)
self.action_proj = nn.Linear(action_dim, d_model)

# 位置编码
self.pos_encoding = nn.Parameter(torch.randn(seq_len, d_model))

# Transformer编码器
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model, nhead=8, batch_first=True
)
self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=6)

# 输出层
self.output_proj = nn.Linear(d_model, action_dim)

def forward(self, states, actions=None):
batch_size, seq_len = states.shape[:2]

# 状态编码
state_emb = self.state_proj(states)

if actions is not None: # 训练模式
action_emb = self.action_proj(actions)
# 将动作向右移位作为输入
action_input = torch.cat([
torch.zeros(batch_size, 1, self.d_model, device=actions.device),
action_emb[:, :-1]
], dim=1)
inputs = state_emb + action_input
else: # 推理模式
inputs = state_emb

# 添加位置编码
inputs += self.pos_encoding[:seq_len]

# Transformer处理
outputs = self.transformer(inputs)

# 预测动作
predicted_actions = self.output_proj(outputs)

return predicted_actions

视觉-语言-动作(VLA)模型

VLA模型结合视觉、语言和动作信息,能够根据自然语言指令执行复杂的机器人任务。

多模态VLA架构

from transformers import AutoTokenizer, AutoModel

class VLAModel(nn.Module):
def __init__(self, action_dim, language_model='bert-base-uncased'):
super().__init__()

# 语言编码器
self.tokenizer = AutoTokenizer.from_pretrained(language_model)
self.language_encoder = AutoModel.from_pretrained(language_model)

# 视觉编码器
self.vision_encoder = nn.Sequential(
nn.Conv2d(3, 64, 7, stride=2, padding=3),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Conv2d(64, 128, 3, padding=1),
nn.ReLU(),
nn.Conv2d(128, 256, 3, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool2d((8, 8)),
nn.Flatten(),
nn.Linear(256 * 8 * 8, 512)
)

# 跨模态注意力
self.cross_attention = nn.MultiheadAttention(
embed_dim=512, num_heads=8, batch_first=True
)

# 动作解码器
self.action_decoder = nn.Sequential(
nn.Linear(512, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, action_dim)
)

def forward(self, images, task_descriptions, states):
batch_size = images.shape[0]

# 编码语言指令
tokens = self.tokenizer(
task_descriptions,
return_tensors='pt',
padding=True,
truncation=True,
max_length=128
)
language_features = self.language_encoder(**tokens).last_hidden_state

# 编码视觉信息
vision_features = self.vision_encoder(images)
vision_features = vision_features.unsqueeze(1) # [batch, 1, 512]

# 跨模态注意力
attended_features, _ = self.cross_attention(
vision_features, language_features, language_features
)

# 预测动作
actions = self.action_decoder(attended_features.squeeze(1))

return actions

# VLA训练函数
def train_vla_model(model, dataloader, num_epochs=50):
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

for epoch in range(num_epochs):
total_loss = 0
for batch in dataloader:
# 获取数据
images = list(batch['images'].values())[0][:, 0].permute(0, 3, 1, 2)
task_descriptions = batch['task']
actions = batch['actions'][:, 0]
states = batch['states'][:, 0]

optimizer.zero_grad()
predicted_actions = model(images, task_descriptions, states)
loss = criterion(predicted_actions, actions)
loss.backward()
optimizer.step()

total_loss += loss.item()

print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}')

强化学习集成

可以将HDF5数据作为强化学习的初始化数据或经验回放缓冲区的种子数据。

离线强化学习

import torch.nn.functional as F

class OfflineRLAgent:
def __init__(self, state_dim, action_dim, lr=3e-4):
self.actor = BehaviorCloningModel(state_dim, action_dim)
self.critic = nn.Sequential(
nn.Linear(state_dim + action_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, 1)
)

self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)

def train_step(self, states, actions, rewards, next_states, dones):
# 训练Critic
with torch.no_grad():
next_actions = self.actor(next_states)
target_q = rewards + 0.99 * (1 - dones) * self.critic(
torch.cat([next_states, next_actions], dim=1)
)

current_q = self.critic(torch.cat([states, actions], dim=1))
critic_loss = F.mse_loss(current_q, target_q)

self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()

# 训练Actor
predicted_actions = self.actor(states)
actor_loss = -self.critic(
torch.cat([states, predicted_actions], dim=1)
).mean()

self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()

return critic_loss.item(), actor_loss.item()

数据增强技术

为了提高模型的泛化能力,可以对HDF5数据进行各种增强。

图像增强

import torchvision.transforms as T

class RobotDataAugmentation:
def __init__(self):
self.image_aug = T.Compose([
T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
T.RandomRotation(degrees=5),
T.RandomResizedCrop(224, scale=(0.9, 1.0)),
T.RandomHorizontalFlip(p=0.1), # 小概率水平翻转
])

def augment_episode(self, episode_data):
"""对单个episode进行数据增强"""
augmented_data = episode_data.copy()

# 图像增强
for camera_name, images in episode_data['images'].items():
augmented_images = []
for img in images:
if torch.rand(1) < 0.5: # 50%概率进行增强
img = self.image_aug(img)
augmented_images.append(img)
augmented_data['images'][camera_name] = augmented_images

# 动作噪声
if torch.rand(1) < 0.3: # 30%概率添加动作噪声
noise = torch.randn_like(episode_data['actions']) * 0.01
augmented_data['actions'] = episode_data['actions'] + noise

return augmented_data

模型评估与部署

评估指标

def evaluate_model(model, test_dataloader, device):
model.eval()
total_mse = 0
total_samples = 0

with torch.no_grad():
for batch in test_dataloader:
images = list(batch['images'].values())[0][:, 0].permute(0, 3, 1, 2).to(device)
states = batch['states'][:, 0].to(device)
true_actions = batch['actions'][:, 0].to(device)

predicted_actions = model(images, states)
mse = F.mse_loss(predicted_actions, true_actions)

total_mse += mse.item() * len(true_actions)
total_samples += len(true_actions)

avg_mse = total_mse / total_samples
print(f'Test MSE: {avg_mse:.6f}')
return avg_mse

模型保存与加载

def save_model(model, optimizer, epoch, loss, filepath):
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': loss,
}, filepath)

def load_model(model, optimizer, filepath):
checkpoint = torch.load(filepath)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']
return epoch, loss

总结

通过合理利用平台导出的HDF5数据,可以有效训练各种机器人学习模型:

  • 模仿学习:直接从专家演示学习策略
  • 序列建模:使用Transformer处理时序依赖
  • 多模态学习:结合视觉、语言和动作信息
  • 强化学习:作为离线数据或初始化数据

关键是根据具体任务需求选择合适的模型架构,并充分利用HDF5数据的丰富标注信息来提高模型性能。