メインコンテンツまでスキップ

Diffusion Policyモデルのファインチューニング

概要

Diffusion Policyは、拡散モデルに基づくビジュオモーター政策学習方法であり、拡散モデルの生成能力をロボット制御分野に応用します。この方法は、アクション分布の拡散プロセスを学習することで、多様で高品質なロボットアクションシーケンスを生成し、複雑なロボット操作タスクで優れた性能を発揮します。

コア機能

  • 拡散生成:拡散モデルを使用して連続的なアクションシーケンスを生成
  • マルチモーダルアクション:複数の解決策を持つタスクを処理可能
  • 高品質な出力:滑らかで自然なロボットアクションを生成
  • 強いロバスト性:ノイズや外乱に対して良好なロバスト性
  • 強い表現力:複雑なアクション分布を学習可能

前提条件

システム要件

  • オペレーティングシステム:Linux(Ubuntu 20.04+推奨)またはmacOS
  • Pythonバージョン:3.8+
  • GPU:NVIDIA GPU(RTX 3080以上推奨)、最低10GB VRAM
  • メモリ:最低32GB RAM
  • ストレージ容量:最低50GB利用可能容量

環境準備

1. LeRobotのインストール

# LeRobotリポジトリをクローン
git clone https://github.com/huggingface/lerobot.git
cd lerobot

# 仮想環境を作成
conda create -n lerobot python=3.10
conda activate lerobot

# 依存関係をインストール
pip install -e .

2. Diffusion Policy固有の依存関係をインストール

# 拡散モデル関連の依存関係をインストール
pip install diffusers
pip install accelerate
pip install transformers
pip install einops
pip install wandb

# 数値計算ライブラリをインストール
pip install scipy
pip install scikit-learn

# Weights & Biasesにログイン(オプション)
wandb login

Diffusion Policyアーキテクチャ

コアコンポーネント

  1. ビジョンエンコーダ:画像特徴を抽出
  2. 状態エンコーダ:ロボット状態情報を処理
  3. 条件エンコーダ:ビジョンと状態情報を融合
  4. 拡散ネットワーク:アクション分布の拡散プロセスを学習
  5. ノイズスケジューラ:拡散プロセスのノイズレベルを制御

拡散プロセス

  1. 順方向プロセス:アクションシーケンスに段階的にノイズを追加
  2. 逆方向プロセス:ノイズから段階的にアクションシーケンスを回復
  3. 条件付き生成:観測条件に基づいてアクションを生成
  4. サンプリング戦略:DDPMまたはDDIMサンプリングを使用

データ準備

LeRobotフォーマットデータ

Diffusion PolicyはLeRobotフォーマットのデータセットを使用する必要があります:

your_dataset/
├── data/
│ ├── chunk-001/
│ │ ├── observation.images.cam_high.png
│ │ ├── observation.images.cam_low.png
│ │ ├── observation.state.npy
│ │ ├── action.npy
│ │ └── ...
│ └── chunk-002/
│ └── ...
├── meta.json
├── stats.safetensors
└── videos/
├── episode_000000.mp4
└── ...

データ品質要件

  • 最低100エピソード基本トレーニング用
  • 500+エピソード推奨最適な結果を得るため
  • アクションシーケンスは滑らかで連続的であるべき
  • 多様なタスクシナリオを含む
  • 高品質な視覚観測データ

ファインチューニングトレーニング

基本トレーニングコマンド

# 環境変数を設定
export HF_USER="your-huggingface-username"
export CUDA_VISIBLE_DEVICES=0

# Diffusion Policyトレーニングを開始
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 64 \
--steps 100000 \
--output_dir outputs/train/diffusion_policy_finetuned \
--job_name diffusion_policy_finetuning \
--policy.device cuda \
--policy.horizon 16 \
--policy.n_action_steps 8 \
--policy.n_obs_steps 2 \
--policy.num_inference_steps 100 \
--policy.optimizer_lr 1e-4 \
--policy.optimizer_weight_decay 1e-6 \
--policy.push_to_hub false \
--save_checkpoint true \
--save_freq 10000 \
--wandb.enable true

高度なトレーニング構成

マルチステップ予測構成

# 長いシーケンス予測用の構成
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 32 \
--steps 150000 \
--output_dir outputs/train/diffusion_policy_long_horizon \
--job_name diffusion_policy_long_horizon \
--policy.device cuda \
--policy.horizon 32 \
--policy.n_action_steps 16 \
--policy.n_obs_steps 4 \
--policy.num_inference_steps 100 \
--policy.beta_schedule squaredcos_cap_v2 \
--policy.clip_sample true \
--policy.prediction_type epsilon \
--policy.optimizer_lr 1e-4 \
--policy.scheduler_name cosine \
--policy.scheduler_warmup_steps 5000 \
--policy.push_to_hub false \
--save_checkpoint true \
--wandb.enable true

メモリ最適化構成

# VRAMが小さいGPU向け
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 16 \
--steps 200000 \
--output_dir outputs/train/diffusion_policy_memory_opt \
--job_name diffusion_policy_memory_optimized \
--policy.device cuda \
--policy.horizon 16 \
--policy.n_action_steps 8 \
--policy.num_inference_steps 50 \
--policy.optimizer_lr 5e-5 \
--policy.use_amp true \
--num_workers 2 \
--policy.push_to_hub false \
--save_checkpoint true \
--wandb.enable true

パラメータ詳細説明

コアパラメータ

パラメータ意味推奨値説明
--policy.typeポリシータイプdiffusionDiffusion Policyモデルタイプ
--policy.pretrained_path事前学習済みモデルパスlerobot/diffusion_policyLeRobot公式モデル(オプション)
--dataset.repo_idデータセットリポジトリID${HF_USER}/datasetあなたのHuggingFaceデータセット
--batch_sizeバッチサイズ64VRAMに応じて調整、RTX 3080は32-64推奨
--stepsトレーニングステップ数100000拡散モデルは通常より多くのトレーニングステップが必要
--output_dir出力ディレクトリoutputs/train/diffusion_policy_finetunedモデル保存パス
--job_nameジョブ名diffusion_policy_finetuningログと実験追跡用(オプション)

Diffusion Policy固有パラメータ

パラメータ意味推奨値説明
--policy.horizon予測ホライズン16予測されるアクションシーケンスの長さ
--policy.n_action_steps実行アクションステップ8毎回実行されるアクション数
--policy.n_obs_steps観測ステップ2履歴観測の数
--policy.num_inference_steps推論ステップ100拡散サンプリングのステップ数(トレーニング時は無効)
--policy.beta_scheduleノイズスケジュールsquaredcos_cap_v2ノイズ追加のスケジューリング戦略
--policy.clip_sampleサンプルクリッピングtrue生成されたサンプルをクリップするか
--policy.clip_sample_rangeクリッピング範囲1.0サンプルクリッピングの範囲
--policy.prediction_type予測タイプepsilonノイズまたはサンプルを予測
--policy.num_train_timestepsトレーニングタイムステップ100順方向拡散のステップ数

ネットワークアーキテクチャパラメータ

パラメータ意味推奨値説明
--policy.vision_backboneビジョンバックボーンresnet18画像特徴抽出ネットワーク
--policy.crop_shape画像クロップサイズ84 84入力画像のクロップサイズ
--policy.crop_is_randomランダムクロップtrueトレーニング時にランダムクロップするか
--policy.use_group_normグループ正規化を使用trueバッチ正規化を置き換え
--policy.spatial_softmax_num_keypoints空間ソフトマックスキーポイント数32空間ソフトマックス層のキーポイント数
--policy.down_dimsダウンサンプリング次元512 1024 2048U-Netダウンサンプリングパスの次元
--policy.kernel_size畳み込みカーネルサイズ51D畳み込みのカーネルサイズ
--policy.n_groupsグループ正規化グループ数8GroupNormのグループ数
--policy.diffusion_step_embed_dimステップ埋め込み次元128拡散ステップの埋め込み次元

トレーニングパラメータ

パラメータ意味推奨値説明
--policy.optimizer_lr学習率1e-4拡散モデルの推奨学習率
--policy.optimizer_weight_decay重み減衰1e-6正則化パラメータ
--policy.optimizer_betasAdamオプティマイザbeta0.95 0.999Adamオプティマイザのbetaパラメータ
--policy.optimizer_epsAdam epsilon1e-8数値安定性パラメータ
--policy.scheduler_name学習率スケジューラcosineコサインアニーリングスケジュール
--policy.scheduler_warmup_stepsウォームアップステップ500学習率ウォームアップ
--policy.use_amp混合精度trueVRAMを節約
--num_workersデータローディングスレッド数4CPUコア数に応じて調整
--policy.push_to_hubHubへプッシュfalseHuggingFaceへモデルをアップロードするか(repo_idが必要)
--save_checkpointチェックポイント保存trueトレーニングチェックポイントを保存するか
--save_freq保存頻度10000チェックポイント保存間隔

トレーニング監視とデバッグ

Weights & Biases統合

# 詳細なW&B構成
lerobot-train \
--policy.type diffusion \
--dataset.repo_id your-name/your-dataset \
--batch_size 64 \
--steps 100000 \
--policy.push_to_hub false \
--wandb.enable true \
--wandb.project diffusion_policy_experiments \
--wandb.notes "Diffusion Policy training with long horizon" \
# ... その他のパラメータ

主要メトリクス監視

トレーニング中に注目すべきメトリクス:

  • Diffusion Loss:拡散モデルの全体的な損失
  • MSE Loss:平均二乗誤差損失
  • Learning Rate:学習率変化曲線
  • Gradient Norm:勾配ノルム
  • Inference Time:推論時間
  • Sample Quality:生成されたサンプルの品質

トレーニング可視化

# visualization.py
import torch
import matplotlib.pyplot as plt
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy

def visualize_diffusion_process(model_path, observation):
# モデルを読み込み
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()

# アクションシーケンスの拡散プロセスを生成
with torch.no_grad():
# 初期ノイズ
noise = torch.randn(1, policy.horizon, policy.action_dim, device="cuda")

# 拡散サンプリングプロセス
actions_sequence = []
for t in range(policy.num_inference_steps):
# ノイズを予測
noise_pred = policy.unet(noise, t, observation)

# サンプルを更新
noise = policy.scheduler.step(noise_pred, t, noise).prev_sample

# 中間結果を保存
if t % 10 == 0:
actions_sequence.append(noise.cpu().numpy())

final_actions = noise.cpu().numpy()

# 拡散プロセスを可視化
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

for i, actions in enumerate(actions_sequence[:6]):
ax = axes[i//3, i%3]
ax.plot(actions[0, :, 0], label='Action Dim 0')
ax.plot(actions[0, :, 1], label='Action Dim 1')
ax.set_title(f'Diffusion Step {i*10}')
ax.legend()

plt.tight_layout()
plt.savefig('diffusion_process.png')
plt.show()

return final_actions

if __name__ == "__main__":
model_path = "outputs/train/diffusion_policy_finetuned/checkpoints/last"

# 観測をシミュレート
observation = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}

actions = visualize_diffusion_process(model_path, observation)
print(f"Generated actions shape: {actions.shape}")

モデル評価

オフライン評価

# offline_evaluation.py
import torch
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
from lerobot.datasets.lerobot_dataset import LeRobotDataset

def evaluate_diffusion_policy(model_path, dataset_path):
# モデルを読み込み
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()

# テストデータセットを読み込み
dataset = LeRobotDataset(dataset_path, split="test")

total_mse_loss = 0
total_mae_loss = 0
num_samples = 0

with torch.no_grad():
for batch in dataset:
# モデル予測
prediction = policy(batch)

# 損失を計算
target_actions = batch['action']
predicted_actions = prediction['action']

mse_loss = torch.mean((predicted_actions - target_actions) ** 2)
mae_loss = torch.mean(torch.abs(predicted_actions - target_actions))

total_mse_loss += mse_loss.item()
total_mae_loss += mae_loss.item()
num_samples += 1

avg_mse_loss = total_mse_loss / num_samples
avg_mae_loss = total_mae_loss / num_samples

print(f"Average MSE Loss: {avg_mse_loss:.4f}")
print(f"Average MAE Loss: {avg_mae_loss:.4f}")

return avg_mse_loss, avg_mae_loss

def evaluate_action_diversity(model_path, observation, num_samples=10):
# アクション多様性を評価
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()

actions_list = []

with torch.no_grad():
for _ in range(num_samples):
prediction = policy(observation)
actions_list.append(prediction['action'].cpu().numpy())

actions_array = np.array(actions_list) # [num_samples, horizon, action_dim]

# アクション多様性メトリクスを計算
action_std = np.std(actions_array, axis=0) # [horizon, action_dim]
avg_std = np.mean(action_std)

print(f"Average action standard deviation: {avg_std:.4f}")

return avg_std, actions_array

if __name__ == "__main__":
model_path = "outputs/train/diffusion_policy_finetuned/checkpoints/last"
dataset_path = "path/to/your/test/dataset"

# オフライン評価
evaluate_diffusion_policy(model_path, dataset_path)

# 多様性評価
observation = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}

evaluate_action_diversity(model_path, observation)

オンライン評価(ロボット環境)

# robot_evaluation.py
import torch
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy

class DiffusionPolicyController:
def __init__(self, model_path, num_inference_steps=50):
self.policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()
self.num_inference_steps = num_inference_steps
self.action_queue = []
self.current_obs_history = []

def get_action(self, observations):
# 観測履歴を更新
self.current_obs_history.append(observations)
if len(self.current_obs_history) > self.policy.n_obs_steps:
self.current_obs_history.pop(0)

# アクションキューが空または再計画が必要な場合、新しいアクションシーケンスを生成
if len(self.action_queue) == 0 or self.should_replan():
with torch.no_grad():
# 入力を構築
batch = self.prepare_observation_batch()

# 推論ステップを設定
self.policy.scheduler.set_timesteps(self.num_inference_steps)

# アクションシーケンスを生成
prediction = self.policy(batch)
actions = prediction['action'].cpu().numpy()[0] # [horizon, action_dim]

# アクションキューを更新
self.action_queue = list(actions[:self.policy.n_action_steps])

# 次のアクションを返す
return self.action_queue.pop(0)

def should_replan(self):
# 簡単な再計画戦略:アクションキューの残りが半分未満の場合に再計画
return len(self.action_queue) < self.policy.n_action_steps // 2

def prepare_observation_batch(self):
batch = {}

# 画像観測を処理
if "observation.images.cam_high" in self.current_obs_history[-1]:
images = []
for obs in self.current_obs_history:
image = obs["observation.images.cam_high"]
image_tensor = self.preprocess_image(image)
images.append(image_tensor)

# 履歴が不足している場合、最後の観測を繰り返す
while len(images) < self.policy.n_obs_steps:
images.insert(0, images[0])

batch["observation.images.cam_high"] = torch.stack(images).unsqueeze(0)

# 状態観測を処理
if "observation.state" in self.current_obs_history[-1]:
states = []
for obs in self.current_obs_history:
state = torch.tensor(obs["observation.state"], dtype=torch.float32)
states.append(state)

# 履歴が不足している場合、最後の状態を繰り返す
while len(states) < self.policy.n_obs_steps:
states.insert(0, states[0])

batch["observation.state"] = torch.stack(states).unsqueeze(0)

return batch

def preprocess_image(self, image):
# 画像前処理ロジック
image_tensor = torch.tensor(image).permute(2, 0, 1).float() / 255.0
return image_tensor

# 使用例
if __name__ == "__main__":
controller = DiffusionPolicyController(
model_path="outputs/train/diffusion_policy_finetuned/checkpoints/last",
num_inference_steps=50
)

# ロボット制御ループをシミュレート
for step in range(100):
# 現在の観測を取得
observations = {
"observation.images.cam_high": np.random.randint(0, 255, (224, 224, 3)),
"observation.state": np.random.randn(7)
}

# アクションを取得
action = controller.get_action(observations)

# アクションを実行
print(f"Step {step}: Action = {action}")

# 実際のロボットにアクションを送信すべき
# robot.execute_action(action)

デプロイと最適化

推論加速

# fast_inference.py
import torch
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
from diffusers import DDIMScheduler

class FastDiffusionInference:
def __init__(self, model_path, num_inference_steps=10):
self.policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()

# 高速サンプリングのためにDDIMスケジューラを使用
self.policy.scheduler = DDIMScheduler.from_config(self.policy.scheduler.config)
self.num_inference_steps = num_inference_steps

# モデルをウォームアップ
self.warmup()

def warmup(self):
# ダミーデータでモデルをウォームアップ
dummy_batch = {
"observation.images.cam_high": torch.randn(1, 2, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 2, 7, device="cuda")
}

with torch.no_grad():
for _ in range(5):
_ = self.predict(dummy_batch)

@torch.no_grad()
def predict(self, observations):
# 推論ステップを設定
self.policy.scheduler.set_timesteps(self.num_inference_steps)

# 高速推論
prediction = self.policy(observations)
return prediction['action'].cpu().numpy()

if __name__ == "__main__":
fast_inference = FastDiffusionInference(
"outputs/train/diffusion_policy_finetuned/checkpoints/last",
num_inference_steps=10
)

# 推論速度をテスト
import time

observations = {
"observation.images.cam_high": torch.randn(1, 2, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 2, 7, device="cuda")
}

start_time = time.time()
for _ in range(100):
action = fast_inference.predict(observations)
end_time = time.time()

avg_inference_time = (end_time - start_time) / 100
print(f"Average inference time: {avg_inference_time:.4f} seconds")
print(f"Inference frequency: {1/avg_inference_time:.2f} Hz")

ベストプラクティス

データ収集の推奨事項

  1. 滑らかなアクション:デモンストレーションデータ内のアクションシーケンスが滑らかで連続的であることを確保
  2. 多様なシナリオ:異なる開始状態と目標のデータを収集
  3. 高品質なアノテーション:アクションアノテーションの正確性を確保
  4. 十分なデータ量:拡散モデルは通常より多くのデータが必要

トレーニング最適化の推奨事項

  1. ノイズスケジューリング:適切なノイズスケジューリング戦略を選択
  2. 推論ステップ:品質と速度のバランスを取り、適切な推論ステップを選択
  3. 学習率スケジューリング:コサインアニーリングまたはステップ減衰を使用
  4. 正則化:適切に重み減衰を使用

デプロイ最適化の推奨事項

  1. 高速サンプリング:DDIMまたは他の高速サンプリング方法を使用
  2. モデル圧縮:知識蒸留または量子化技術を使用
  3. 並列推論:GPU並列能力を活用
  4. キャッシュ最適化:中間計算結果をキャッシュ

よくある質問(FAQ)

Q: Diffusion Policyは他のポリシー学習方法と比較してどのような利点がありますか?

A: Diffusion Policyの主な利点には以下が含まれます:

  • マルチモーダル生成:複数の解決策を持つタスクを処理可能
  • 高品質な出力:滑らかで自然なアクションシーケンスを生成
  • 強いロバスト性:ノイズや外乱に対して良好なロバスト性
  • 強い表現力:複雑なアクション分布を学習可能

Q: 適切な推論ステップ数を選択するには?

A: 推論ステップの選択は品質と速度のバランスを取る必要があります:

  • 高品質:100-1000ステップ、オフライン評価に適している
  • リアルタイムアプリケーション:10-50ステップ、オンライン制御に適している
  • 高速プロトタイピング:5-10ステップ、クイックテストに適している

Q: トレーニングにはどのくらい時間がかかりますか?

A: トレーニング時間は複数の要因に依存します:

  • データセットサイズ:500エピソードは約12-24時間(RTX 3080)
  • モデルの複雑さ:より大きなモデルはより多くの時間が必要
  • 推論ステップ:より多くのステップはトレーニング時間を増加
  • 収束要件:通常100000-200000ステップが必要

Q: 生成されたアクションの品質を向上させるには?

A: アクション品質を向上させる方法:

  • 推論ステップを増やす:より多くのステップは通常より良い結果を生成
  • ノイズスケジューリングを最適化:適切なノイズ追加戦略を選択
  • データ品質:トレーニングデータの高品質を確保
  • モデルアーキテクチャ:より大きいまたはより深いネットワークを使用
  • 正則化技術:適切な正則化は過学習を防ぐ

Q: リアルタイム要件を処理するには?

A: リアルタイム要件を満たす方法:

  • 高速サンプリング:DDIMまたはDPM-Solverを使用
  • 推論ステップを削減:品質と速度のバランスを見つける
  • モデル蒸留:より小さな学生モデルをトレーニング
  • 並列推論:マルチGPUまたはバッチ処理を活用
  • 事前計算:部分的な結果を事前計算

関連リソース

更新ログ

  • 2024-01: 初版リリース
  • 2024-02: 高速サンプリングサポートを追加
  • 2024-03: メモリ使用量とトレーニング効率を最適化
  • 2024-04: 多様性評価とデプロイ最適化を追加