Перейти к основному содержимому

Обучение моделей на наборах данных HDF5

В данном документе описывается, как использовать экспортированные платформой наборы данных HDF5 для обучения различных моделей управления роботами, включая имитационное обучение (Imitation Learning), обучение с подкреплением (RL) и модели Видение-Язык-Действие (VLA).

Предобработка данных

Перед началом обучения обычно требуется предварительная обработка данных HDF5 для их адаптации к конкретной архитектуре модели и фреймворку обучения.

Базовый загрузчик данных

import h5py
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import io

class RobotDataset(Dataset):
def __init__(self, hdf5_files, transform=None):
self.hdf5_files = hdf5_files
self.transform = transform
self.episodes = []

# Индексация всех эпизодов
for file_path in hdf5_files:
with h5py.File(file_path, 'r') as f:
data_group = f['/data']
for episode_name in data_group.keys():
self.episodes.append((file_path, episode_name))

def __len__(self):
return len(self.episodes)

def __getitem__(self, idx):
file_path, episode_name = self.episodes[idx]

with h5py.File(file_path, 'r') as f:
episode = f[f'/data/{episode_name}']

# Чтение данных действий (actions)
actions = episode['action'][:]

# Чтение данных состояния (states)
states = episode['observation.state'][:]

# Чтение состояния захвата (gripper)
gripper = episode['observation.gripper'][:]

# Чтение данных изображений
images = {}
for key in episode.keys():
if key.startswith('observation.images.'):
camera_name = key.split('.')[-1]
# Распаковка изображений JPEG
img_data = episode[key][:]
images[camera_name] = [Image.open(io.BytesIO(frame)) for frame in img_data]

# Чтение описания задачи
task = episode.attrs.get('task', '')
task_zh = episode.attrs.get('task_zh', '')
score = episode.attrs.get('score', 0.0)

return {
'actions': torch.FloatTensor(actions),
'states': torch.FloatTensor(states),
'gripper': torch.FloatTensor(gripper),
'images': images,
'task': task,
'task_zh': task_zh,
'score': score
}

Предобработка изображений

import torchvision.transforms as transforms

# Определение конвейера предобработки изображений
image_transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])

def preprocess_images(images_dict, transform):
"""Предобработка изображений с нескольких камер"""
processed_images = {}
for camera_name, image_list in images_dict.items():
processed_images[camera_name] = torch.stack([
transform(img) for img in image_list
])
return processed_images

Имитационное обучение (Imitation Learning)

Имитационное обучение тренирует стратегии робота на основе демонстраций эксперта. Высококачественные размеченные данные в формате HDF5 идеально подходят для этого метода.

Клонирование поведения (Behavior Cloning)

import torch.nn as nn
import torch.optim as optim

class BehaviorCloningModel(nn.Module):
def __init__(self, state_dim, action_dim, image_channels=3):
super().__init__()

# Кодировщик изображений
self.image_encoder = nn.Sequential(
nn.Conv2d(image_channels, 32, 3, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(32, 64, 3, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(64, 128, 3, stride=2, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool2d((4, 4)),
nn.Flatten(),
nn.Linear(128 * 4 * 4, 256)
)

# Кодировщик состояния
self.state_encoder = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, 128)
)

# Слой слияния (fusion)
self.fusion = nn.Sequential(
nn.Linear(256 + 128, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, action_dim)
)

def forward(self, images, states):
# Обработка изображений (упрощенно используем первую камеру)
img_features = self.image_encoder(images)
state_features = self.state_encoder(states)

# Слияние признаков
combined = torch.cat([img_features, state_features], dim=1)
actions = self.fusion(combined)

return actions

# Цикл обучения
def train_bc_model(model, dataloader, num_epochs=100):
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(num_epochs):
total_loss = 0
for batch in dataloader:
# Получение изображений первой камеры (упрощенно)
images = list(batch['images'].values())[0][:, 0] # [batch, H, W, C]
images = images.permute(0, 3, 1, 2) # [batch, C, H, W]

states = batch['states'][:, 0] # Состояние на первом временном шаге
actions = batch['actions'][:, 0] # Действие на первом временном шаге

optimizer.zero_grad()
predicted_actions = model(images, states)
loss = criterion(predicted_actions, actions)
loss.backward()
optimizer.step()

total_loss += loss.item()

print(f'Эпоха {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}')

Секвентальное моделирование и Transformer

Для задач, требующих учета временной последовательности, можно использовать архитектуру Transformer.

Сеть стратегии на базе Transformer

class TransformerPolicy(nn.Module):
def __init__(self, state_dim, action_dim, seq_len=50, d_model=256):
super().__init__()
self.seq_len = seq_len
self.d_model = d_model

# Проекция входов
self.state_proj = nn.Linear(state_dim, d_model)
self.action_proj = nn.Linear(action_dim, d_model)

# Позиционное кодирование
self.pos_encoding = nn.Parameter(torch.randn(seq_len, d_model))

# Энкодер Transformer
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model, nhead=8, batch_first=True
)
self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=6)

# Выходной слой
self.output_proj = nn.Linear(d_model, action_dim)

def forward(self, states, actions=None):
batch_size, seq_len = states.shape[:2]

# Кодирование состояния
state_emb = self.state_proj(states)

if actions is not None: # Режим обучения
action_emb = self.action_proj(actions)
# Смещение действий вправо для использования в качестве входа
action_input = torch.cat([
torch.zeros(batch_size, 1, self.d_model, device=actions.device),
action_emb[:, :-1]
], dim=1)
inputs = state_emb + action_input
else: # Режим инференса
inputs = state_emb

# Добавление позиционного кодирования
inputs += self.pos_encoding[:seq_len]

# Обработка в Transformer
outputs = self.transformer(inputs)

# Предсказание действий
predicted_actions = self.output_proj(outputs)

return predicted_actions

Модели Видение-Язык-Действие (VLA)

Модели VLA объединяют визуальную, текстовую и моторную информацию, позволяя выполнять сложные задачи на основе инструкций на естественном языке.

Мультимодальная архитектура VLA

from transformers import AutoTokenizer, AutoModel

class VLAModel(nn.Module):
def __init__(self, action_dim, language_model='bert-base-uncased'):
super().__init__()

# Языковой энкодер
self.tokenizer = AutoTokenizer.from_pretrained(language_model)
self.language_encoder = AutoModel.from_pretrained(language_model)

# Визуальный энкодер
self.vision_encoder = nn.Sequential(
nn.Conv2d(3, 64, 7, stride=2, padding=3),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Conv2d(64, 128, 3, padding=1),
nn.ReLU(),
nn.Conv2d(128, 256, 3, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool2d((8, 8)),
nn.Flatten(),
nn.Linear(256 * 8 * 8, 512)
)

# Кросс-модальное внимание (cross-attention)
self.cross_attention = nn.MultiheadAttention(
embed_dim=512, num_heads=8, batch_first=True
)

# Декодер действий
self.action_decoder = nn.Sequential(
nn.Linear(512, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, action_dim)
)

def forward(self, images, task_descriptions, states):
batch_size = images.shape[0]

# Кодирование языковых инструкций
tokens = self.tokenizer(
task_descriptions,
return_tensors='pt',
padding=True,
truncation=True,
max_length=128
)
language_features = self.language_encoder(**tokens).last_hidden_state

# Кодирование визуальной информации
vision_features = self.vision_encoder(images)
vision_features = vision_features.unsqueeze(1) # [batch, 1, 512]

# Кросс-модальное внимание
attended_features, _ = self.cross_attention(
vision_features, language_features, language_features
)

# Предсказание действий
actions = self.action_decoder(attended_features.squeeze(1))

return actions

# Функция обучения VLA
def train_vla_model(model, dataloader, num_epochs=50):
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

for epoch in range(num_epochs):
total_loss = 0
for batch in dataloader:
# Получение данных
images = list(batch['images'].values())[0][:, 0].permute(0, 3, 1, 2)
task_descriptions = batch['task']
actions = batch['actions'][:, 0]
states = batch['states'][:, 0]

optimizer.zero_grad()
predicted_actions = model(images, task_descriptions, states)
loss = criterion(predicted_actions, actions)
loss.backward()
optimizer.step()

total_loss += loss.item()

print(f'Эпоха {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}')

Интеграция с обучением с подкреплением (RL)

Данные HDF5 могут использоваться для инициализации агентов RL или в качестве «семян» для буфера воспроизведения опыта (experience replay).

Офлайн-обучение с подкреплением

import torch.nn.functional as F

class OfflineRLAgent:
def __init__(self, state_dim, action_dim, lr=3e-4):
self.actor = BehaviorCloningModel(state_dim, action_dim)
self.critic = nn.Sequential(
nn.Linear(state_dim + action_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, 1)
)

self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)

def train_step(self, states, actions, rewards, next_states, dones):
# Обучение Критика (Critic)
with torch.no_grad():
next_actions = self.actor(next_states)
target_q = rewards + 0.99 * (1 - dones) * self.critic(
torch.cat([next_states, next_actions], dim=1)
)

current_q = self.critic(torch.cat([states, actions], dim=1))
critic_loss = F.mse_loss(current_q, target_q)

self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()

# Обучение Актора (Actor)
predicted_actions = self.actor(states)
actor_loss = -self.critic(
torch.cat([states, predicted_actions], dim=1)
).mean()

self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()

return critic_loss.item(), actor_loss.item()

Технологии аугментации данных

Для повышения обобщающей способности моделей к данным HDF5 можно применять различные методы аугментации.

Аугментация изображений

import torchvision.transforms as T

class RobotDataAugmentation:
def __init__(self):
self.image_aug = T.Compose([
T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
T.RandomRotation(degrees=5),
T.RandomResizedCrop(224, scale=(0.9, 1.0)),
T.RandomHorizontalFlip(p=0.1), # Горизонтальное отражение с малой вероятностью
])

def augment_episode(self, episode_data):
"""Аугментация данных одного эпизода"""
augmented_data = episode_data.copy()

# Аугментация изображений
for camera_name, images in episode_data['images'].items():
augmented_images = []
for img in images:
if torch.rand(1) < 0.5: # 50% вероятность аугментации
img = self.image_aug(img)
augmented_images.append(img)
augmented_data['images'][camera_name] = augmented_images

# Шум в действиях
if torch.rand(1) < 0.3: # 30% вероятность добавления шума
noise = torch.randn_like(episode_data['actions']) * 0.01
augmented_data['actions'] = episode_data['actions'] + noise

return augmented_data

Оценка и развертывание моделей

Метрики оценки

def evaluate_model(model, test_dataloader, device):
model.eval()
total_mse = 0
total_samples = 0

with torch.no_grad():
for batch in test_dataloader:
images = list(batch['images'].values())[0][:, 0].permute(0, 3, 1, 2).to(device)
states = batch['states'][:, 0].to(device)
true_actions = batch['actions'][:, 0].to(device)

predicted_actions = model(images, states)
mse = F.mse_loss(predicted_actions, true_actions)

total_mse += mse.item() * len(true_actions)
total_samples += len(true_actions)

avg_mse = total_mse / total_samples
print(f'Test MSE: {avg_mse:.6f}')
return avg_mse

Сохранение и загрузка моделей

def save_model(model, optimizer, epoch, loss, filepath):
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': loss,
}, filepath)

def load_model(model, optimizer, filepath):
checkpoint = torch.load(filepath)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']
return epoch, loss

Заключение

Грамотное использование экспортированных платформой данных HDF5 позволяет эффективно обучать широкий спектр моделей управления роботами:

  • Имитационное обучение: Прямое обучение стратегий на демонстрациях экспертов.
  • Секвентальное моделирование: Использование Transformer для работы с временными зависимостями.
  • Мультимодальное обучение: Сочетание визуальной, текстовой и моторной информации.
  • Обучение с подкреплением: Использование в качестве офлайн-данных или данных для инициализации.

Ключевым моментом является выбор подходящей архитектуры модели в соответствии с требованиями конкретной задачи и максимальное использование богатой информации аннотаций в данных HDF5 для повышения производительности.