Diffusion Policy 模型微调
概述
Diffusion Policy 是一种基于扩散模型的视觉运动策略学习方法,将扩散模型的生成能力应用于机器人控制领域。该方法通过学习动作分布的扩散过程,能够生成多样化且高质量的机器人动作序列,在复杂的机器人操作任务中表现出色。
核心特点
- 扩散生成:使用扩散模型生成连续的动作序列
- 多模态动作:能够处理具有多种解决方案的任务
- 高质量输出:生成平滑、自然的机器人动作
- 鲁棒性强:对噪声和扰动具有良好的鲁棒性
- 表达能力强:能够学习复杂的动作分布
先决条件
系统要求
- 操作系统:Linux(推荐 Ubuntu 20.04+)或 macOS
- Python 版本:3.8+
- GPU:NVIDIA GPU(推荐 RTX 3080 或更高),至少 10GB 显存
- 内存:至少 32GB RAM
- 存储空间:至少 50GB 可用空间
环 境准备
1. 安装 LeRobot
# 克隆 LeRobot 仓库
git clone https://github.com/huggingface/lerobot.git
cd lerobot
# 创建虚拟环境(推荐 venv;如你偏好 conda 也可以)
python -m venv .venv
source .venv/bin/activate
python -m pip install -U pip
# 安装依赖
pip install -e .
版本匹配提示:
meta/info.json中codebase_version=v2.1时建议使用lerobot v0.3.x;codebase_version=v3.0时建议使用lerobot v0.4+(含v0.5.x)。这里要区分:v2.1/v3.0是 LeRobotDataset 格式版本,v0.4.0/v0.5.0是 lerobot 代码库版本。
Diffusion Policy 架构
核心组件
- 视觉编码器:提取图像特征
- 状态编码器:处理机器人状态信息
- 条件编码器:融合视觉和状态信息
- 扩散网络:学习动作分布的扩散过程
- 噪声调度器:控制扩散过程的噪声水平
扩散过程
- 前向过程:逐步向动作序列添加噪声
- 反向过程:从噪声中逐步恢复动作序列
- 条件生成:基于观察条件生成动作
- 采样策略:使用 DDPM 或 DDIM 采样
数据准备
LeRobot 格式数据
Diffusion Policy 需要使用 LeRobot 格式的数据集。真实目录会因 v2.1 或 v3.0 而不同,但核心文件通常是下面这些:
your_dataset/
├── meta/
│ ├── info.json
│ ├── tasks.jsonl
│ ├── episodes.jsonl # v2.1
│ └── episodes/ # v3.0
├── data/
│ └── chunk-000/
│ ├── episode_000000.parquet # v2.1
│ └── file-000.parquet # v3.0
└── videos/
└── ...
数据质量要求
- 最少 100 个 episode 用于基本训练
- 推荐 500+ episode 以获得最佳效果
- 动作序列应该平滑连续
- 包含多样化的任务场景
- 高质量的视觉观察数据
微调训练
基本训练命令
# 设置环境变量
export HF_USER="your-huggingface-username"
export CUDA_VISIBLE_DEVICES=0
# 启动 Diffusion Policy 训练
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 64 \
--steps 100000 \
--output_dir outputs/train/diffusion_policy_finetuned \
--job_name diffusion_policy_finetuning \
--policy.device cuda \
--policy.horizon 16 \
--policy.n_action_steps 8 \
--policy.n_obs_steps 2 \
--policy.optimizer_lr 1e-4 \
--policy.optimizer_weight_decay 1e-6 \
--policy.push_to_hub false \
--save_checkpoint true \
--save_freq 10000 \
--wandb.enable true
高级训练配置
多步预测配置
# 针对长序列预测的配置
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 32 \
--steps 150000 \
--output_dir outputs/train/diffusion_policy_long_horizon \
--job_name diffusion_policy_long_horizon \
--policy.device cuda \
--policy.horizon 32 \
--policy.n_action_steps 16 \
--policy.n_obs_steps 4 \
--policy.beta_schedule squaredcos_cap_v2 \
--policy.clip_sample true \
--policy.prediction_type epsilon \
--policy.optimizer_lr 1e-4 \
--policy.scheduler_name cosine \
--policy.scheduler_warmup_steps 5000 \
--policy.push_to_hub false \
--save_checkpoint true \
--wandb.enable true
内存优化配置
# 针对显存较小的 GPU
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 16 \
--steps 200000 \
--output_dir outputs/train/diffusion_policy_memory_opt \
--job_name diffusion_policy_memory_optimized \
--policy.device cuda \
--policy.horizon 16 \
--policy.n_action_steps 8 \
--policy.num_inference_steps 50 \
--policy.optimizer_lr 5e-5 \
--policy.use_amp true \
--num_workers 2 \
--policy.push_to_hub false \
--save_checkpoint true \
--wandb.enable true
参数详细说明
核心参数
| 参数 | 含义 | 推荐值 | 说明 |
|---|---|---|---|
--policy.type | 策略类型 | diffusion | Diffusion Policy 模型类型 |
--policy.pretrained_path | 预训练模型路径 | lerobot/diffusion_policy | LeRobot 官方模型(可选) |
--dataset.repo_id | 数据集仓库 ID | ${HF_USER}/your_dataset | 你的 HuggingFace 数据集 |
--batch_size | 批处理大小 | 64 | 根据显存调 整,RTX 3080 推荐 32-64 |
--steps | 训练步数 | 100000 | 扩散模型通常需要更多训练步数 |
--output_dir | 输出目录 | outputs/train/diffusion_policy_finetuned | 模型保存路径 |
--job_name | 任务名称 | diffusion_policy_finetuning | 用于日志和实验跟踪(可选) |
Diffusion Policy 特定参数
| 参数 | 含义 | 推荐值 | 说明 |
|---|---|---|---|
--policy.horizon | 预测时间范围 | 16 | 预测的动作序列长度 |
--policy.n_action_steps | 执行动作步数 | 8 | 每次执行的动作数量 |
--policy.n_obs_steps | 观察步数 | 2 | 历史观察的数量 |
--policy.num_inference_steps | 推理步数 | 100 | 扩散采样的步数(训练时不生效) |
--policy.beta_schedule | 噪声调度 | squaredcos_cap_v2 | 噪声添加的调度策略 |
--policy.clip_sample | 样本裁剪 | true | 是否裁剪生成的样本 |
--policy.clip_sample_range | 裁剪范围 | 1.0 | 样本裁剪的范围 |
--policy.prediction_type | 预测类型 | epsilon | 预测噪声或样本 |
--policy.num_train_timesteps | 训练时间步 | 100 | 前向扩散的步数 |
网络架构参数
| 参数 | 含义 | 推荐值 | 说明 |
|---|---|---|---|
--policy.vision_backbone | 视觉骨干网络 | resnet18 | 图像特征提取网络 |
--policy.crop_shape | 图像裁剪尺寸 | 84 84 | 输入图像的裁剪尺寸 |
--policy.crop_is_random | 随机裁剪 | true | 训练时是否随机裁剪 |
--policy.use_group_norm | 使用组归一化 | true | 替换批归一化 |
--policy.spatial_softmax_num_keypoints | 空间软最大关键点数 | 32 | 空间软最大层的关键点数 |
--policy.down_dims | 下采样维度 | 512 1024 2048 | U-Net 下采样路径的维度 |
--policy.kernel_size | 卷积核大小 | 5 | 1D 卷积的核大小 |
--policy.n_groups | 组归一化组数 | 8 | GroupNorm 的组数 |
--policy.diffusion_step_embed_dim | 步骤嵌入维度 | 128 | 扩散步骤的嵌入维度 |
训练参数
| 参数 | 含义 | 推荐值 | 说明 |
|---|---|---|---|
--policy.optimizer_lr | 学习率 | 1e-4 | 扩散模型推荐的学习率 |
--policy.optimizer_weight_decay | 权重衰减 | 1e-6 | 正则化参数 |
--policy.optimizer_betas | Adam优化器beta | 0.95 0.999 | Adam 优化器的 beta 参数 |
--policy.optimizer_eps | Adam epsilon | 1e-8 | 数值稳定性参数 |
--policy.scheduler_name | 学习率调度器 | cosine | 余弦退火调度 |
--policy.scheduler_warmup_steps | 预热步数 | 500 | 学习率预热 |
--policy.use_amp | 混合精度 | true | 节省显存 |
--num_workers | 数据加载线程 | 4 | 根据CPU核心数调整 |
--policy.push_to_hub | 推送到Hub | false | 是否上传模型到HuggingFace(需要repo_id) |
--save_checkpoint | 保存检查点 | true | 是否保存训练检查点 |
--save_freq | 保存频率 | 10000 | 检查点保存间隔 |
训练监控和调试
Weights & Biases 集成
# 详细的 W&B 配置
lerobot-train \
--policy.type diffusion \
--dataset.repo_id your-name/your-dataset \
--batch_size 64 \
--steps 100000 \
--policy.push_to_hub false \
--wandb.enable true \
--wandb.project diffusion_policy_experiments \
--wandb.notes "Diffusion Policy training with long horizon" \
# ... 其他参数
关键指标监控
训练过程中需要关注的指标:
- Diffusion Loss:扩散模型的总体损失
- MSE Loss:均方误差损失
- Learning Rate:学习率变化曲线
- Gradient Norm:梯度范数
- Inference Time:推理时间
- Sample Quality:生成样本的质量
训练可视化
# visualization.py
import torch
import matplotlib.pyplot as plt
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
def visualize_diffusion_process(model_path, observation):
# 加载模型
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()
# 生成动作序列的扩散过程
with torch.no_grad():
# 初始噪声
noise = torch.randn(1, policy.horizon, policy.action_dim, device="cuda")
# 扩散采样过程
actions_sequence = []
for t in range(policy.num_inference_steps):
# 预测噪声
noise_pred = policy.unet(noise, t, observation)
# 更新样本
noise = policy.scheduler.step(noise_pred, t, noise).prev_sample
# 保存中间结果
if t % 10 == 0:
actions_sequence.append(noise.cpu().numpy())
final_actions = noise.cpu().numpy()
# 可视化扩散过程
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
for i, actions in enumerate(actions_sequence[:6]):
ax = axes[i//3, i%3]
ax.plot(actions[0, :, 0], label='Action Dim 0')
ax.plot(actions[0, :, 1], label='Action Dim 1')
ax.set_title(f'Diffusion Step {i*10}')
ax.legend()
plt.tight_layout()
plt.savefig('diffusion_process.png')
plt.show()
return final_actions
if __name__ == "__main__":
model_path = "outputs/train/diffusion_policy_finetuned/checkpoints/last"
# 模拟观察
observation = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}
actions = visualize_diffusion_process(model_path, observation)
print(f"Generated actions shape: {actions.shape}")
模型评估
离线评估
# offline_evaluation.py
import torch
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
from lerobot.datasets.lerobot_dataset import LeRobotDataset
def evaluate_diffusion_policy(model_path, dataset_path):
# 加载模型
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()
# 加载测试数据集
dataset = LeRobotDataset(dataset_path, split="test")
total_mse_loss = 0
total_mae_loss = 0
num_samples = 0
with torch.no_grad():
for batch in dataset:
# 模型预测
prediction = policy(batch)
# 计算损失
target_actions = batch['action']
predicted_actions = prediction['action']
mse_loss = torch.mean((predicted_actions - target_actions) ** 2)
mae_loss = torch.mean(torch.abs(predicted_actions - target_actions))
total_mse_loss += mse_loss.item()
total_mae_loss += mae_loss.item()
num_samples += 1
avg_mse_loss = total_mse_loss / num_samples
avg_mae_loss = total_mae_loss / num_samples
print(f"Average MSE Loss: {avg_mse_loss:.4f}")
print(f"Average MAE Loss: {avg_mae_loss:.4f}")
return avg_mse_loss, avg_mae_loss
def evaluate_action_diversity(model_path, observation, num_samples=10):
# 评估动作多样性
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()
actions_list = []
with torch.no_grad():
for _ in range(num_samples):
prediction = policy(observation)
actions_list.append(prediction['action'].cpu().numpy())
actions_array = np.array(actions_list) # [num_samples, horizon, action_dim]
# 计算动作多样性指标
action_std = np.std(actions_array, axis=0) # [horizon, action_dim]
avg_std = np.mean(action_std)
print(f"Average action standard deviation: {avg_std:.4f}")
return avg_std, actions_array
if __name__ == "__main__":
model_path = "outputs/train/diffusion_policy_finetuned/checkpoints/last"
dataset_path = "path/to/your/test/dataset"
# 离线评估
evaluate_diffusion_policy(model_path, dataset_path)
# 多样性评估
observation = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}
evaluate_action_diversity(model_path, observation)
在线评估(机器人环境)
# robot_evaluation.py
import torch
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
class DiffusionPolicyController:
def __init__(self, model_path, num_inference_steps=50):
self.policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()
self.num_inference_steps = num_inference_steps
self.action_queue = []
self.current_obs_history = []
def get_action(self, observations):
# 更新观察历史
self.current_obs_history.append(observations)
if len(self.current_obs_history) > self.policy.n_obs_steps:
self.current_obs_history.pop(0)
# 如果动作队列为空或需要重新规划,生成新的动作序列
if len(self.action_queue) == 0 or self.should_replan():
with torch.no_grad():
# 构建输入
batch = self.prepare_observation_batch()
# 设置推理步数
self.policy.scheduler.set_timesteps(self.num_inference_steps)
# 生成动作序列
prediction = self.policy(batch)
actions = prediction['action'].cpu().numpy()[0] # [horizon, action_dim]
# 更新动作队列
self.action_queue = list(actions[:self.policy.n_action_steps])
# 返回下一个动作
return self.action_queue.pop(0)
def should_replan(self):
# 简单的重新规划策略:当动作队列剩余不足一半时重新规划
return len(self.action_queue) < self.policy.n_action_steps // 2
def prepare_observation_batch(self):
batch = {}
# 处理图像观察
if "observation.images.cam_high" in self.current_obs_history[-1]:
images = []
for obs in self.current_obs_history:
image = obs["observation.images.cam_high"]
image_tensor = self.preprocess_image(image)
images.append(image_tensor)
# 如果历史不足,重复最后一个观察
while len(images) < self.policy.n_obs_steps:
images.insert(0, images[0])
batch["observation.images.cam_high"] = torch.stack(images).unsqueeze(0)
# 处理状态观察
if "observation.state" in self.current_obs_history[-1]:
states = []
for obs in self.current_obs_history:
state = torch.tensor(obs["observation.state"], dtype=torch.float32)
states.append(state)
# 如果历史不足,重复最后一个状态
while len(states) < self.policy.n_obs_steps:
states.insert(0, states[0])
batch["observation.state"] = torch.stack(states).unsqueeze(0)
return batch
def preprocess_image(self, image):
# 图像预处理逻辑
image_tensor = torch.tensor(image).permute(2, 0, 1).float() / 255.0
return image_tensor
# 使用示例
if __name__ == "__main__":
controller = DiffusionPolicyController(
model_path="outputs/train/diffusion_policy_finetuned/checkpoints/last",
num_inference_steps=50
)
# 模拟机器人控制循环
for step in range(100):
# 获取当前观察
observations = {
"observation.images.cam_high": np.random.randint(0, 255, (224, 224, 3)),
"observation.state": np.random.randn(7)
}
# 获取动作
action = controller.get_action(observations)
# 执行动作
print(f"Step {step}: Action = {action}")
# 这里应该将动作发送给实际的机器人
# robot.execute_action(action)
部署和优化
推理加速
# fast_inference.py
import torch
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
from diffusers import DDIMScheduler
class FastDiffusionInference:
def __init__(self, model_path, num_inference_steps=10):
self.policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()
# 使用 DDIM 调度器进行快速采样
self.policy.scheduler = DDIMScheduler.from_config(self.policy.scheduler.config)
self.num_inference_steps = num_inference_steps
# 预热模型
self.warmup()
def warmup(self):
# 使用虚拟数据预热模型
dummy_batch = {
"observation.images.cam_high": torch.randn(1, 2, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 2, 7, device="cuda")
}
with torch.no_grad():
for _ in range(5):
_ = self.predict(dummy_batch)
@torch.no_grad()
def predict(self, observations):
# 设置推理步数
self.policy.scheduler.set_timesteps(self.num_inference_steps)
# 快速推理
prediction = self.policy(observations)
return prediction['action'].cpu().numpy()
if __name__ == "__main__":
fast_inference = FastDiffusionInference(
"outputs/train/diffusion_policy_finetuned/checkpoints/last",
num_inference_steps=10
)
# 测试推理速度
import time
observations = {
"observation.images.cam_high": torch.randn(1, 2, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 2, 7, device="cuda")
}
start_time = time.time()
for _ in range(100):
action = fast_inference.predict(observations)
end_time = time.time()
avg_inference_time = (end_time - start_time) / 100
print(f"Average inference time: {avg_inference_time:.4f} seconds")
print(f"Inference frequency: {1/avg_inference_time:.2f} Hz")
最佳实践
数据收集建议
- 平滑动作:确保演示数据中的动作序列平滑连续
- 多样化场景:收集不同起始状态和目标的数据
- 高质量标注:确保动作标注的准确性
- 充足数据量:扩散模型通常需要更多数据
训练优化建议
- 噪声调度:选择合适的噪声调度策略
- 推理步数:平衡质量和速度,选择合适的推理步数
- 学习率调度:使用余弦退火或阶梯式衰减
- 正则化:适当使用权重衰减
部署优化建议
- 快速采样:使用 DDIM 或其他快速采样方法
- 模型压缩:使用知识蒸馏或量化技术
- 并行推理:利用 GPU 并行能力
- 缓存优化:缓存中间计算结果
常见问题 (FAQ)
Q: Diffusion Policy 与其他策略学习方法相比有什么优势?
A: Diffusion Policy 的主要优势包括:
- 多模 态生成:能够处理具有多种解决方案的任务
- 高质量输出:生成平滑、自然的动作序列
- 鲁棒性强:对噪声和扰动具有良好的鲁棒性
- 表达能力强:能够学习复杂的动作分布
Q: 如何选择合适的推理步数?
A: 推理步数的选择需要平衡质量和速度:
- 高质量:100-1000 步,适合离线评估
- 实时应用:10-50 步,适合在线控制
- 快速原型:5-10 步,适合快速测试
Q: 训练需要多长时间?
A: 训练时间取决于多个因素:
- 数据集大小:500 episodes 约需 12-24 小时(RTX 3080)
- 模型复杂度:更大的模型需要更长时间
- 推理步数:更多步数增加训练时间
- 收敛要求:通常需要 100000-200000 步
Q: 如何提升生成动作的质量?
A: 提升动作质量的方法:
- 增加推理步数:更多步数通常产生更好的结果
- 优化噪声调度:选择合适的噪声添加策略
- 数据质量:确保训练数据的高质量
- 模型架构:使用更大或更深的网络
- 正则化技术:适当的正则化防止过拟合
Q: 如何处理实时性要求?
A: 满足实时性要求的方法:
- 快速采样:使用 DDIM 或 DPM-Solver
- 减少推理步数:在质量和速度间找到平衡
- 模型蒸馏:训练更小的学生模型
- 并行推理:利用多 GPU 或批处理
- 预计算:提前计算部分结果
相关资源
更新日志
- 2024-01: 初始版本发布
- 2024-02: 添加快速采样支持
- 2024-03: 优化内存使用和训练效率
- 2024-04: 添加多样性评估和部署优化