メインコンテンツまでスキップ

Diffusion Policy モデルのファインチューニング

概要

Diffusion Policy は、拡散モデルに基づいた視覚運動ポリシー学習手法であり、拡散モデルの生成能力をロボット制御の分野に応用したものです。この手法は、アクション分布の拡散過程を学習することで、多様かつ高品質なロボットアクションシーケンスを生成することができ、複雑なロボット操作タスクにおいて優れたパフォーマンスを発揮します。

主な特徴

  • 拡散生成:拡散モデルを使用して連続的なアクションシーケンスを生成します。
  • マルチモーダルアクション:複数の解を持つタスクに対応可能です。
  • 高品質な出力:滑らかで自然なロボットアクションを生成します。
  • 高い堅牢性:ノイズや摂動に対して優れた堅牢性を持ちます。
  • 高い表現能力:複雑なアクション分布を学習可能です。

先決条件

システム要件

  • OS:Linux(Ubuntu 20.04+ 推奨)または macOS
  • Python バージョン:3.8+
  • GPU:NVIDIA GPU(RTX 3080 以上推奨)、少なくとも 10GB のビデオメモリ
  • メモリ:少なくとも 32GB の RAM
  • ストレージ:少なくとも 50GB の空き容量

環境準備

1. LeRobot のインストール

# LeRobot リポジトリをクローン
git clone https://github.com/huggingface/lerobot.git
cd lerobot

# 仮想環境の作成(venv 推奨、conda でも可)
python -m venv .venv
source .venv/bin/activate
python -m pip install -U pip

# 依存関係のインストール
pip install -e .

Diffusion Policy アーキテクチャ

コアコンポーネント

  1. 視覚エンコーダー:画像特徴を抽出します。
  2. 状態エンコーダー:ロボットの状態情報を処理します。
  3. 条件エンコーダー:視覚情報と状態情報を融合します。
  4. 拡散ネットワーク:アクション分布の拡散過程を学習します。
  5. ノイズスケジューラー:拡散過程のノイズレベルを制御します。

拡散過程

  1. 順方向過程:アクションシーケンスに段階的にノイズを追加します。
  2. 逆方向過程:ノイズから段階的にアクションシーケンスを復元します。
  3. 条件付き生成:観測条件に基づいてアクションを生成します。
  4. サンプリング戦略:DDPM または DDIM サンプリングを使用します。

データ準備

LeRobot 形式データ

Diffusion Policy は LeRobot 形式のデータセットを必要とします:

your_dataset/
├── data/
│ ├── chunk-001/
│ │ ├── observation.images.cam_high.png
│ │ ├── observation.images.cam_low.png
│ │ ├── observation.state.npy
│ │ ├── action.npy
│ │ └── ...
│ └── chunk-002/
│ └── ...
├── meta.json
├── stats.safetensors
└── videos/
├── episode_000000.mp4
└── ...

データ品質要件

  • 基本的なトレーニングには最低 100 エピソードが必要。
  • 最適な効果を得るには 500 エピソード以上を推奨。
  • アクションシーケンスは滑らかで連続的である必要があります。
  • 多様なタスクシーンを含める必要があります。
  • 高品質な視覚観測データ。

ファインチューニングのトレーニング

基本トレーニングコマンド

# 環境変数の設定
export HF_USER="your-huggingface-username"
export CUDA_VISIBLE_DEVICES=0

# Diffusion Policy トレーニングの開始
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 64 \
--steps 100000 \
--output_dir outputs/train/diffusion_policy_finetuned \
--job_name diffusion_policy_finetuning \
--policy.device cuda \
--policy.horizon 16 \
--policy.n_action_steps 8 \
--policy.n_obs_steps 2 \
--policy.optimizer_lr 1e-4 \
--policy.optimizer_weight_decay 1e-6 \
--policy.push_to_hub false \
--save_checkpoint true \
--save_freq 10000 \
--wandb.enable true

高度なトレーニング設定

長期予測設定

# 長いシーケンス予測向けの設定
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 32 \
--steps 150000 \
--output_dir outputs/train/diffusion_policy_long_horizon \
--job_name diffusion_policy_long_horizon \
--policy.device cuda \
--policy.horizon 32 \
--policy.n_action_steps 16 \
--policy.n_obs_steps 4 \
--policy.beta_schedule squaredcos_cap_v2 \
--policy.clip_sample true \
--policy.prediction_type epsilon \
--policy.optimizer_lr 1e-4 \
--policy.scheduler_name cosine \
--policy.scheduler_warmup_steps 5000 \
--policy.push_to_hub false \
--save_checkpoint true \
--wandb.enable true

メモリ最適化設定

# ビデオメモリの小さい GPU 向け
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 16 \
--steps 200000 \
--output_dir outputs/train/diffusion_policy_memory_opt \
--job_name diffusion_policy_memory_optimized \
--policy.device cuda \
--policy.horizon 16 \
--policy.n_action_steps 8 \
--policy.num_inference_steps 50 \
--policy.optimizer_lr 5e-5 \
--policy.use_amp true \
--num_workers 2 \
--policy.push_to_hub false \
--save_checkpoint true \
--wandb.enable true

パラメータ詳細説明

コアパラメータ

パラメータ意味推奨値説明
--policy.typeポリシータイプdiffusionDiffusion Policy モデルタイプ
--policy.pretrained_path事前学習済みモデルパスlerobot/diffusion_policyLeRobot 公式モデル(任意)
--dataset.repo_idデータセットリポジトリ ID${HF_USER}/your_datasetあなたの HuggingFace データセット
--batch_sizeバッチサイズ64ビデオメモリに応じて調整。RTX 3080 なら 32-64 推奨
--stepsトレーニングステップ数100000拡散モデルは通常、より多くのトレーニングステップを必要とします
--output_dir出力ディレクトリoutputs/train/diffusion_policy_finetunedモデル保存パス
--job_nameジョブ名diffusion_policy_finetuningログおよび実験追跡用(任意)

Diffusion Policy 特定パラメータ

パラメータ意味推奨値説明
--policy.horizon予測時間範囲16予測するアクションシーケンスの長さ
--policy.n_action_steps実行アクションステップ数81回に実行するアクション数
--policy.n_obs_steps観測ステップ数2履歴観測の数
--policy.num_inference_steps推論ステップ数100拡散サンプリングのステップ数(トレーニング時は無効)
--policy.beta_scheduleノイズスケジューリングsquaredcos_cap_v2ノイズ追加のスケジューリング戦略
--policy.clip_sampleサンプルクリッピングtrue生成されたサンプルをクリップするかどうか
--policy.clip_sample_rangeクリッピング範囲1.0サンプルクリッピングの範囲
--policy.prediction_type予測タイプepsilonノイズを予測するかサンプルを予測するか
--policy.num_train_timestepsトレーニング時間ステップ100順方向拡散のステップ数

ネットワークアーキテクチャパラメータ

パラメータ意味推奨値説明
--policy.vision_backbone視覚バックボーンresnet18画像特徴抽出ネットワーク
--policy.crop_shape画像クリップサイズ84 84入力画像のクリップサイズ
--policy.crop_is_randomランダムクリップtrueトレーニング時にランダムクリップを行うか
--policy.use_group_normグループ正規化の使用trueバッチ正規化の代わりに使用
--policy.spatial_softmax_num_keypoints空間ソフトマックスキーポイント数32空間ソフトマックス層のキーポイント数
--policy.down_dimsダウンサンプリング次元512 1024 2048U-Net ダウンサンプリングパスの次元
--policy.kernel_size畳み込み核サイズ51D 畳み込みの核サイズ
--policy.n_groupsグループ正規化グループ数8GroupNorm のグループ数
--policy.diffusion_step_embed_dimステップ埋め込み次元128拡散ステップの埋め込み次元

トレーニングパラメータ

パラメータ意味推奨値説明
--policy.optimizer_lr学習率1e-4拡散モデル推奨の学習率
--policy.optimizer_weight_decay重み減衰1e-6正則化パラメータ
--policy.optimizer_betasAdam ベータ0.95 0.999Adam オプティマイザーのベータパラメータ
--policy.optimizer_epsAdam イプシロン1e-8数値安定性パラメータ
--policy.scheduler_name学習率スケジューラーcosine余弦アニーリングスケジューラー
--policy.scheduler_warmup_stepsウォームアップステップ数500学習率のウォームアップ
--policy.use_amp混合精度trueビデオメモリの節約
--num_workersデータロードスレッド数4CPU コア数に応じて調整
--policy.push_to_hubHub へのプッシュfalseHuggingFace へのアップロード(repo_id が必要)
--save_checkpointチェックポイント保存trueトレーニングチェックポイントの保存
--save_freq保存頻度10000チェックポイント保存間隔

トレーニングの監視とデバッグ

Weights & Biases の統合

# 詳細な W&B 設定
lerobot-train \
--policy.type diffusion \
--dataset.repo_id your-name/your-dataset \
--batch_size 64 \
--steps 100000 \
--policy.push_to_hub false \
--wandb.enable true \
--wandb.project diffusion_policy_experiments \
--wandb.notes "Diffusion Policy training with long horizon" \
# ... 他のパラメータ

主要指標の監視

トレーニング中に注目すべき指標:

  • Diffusion Loss:拡散モデルの全体的な損失。
  • MSE Loss:平均二乗誤差損失。
  • Learning Rate:学習率の変化曲線。
  • Gradient Norm:勾配ノルム。
  • Inference Time:推論時間。
  • Sample Quality:生成サンプルの品質。

トレーニング可视化

# visualization.py
import torch
import matplotlib.pyplot as plt
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy

def visualize_diffusion_process(model_path, observation):
# モデルのロード
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()

# アクションシーケンス生成の拡散過程
with torch.no_grad():
# 初期ノイズ
noise = torch.randn(1, policy.horizon, policy.action_dim, device="cuda")

# 拡散サンプリング過程
actions_sequence = []
for t in range(policy.num_inference_steps):
# ノイズ予測
noise_pred = policy.unet(noise, t, observation)

# サンプルの更新
noise = policy.scheduler.step(noise_pred, t, noise).prev_sample

# 中間結果の保存
if t % 10 == 0:
actions_sequence.append(noise.cpu().numpy())

final_actions = noise.cpu().numpy()

# 拡散過程の可視化
fig, axes = plt.subplots(2, 3, figsize=(15, 10))

for i, actions in enumerate(actions_sequence[:6]):
ax = axes[i//3, i%3]
ax.plot(actions[0, :, 0], label='Action Dim 0')
ax.plot(actions[0, :, 1], label='Action Dim 1')
ax.set_title(f'Diffusion Step {i*10}')
ax.legend()

plt.tight_layout()
plt.savefig('diffusion_process.png')
plt.show()

return final_actions

if __name__ == "__main__":
model_path = "outputs/train/diffusion_policy_finetuned/checkpoints/last"

# 模擬観測
observation = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}

actions = visualize_diffusion_process(model_path, observation)
print(f"Generated actions shape: {actions.shape}")

モデルの評価

オフライン評価

# offline_evaluation.py
import torch
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
from lerobot.datasets.lerobot_dataset import LeRobotDataset

def evaluate_diffusion_policy(model_path, dataset_path):
# モデルのロード
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()

# テストデータセットのロード
dataset = LeRobotDataset(dataset_path, split="test")

total_mse_loss = 0
total_mae_loss = 0
num_samples = 0

with torch.no_grad():
for batch in dataset:
# モデルの予測
prediction = policy(batch)

# 損失の計算
target_actions = batch['action']
predicted_actions = prediction['action']

mse_loss = torch.mean((predicted_actions - target_actions) ** 2)
mae_loss = torch.mean(torch.abs(predicted_actions - target_actions))

total_mse_loss += mse_loss.item()
total_mae_loss += mae_loss.item()
num_samples += 1

avg_mse_loss = total_mse_loss / num_samples
avg_mae_loss = total_mae_loss / num_samples

print(f"Average MSE Loss: {avg_mse_loss:.4f}")
print(f"Average MAE Loss: {avg_mae_loss:.4f}")

return avg_mse_loss, avg_mae_loss

def evaluate_action_diversity(model_path, observation, num_samples=10):
# アクション多様性の評価
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()

actions_list = []

with torch.no_grad():
for _ in range(num_samples):
prediction = policy(observation)
actions_list.append(prediction['action'].cpu().numpy())

actions_array = np.array(actions_list) # [num_samples, horizon, action_dim]

# アクション多様性指標の計算
action_std = np.std(actions_array, axis=0) # [horizon, action_dim]
avg_std = np.mean(action_std)

print(f"Average action standard deviation: {avg_std:.4f}")

return avg_std, actions_array

if __name__ == "__main__":
model_path = "outputs/train/diffusion_policy_finetuned/checkpoints/last"
dataset_path = "path/to/your/test/dataset"

# オフライン評価
evaluate_diffusion_policy(model_path, dataset_path)

# 多様性評価
observation = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}

evaluate_action_diversity(model_path, observation)

オンライン評価(ロボット環境)

# robot_evaluation.py
import torch
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy

class DiffusionPolicyController:
def __init__(self, model_path, num_inference_steps=50):
self.policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()
self.num_inference_steps = num_inference_steps
self.action_queue = []
self.current_obs_history = []

def get_action(self, observations):
# 観測履歴の更新
self.current_obs_history.append(observations)
if len(self.current_obs_history) > self.policy.n_obs_steps:
self.current_obs_history.pop(0)

# アクションキューが空、または再計画が必要な場合、新しいアクションシーケンスを生成
if len(self.action_queue) == 0 or self.should_replan():
with torch.no_grad():
# 入力の構築
batch = self.prepare_observation_batch()

# 推論ステップ数の設定
self.policy.scheduler.set_timesteps(self.num_inference_steps)

# アクションシーケンスの生成
prediction = self.policy(batch)
actions = prediction['action'].cpu().numpy()[0] # [horizon, action_dim]

# アクションキューの更新
self.action_queue = list(actions[:self.policy.n_action_steps])

# 次のアクションを返す
return self.action_queue.pop(0)

def should_replan(self):
# シンプルな再計画戦略:アクションキューの残りが半分未満になったら再計画
return len(self.action_queue) < self.policy.n_action_steps // 2

def prepare_observation_batch(self):
batch = {}

# 画像観測の処理
if "observation.images.cam_high" in self.current_obs_history[-1]:
images = []
for obs in self.current_obs_history:
image = obs["observation.images.cam_high"]
image_tensor = self.preprocess_image(image)
images.append(image_tensor)

# 履歴が不足している場合、最後の観測を繰り返す
while len(images) < self.policy.n_obs_steps:
images.insert(0, images[0])

batch["observation.images.cam_high"] = torch.stack(images).unsqueeze(0)

# 状態観測の処理
if "observation.state" in self.current_obs_history[-1]:
states = []
for obs in self.current_obs_history:
state = torch.tensor(obs["observation.state"], dtype=torch.float32)
states.append(state)

# 履歴が不足している場合、最後の状態を繰り返す
while len(states) < self.policy.n_obs_steps:
states.insert(0, states[0])

batch["observation.state"] = torch.stack(states).unsqueeze(0)

return batch

def preprocess_image(self, image):
# 画像プリプロセスロジック
image_tensor = torch.tensor(image).permute(2, 0, 1).float() / 255.0
return image_tensor

# 使用例
if __name__ == "__main__":
controller = DiffusionPolicyController(
model_path="outputs/train/diffusion_policy_finetuned/checkpoints/last",
num_inference_steps=50
)

# ロボット制御ループのシミュレーション
for step in range(100):
# 現在の観測を取得
observations = {
"observation.images.cam_high": np.random.randint(0, 255, (224, 224, 3)),
"observation.state": np.random.randn(7)
}

# アクションを取得
action = controller.get_action(observations)

# アクションを実行
print(f"Step {step}: Action = {action}")

# ここで実際のアクションをロボットに送信する必要があります
# robot.execute_action(action)

デプロイと最適化

推論加速

# fast_inference.py
import torch
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
from diffusers import DDIMScheduler

class FastDiffusionInference:
def __init__(self, model_path, num_inference_steps=10):
self.policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()

# 高速サンプリングのために DDIM スケジューラーを使用
self.policy.scheduler = DDIMScheduler.from_config(self.policy.scheduler.config)
self.num_inference_steps = num_inference_steps

# モデルのウォームアップ
self.warmup()

def warmup(self):
# ダミーデータを使用してモデルをウォームアップ
dummy_batch = {
"observation.images.cam_high": torch.randn(1, 2, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 2, 7, device="cuda")
}

with torch.no_grad():
for _ in range(5):
_ = self.predict(dummy_batch)

@torch.no_grad()
def predict(self, observations):
# 推論ステップ数の設定
self.policy.scheduler.set_timesteps(self.num_inference_steps)

# 高速推論
prediction = self.policy(observations)
return prediction['action'].cpu().numpy()

if __name__ == "__main__":
fast_inference = FastDiffusionInference(
"outputs/train/diffusion_policy_finetuned/checkpoints/last",
num_inference_steps=10
)

# 推論速度のテスト
import time

observations = {
"observation.images.cam_high": torch.randn(1, 2, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 2, 7, device="cuda")
}

start_time = time.time()
for _ in range(100):
action = fast_inference.predict(observations)
end_time = time.time()

avg_inference_time = (end_time - start_time) / 100
print(f"Average inference time: {avg_inference_time:.4f} seconds")
print(f"Inference frequency: {1/avg_inference_time:.2f} Hz")

ベストプラクティス

データ収集のアドバイス

  1. 滑らかなアクション:デモンストレーションデータのアクションシーケンスが滑らかで連続的であることを確認します。
  2. 多様なシーン:異なる初期状態や目標のデータを収集します。
  3. 高品質なアノテーション:アクションアノテーションの正確性を確保します。
  4. 十分なデータ量:拡散モデルは通常、より多くのデータを必要とします。

トレーニング最適化のアドバイス

  1. ノイズスケジューリング:適切なノイズスケジューリング戦略を選択します。
  2. 推論ステップ数:品質と速度のバランスを考慮し、適切な推論ステップ数を選択します。
  3. 学習率スケジューラー:余弦アニーリングまたはステップ減衰を使用します。
  4. 正則化:重み減衰を適切に使用します。

デプロイ最適化のアドバイス

  1. 高速サンプリング:DDIM やその他の高速サンプリング手法を使用します。
  2. モデル圧縮:知識蒸馏や量子化技術を使用します。
  3. 並列推論:GPU の並列能力を利用します。
  4. キャッシュ最適化:中間計算結果をキャッシュします。

よくある質問 (FAQ)

Q: Diffusion Policy は他のポリシー学習手法と比較してどのような利点がありますか?

A: Diffusion Policy の主な利点は以下の通りです:

  • マルチモーダル生成:複数の解を持つタスクに対応可能です。
  • 高品質な出力:滑らかで自然なアクションシーケンスを生成します。
  • 高い堅牢性:ノイズや摂動に対して優れた堅牢性を持ちます。
  • 高い表現能力:複雑なアクション分布を学習可能です。

Q: 適切な推論ステップ数をどのように選択すればよいですか?

A: 推論ステップ数の選択は品質と速度のバランスです:

  • 高品質:100-1000 ステップ。オフライン評価に適しています。
  • リアルタイム応用:10-50 ステップ。オンライン制御に適しています。
  • 高速プロトタイプ:5-10 ステップ。迅速なテストに適しています。

Q: トレーニングにはどのくらい時間がかかりますか?

A: トレーニング時間は複数の要因に依存します:

  • データセットサイズ:500 エピソードで約 12-24 時間(RTX 3080)。
  • モデルの複雑さ:モデルが大きいほど時間がかかります。
  • 推論ステップ数:ステップ数が多いほどトレーニング時間が増えます。
  • 収束要件:通常 100,000-200,000 ステップ必要です。

Q: 生成されるアクションの品質をどのように向上させますか?

A: アクションの品質を向上させる方法:

  • 推論ステップ数を増やす:ステップ数が多いほど、通常より良い結果が得られます。
  • ノイズスケジューリングの最適化:適切なノイズ追加戦略を選択します。
  • データ品質:トレーニングデータの高品質化を確保します。
  • モデルアーキテクチャ:より大きく深いネットワークを使用します。
  • 正則化技術:過学習を防ぐために適切な正則化を行います。

Q: リアルタイム性の要求をどのように満たしますか?

A: リアルタイム性の要求を満たす方法:

  • 高速サンプリング:DDIM や DPM-Solver を使用します。
  • 推論ステップ数の削減:品質と速度の間でバランスを見つけます。
  • モデル蒸留:より小さな学生モデルをトレーニングします。
  • 並列推論:マルチ GPU やバッチ処理を利用します。
  • 事前計算:一部の結果を事前に計算しておきます。

関連リソース

更新ログ

  • 2024-01: 初版リリース
  • 2024-02: 高速サンプリングサポートの追加
  • 2024-03: メモリ使用量とトレーニング効率の最適化
  • 2024-04: 多様性評価とデプロイ最適化の追加