メインコンテンツまでスキップ

HDF5データセットによるモデルトレーニング

本ドキュメントでは、プラットフォームからエクスポートされたHDF5データセットを使用して、模倣学習、強化学習、視覚-言語-動作(VLA)モデルなどの様々なロボット学習モデルをトレーニングする方法について説明します。

データの前処理

トレーニングを開始する前に、通常、HDF5データセットを異なるモデルアーキテクチャやトレーニングフレームワークに適応させるための前処理が必要になります。

基本的なデータローダー

import h5py
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import io

class RobotDataset(Dataset):
def __init__(self, hdf5_files, transform=None):
self.hdf5_files = hdf5_files
self.transform = transform
self.episodes = []

# すべてのepisodeをインデックス化
for file_path in hdf5_files:
with h5py.File(file_path, 'r') as f:
data_group = f['/data']
for episode_name in data_group.keys():
self.episodes.append((file_path, episode_name))

def __len__(self):
return len(self.episodes)

def __getitem__(self, idx):
file_path, episode_name = self.episodes[idx]

with h5py.File(file_path, 'r') as f:
episode = f[f'/data/{episode_name}']

# 動作データを読み込む
actions = episode['action'][:]

# 状態データを読み込む
states = episode['observation.state'][:]

# グリッパー状態を読み込む
gripper = episode['observation.gripper'][:]

# 画像データを読み込む
images = {}
for key in episode.keys():
if key.startswith('observation.images.'):
camera_name = key.split('.')[-1]
# JPEG画像を解凍
img_data = episode[key][:]
images[camera_name] = [Image.open(io.BytesIO(frame)) for frame in img_data]

# タスク記述を読み込む
task = episode.attrs.get('task', '')
task_zh = episode.attrs.get('task_zh', '')
score = episode.attrs.get('score', 0.0)

return {
'actions': torch.FloatTensor(actions),
'states': torch.FloatTensor(states),
'gripper': torch.FloatTensor(gripper),
'images': images,
'task': task,
'task_zh': task_zh,
'score': score
}

画像の前処理

import torchvision.transforms as transforms

# 画像前処理パイプラインの定義
image_transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
])

def preprocess_images(images_dict, transform):
"""マルチビュー画像を前処理する"""
processed_images = {}
for camera_name, image_list in images_dict.items():
processed_images[camera_name] = torch.stack([
transform(img) for img in image_list
])
return processed_images

模倣学習(Imitation Learning)

模倣学習は、専門家のデモンストレーションを学習することでロボットのポリシーをトレーニングします。HDF5データセット内の高品質なアノテーションデータは、このトレーニング方式に非常に適しています。

行動クローニング(Behavior Cloning)

import torch.nn as nn
import torch.optim as optim

class BehaviorCloningModel(nn.Module):
def __init__(self, state_dim, action_dim, image_channels=3):
super().__init__()

# 画像エンコーダー
self.image_encoder = nn.Sequential(
nn.Conv2d(image_channels, 32, 3, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(32, 64, 3, stride=2, padding=1),
nn.ReLU(),
nn.Conv2d(64, 128, 3, stride=2, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool2d((4, 4)),
nn.Flatten(),
nn.Linear(128 * 4 * 4, 256)
)

# 状態エンコーダー
self.state_encoder = nn.Sequential(
nn.Linear(state_dim, 128),
nn.ReLU(),
nn.Linear(128, 128)
)

# 融合層
self.fusion = nn.Sequential(
nn.Linear(256 + 128, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, action_dim)
)

def forward(self, images, states):
# マルチビュー画像を処理(ここでは最初のカメラを使用するように簡略化)
img_features = self.image_encoder(images)
state_features = self.state_encoder(states)

# 特徴融合
combined = torch.cat([img_features, state_features], dim=1)
actions = self.fusion(combined)

return actions

# トレーニングループ
def train_bc_model(model, dataloader, num_epochs=100):
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

for epoch in range(num_epochs):
total_loss = 0
for batch in dataloader:
# 最初のカメラの画像を取得(簡略化処理)
images = list(batch['images'].values())[0][:, 0] # [batch, H, W, C]
images = images.permute(0, 3, 1, 2) # [batch, C, H, W]

states = batch['states'][:, 0] # 最初のタイムステップの状態
actions = batch['actions'][:, 0] # 最初のタイムステップの動作

optimizer.zero_grad()
predicted_actions = model(images, states)
loss = criterion(predicted_actions, actions)
loss.backward()
optimizer.step()

total_loss += loss.item()

print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}')

シーケンスモデリングとTransformer

時系列情報を考慮する必要があるタスクでは、Transformerアーキテクチャを使用して動作シーケンスをモデリングできます。

Transformerポリシーネットワーク

class TransformerPolicy(nn.Module):
def __init__(self, state_dim, action_dim, seq_len=50, d_model=256):
super().__init__()
self.seq_len = seq_len
self.d_model = d_model

# 入力投影
self.state_proj = nn.Linear(state_dim, d_model)
self.action_proj = nn.Linear(action_dim, d_model)

# 位置エンコーディング
self.pos_encoding = nn.Parameter(torch.randn(seq_len, d_model))

# Transformerエンコーダー
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model, nhead=8, batch_first=True
)
self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=6)

# 出力層
self.output_proj = nn.Linear(d_model, action_dim)

def forward(self, states, actions=None):
batch_size, seq_len = states.shape[:2]

# 状態エンコーディング
state_emb = self.state_proj(states)

if actions is not None: # トレーニングモード
action_emb = self.action_proj(actions)
# 動作を右にシフトして入力とする
action_input = torch.cat([
torch.zeros(batch_size, 1, self.d_model, device=actions.device),
action_emb[:, :-1]
], dim=1)
inputs = state_emb + action_input
else: # 推論モード
inputs = state_emb

# 位置エンコーディングを追加
inputs += self.pos_encoding[:seq_len]

# Transformer処理
outputs = self.transformer(inputs)

# 動作を予測
predicted_actions = self.output_proj(outputs)

return predicted_actions

視覚-言語-動作(VLA)モデル

VLAモデルは、視覚、言語、および動作情報を組み合わせ、自然言語命令に基づいて複雑なロボットタスクを実行できます。

マルチモーダルVLAアーキテクチャ

from transformers import AutoTokenizer, AutoModel

class VLAModel(nn.Module):
def __init__(self, action_dim, language_model='bert-base-uncased'):
super().__init__()

# 言語エンコーダー
self.tokenizer = AutoTokenizer.from_pretrained(language_model)
self.language_encoder = AutoModel.from_pretrained(language_model)

# 視覚エンコーダー
self.vision_encoder = nn.Sequential(
nn.Conv2d(3, 64, 7, stride=2, padding=3),
nn.ReLU(),
nn.MaxPool2d(2),
nn.Conv2d(64, 128, 3, padding=1),
nn.ReLU(),
nn.Conv2d(128, 256, 3, padding=1),
nn.ReLU(),
nn.AdaptiveAvgPool2d((8, 8)),
nn.Flatten(),
nn.Linear(256 * 8 * 8, 512)
)

# クロスモーダルアテンション
self.cross_attention = nn.MultiheadAttention(
embed_dim=512, num_heads=8, batch_first=True
)

# 動作デコーダー
self.action_decoder = nn.Sequential(
nn.Linear(512, 256),
nn.ReLU(),
nn.Linear(256, 128),
nn.ReLU(),
nn.Linear(128, action_dim)
)

def forward(self, images, task_descriptions, states):
batch_size = images.shape[0]

# 言語命令をエンコード
tokens = self.tokenizer(
task_descriptions,
return_tensors='pt',
padding=True,
truncation=True,
max_length=128
)
language_features = self.language_encoder(**tokens).last_hidden_state

# 視覚情報をエンコード
vision_features = self.vision_encoder(images)
vision_features = vision_features.unsqueeze(1) # [batch, 1, 512]

# クロスモーダルアテンション
attended_features, _ = self.cross_attention(
vision_features, language_features, language_features
)

# 動作を予測
actions = self.action_decoder(attended_features.squeeze(1))

return actions

# VLAトレーニング関数
def train_vla_model(model, dataloader, num_epochs=50):
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

for epoch in range(num_epochs):
total_loss = 0
for batch in dataloader:
# データを取得
images = list(batch['images'].values())[0][:, 0].permute(0, 3, 1, 2)
task_descriptions = batch['task']
actions = batch['actions'][:, 0]
states = batch['states'][:, 0]

optimizer.zero_grad()
predicted_actions = model(images, task_descriptions, states)
loss = criterion(predicted_actions, actions)
loss.backward()
optimizer.step()

total_loss += loss.item()

print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(dataloader):.4f}')

強化学習の統合

HDF5データを強化学習の初期化データや経験再生バッファのシードデータとして使用できます。

オフライン強化学習

import torch.nn.functional as F

class OfflineRLAgent:
def __init__(self, state_dim, action_dim, lr=3e-4):
self.actor = BehaviorCloningModel(state_dim, action_dim)
self.critic = nn.Sequential(
nn.Linear(state_dim + action_dim, 256),
nn.ReLU(),
nn.Linear(256, 256),
nn.ReLU(),
nn.Linear(256, 1)
)

self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=lr)

def train_step(self, states, actions, rewards, next_states, dones):
# Criticをトレーニング
with torch.no_grad():
next_actions = self.actor(next_states)
target_q = rewards + 0.99 * (1 - dones) * self.critic(
torch.cat([next_states, next_actions], dim=1)
)

current_q = self.critic(torch.cat([states, actions], dim=1))
critic_loss = F.mse_loss(current_q, target_q)

self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()

# Actorをトレーニング
predicted_actions = self.actor(states)
actor_loss = -self.critic(
torch.cat([states, predicted_actions], dim=1)
).mean()

self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()

return critic_loss.item(), actor_loss.item()

データ拡張技術

モデルの汎化能力を向上させるために、HDF5データに対して様々な拡張を適用できます。

画像拡張

import torchvision.transforms as T

class RobotDataAugmentation:
def __init__(self):
self.image_aug = T.Compose([
T.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
T.RandomRotation(degrees=5),
T.RandomResizedCrop(224, scale=(0.9, 1.0)),
T.RandomHorizontalFlip(p=0.1), # 小さな確率で水平反転
])

def augment_episode(self, episode_data):
"""単一のepisodeに対してデータ拡張を行う"""
augmented_data = episode_data.copy()

# 画像拡張
for camera_name, images in episode_data['images'].items():
augmented_images = []
for img in images:
if torch.rand(1) < 0.5: # 50%の確率で拡張を実行
img = self.image_aug(img)
augmented_images.append(img)
augmented_data['images'][camera_name] = augmented_images

# 動作ノイズ
if torch.rand(1) < 0.3: # 30%の確率で動作ノイズを追加
noise = torch.randn_like(episode_data['actions']) * 0.01
augmented_data['actions'] = episode_data['actions'] + noise

return augmented_data

モデルの評価とデプロイ

評価指標

def evaluate_model(model, test_dataloader, device):
model.eval()
total_mse = 0
total_samples = 0

with torch.no_grad():
for batch in test_dataloader:
images = list(batch['images'].values())[0][:, 0].permute(0, 3, 1, 2).to(device)
states = batch['states'][:, 0].to(device)
true_actions = batch['actions'][:, 0].to(device)

predicted_actions = model(images, states)
mse = F.mse_loss(predicted_actions, true_actions)

total_mse += mse.item() * len(true_actions)
total_samples += len(true_actions)

avg_mse = total_mse / total_samples
print(f'Test MSE: {avg_mse:.6f}')
return avg_mse

モデルの保存と読み込み

def save_model(model, optimizer, epoch, loss, filepath):
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'loss': loss,
}, filepath)

def load_model(model, optimizer, filepath):
checkpoint = torch.load(filepath)
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']
return epoch, loss

まとめ

プラットフォームからエクスポートされたHDF5データを適切に活用することで、様々なロボット学習モデルを効果的にトレーニングできます。

  • 模倣学習:専門家のデモンストレーションから直接ポリシーを学習
  • シーケンスモデリング:Transformerを使用して時系列依存関係を処理
  • マルチモーダル学習:視覚、言語、および動作情報を統合
  • 強化学習:オフラインデータまたは初期化データとして活用

重要なのは、具体的なタスクのニーズに応じて適切なモデルアーキテクチャを選択し、HDF5データの豊富なアノテーション情報を十分に活用してモデルの性能を向上させることです。