Дообучение модели Diffusion Policy
Обзор
Diffusion Policy — это метод обучения визуально-моторных стратегий (visuomotor policies), основанный на диффузионных моделях. Он переносит генеративные возможности диффузионных моделей в сферу управления роботами. Обучаясь процессу диффузии распределений действий, этот метод позволяет генерировать разнообразные и высококачественные последовательности действий робота, отлично справляясь со сложными задачами манипулирования.
Основные характеристики
- Диффузионная генерация: использование диффузионных моделей для создания непрерывных последовательностей действий.
- Мультимодальность: способность решать задачи, имеющие несколько вариантов выполнения.
- Высокое качество: генерация плавных и естественных движений робота.
- Устойчивость: хорошая сопротивляемость шумам и внешним возмущениям.
- Выразительность: способность обучаться сложным распределениям действий.
Предварительные требования
Системные требования
- ОС: Linux (рекоме ндуется Ubuntu 20.04+) или macOS.
- Версия Python: 3.8+.
- GPU: NVIDIA GPU (рекомендуется RTX 3080 или выше) с минимум 10 ГБ видеопамяти.
- ОЗУ: минимум 32 ГБ.
- Место на диске: минимум 50 ГБ свободного пространства.
Подготовка окружения
1. Установка LeRobot
# Клонирование репозитория LeRobot
git clone https://github.com/huggingface/lerobot.git
cd lerobot
# Создание виртуального окружения (рекомендуется venv или conda)
python -m venv .venv
source .venv/bin/activate
python -m pip install -U pip
# Установка зависимостей
pip install -e .
Архитектура Diffusion Policy
Основные компоненты
- Визуальный энкодер: извлекает признаки из изображений.
- Энкодер состояния: обрабатывает информацию о состоянии робота.
- Энкоде р условий (Condition Encoder): объединяет визуальные данные и данные о состоянии.
- Диффузионная сеть: обучается процессу диффузии распределений действий.
- Планировщик шума (Noise Scheduler): контролирует уровень шума в процессе диффузии.
Процесс диффузии
- Прямой процесс: постепенное добавление шума в последовательность действий.
- Обратный процесс: постепенное восстановление последовательности действий из шума.
- Условная генерация: создание действий на основе условий наблюдения.
- Стратегия выборки (Sampling): использование DDPM или DDIM.
Подготовка данных
Данные в формате LeRobot
Diffusion Policy требует наборы данных в формате LeRobot:
your_dataset/
├── data/
│ ├── chunk-001/
│ │ ├── observation.images.cam_high.png
│ │ ├── observation.images.cam_low.png
│ │ ├── observation.state.npy
│ │ ├── action.npy
│ │ └── ...
│ └── chunk-002/
│ └── ...
├── meta.json
├ ── stats.safetensors
└── videos/
├── episode_000000.mp4
└── ...
Требования к качеству данных
- Минимум 100 эпизодов для базового обучения.
- Рекомендуется более 500 эпизодов для оптимальных результатов.
- Последовательности действий должны быть плавными и непрерывными.
- Набор данных должен включать разнообразные сценарии задач.
- Высокое качество визуальных наблюдений.
Обучение (Fine-tuning)
Базовая команда обучения
# Установка переменных окружения
export HF_USER="your-huggingface-username"
export CUDA_VISIBLE_DEVICES=0
# Запуск обучения Diffusion Policy
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 64 \
--steps 100000 \
--output_dir outputs/train/diffusion_policy_finetuned \
--job_name diffusion_policy_finetuning \
--policy.device cuda \
--policy.horizon 16 \
--policy.n_action_steps 8 \
--policy.n_obs_steps 2 \
--policy.optimizer_lr 1e-4 \
--policy.optimizer_weight_decay 1e-6 \
--policy.push_to_hub false \
--save_checkpoint true \
--save_freq 10000 \
--wandb.enable true
Расширенные настройки обучения
Конфигурация для длинных горизонтов (Long-Horizon)
# Конфигурация для предсказания длинных последовательностей
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 32 \
--steps 150000 \
--output_dir outputs/train/diffusion_policy_long_horizon \
--job_name diffusion_policy_long_horizon \
--policy.device cuda \
--policy.horizon 32 \
--policy.n_action_steps 16 \
--policy.n_obs_steps 4 \
--policy.beta_schedule squaredcos_cap_v2 \
--policy.clip_sample true \
--policy.prediction_type epsilon \
--policy.optimizer_lr 1e-4 \
--policy.scheduler_name cosine \
--policy.scheduler_warmup_steps 5000 \
--policy.push_to_hub false \
--save_checkpoint true \
--wandb.enable true
Настройка для оптимизации памяти
# Для GPU с небольшим объемом видеопамяти
lerobot-train \
--policy.type diffusion \
--policy.pretrained_path lerobot/diffusion_policy \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 16 \
--steps 200000 \
--output_dir outputs/train/diffusion_policy_memory_opt \
--job_name diffusion_policy_memory_optimized \
--policy.device cuda \
--policy.horizon 16 \
--policy.n_action_steps 8 \
--policy.num_inference_steps 50 \
--policy.optimizer_lr 5e-5 \
--policy.use_amp true \
--num_workers 2 \
--policy.push_to_hub false \
--save_checkpoint true \
--wandb.enable true
Детальное описание параметров
Основные параметры
| Параметр | Значение | Рекомендуемое | Описание |
|---|---|---|---|
--policy.type | Тип политики | diffusion | Тип модели Diffusion Policy |
--policy.pretrained_path | Путь к модели | lerobot/diffusion_policy | Официальная модель LeRobot (опц.) |
--dataset.repo_id | ID репозитория | ${HF_USER}/your_dataset | Ваш набор данных на HuggingFace |
--batch_size | Размер батча | 64 | Регулируется под VRAM (32-64 для RTX 3080) |
--steps | Шаги обучения | 100000 | Диффузионные модели требуют много шагов |
--output_dir | Директория вывода | outputs/train/diffusion_policy_finetuned | Путь сохранения модели |
--job_name | Имя задачи | diffusion_policy_finetuning | Для логов и отслеживания (опционально) |
Специфические параметры Diffusion Policy
| Параметр | Значение | Рекомендуемое | Описание |
|---|---|---|---|
--policy.horizon | Горизонт прогноза | 16 | Длина предсказываемой последовательности |
--policy.n_action_steps | Шаги выполнения | 8 | Кол-во выполняемых действий за раз |
--policy.n_obs_steps | Шаги наблюдения | 2 | Количество исторических наблюдений |
--policy.num_inference_steps | Шаги инференса | 100 | Шаги диффузионной выборки (не при обучении) |
--policy.beta_schedule | График шума | squaredcos_cap_v2 | Стратегия добавления шума |
--policy.clip_sample | Обрезка образцов | true | Обрезать ли сгенерированные образцы |
--policy.clip_sample_range | Диапазон обрезки | 1.0 | Диапазон для clip_sample |
--policy.prediction_type | Тип предсказания | epsilon | Предсказание шума или образца |
--policy.num_train_timesteps | Шаги обучения | 100 | Шаги прямого процесса диффузии |
Параметры архитектуры сети
| Параметр | Значение | Рекомендуемое | Описание |
|---|---|---|---|
--policy.vision_backbone | Визуальный энкодер | resnet18 | Сеть извлечения признаков из изображений |
--policy.crop_shape | Размер обрезки | 84 84 | Размер обрезки входного изображения |
--policy.crop_is_random | Случайная обрезка | true | Использовать ли random crop при обучении |
--policy.use_group_norm | Групповая нормализация | true | Замена пакетной нормализации (Batch Norm) |
--policy.spatial_softmax_num_keypoints | Ключевые точки | 32 | Кол-во точек в слое Spatial Softmax |
--policy.down_dims | Размерности понижения | 512 1024 2048 | Размерности путей понижения в U-Net |
--policy.kernel_size | Размер ядра | 5 | Размер ядра для 1D-свертки |
--policy.n_groups | Группы в GroupNorm | 8 | Количество групп для GroupNorm |
--policy.diffusion_step_embed_dim | Размер эмбеддинга | 128 | Размерность эмбеддинга шага диффузии |
Параметры обучения
| Параметр | Значение | Рекомендуемое | Описание |
|---|---|---|---|
--policy.optimizer_lr | Скорость обучения | 1e-4 | Рекомендуемый LR для диффузионных моделей |
--policy.optimizer_weight_decay | Weight decay | 1e-6 | Параметр регуляризации |
--policy.optimizer_betas | Adam betas | 0.95 0.999 | Параметры бета для оптимизатора Adam |
--policy.optimizer_eps | Adam epsilon | 1e-8 | Параметр численной стабильности |
--policy.scheduler_name | Планировщик LR | cosine | Планировщик косинусного отжига |
--policy.scheduler_warmup_steps | Шаги прогрева | 500 | Шаги прогрева (warmup) скорости обучения |
--policy.use_amp | Смешанная точность | true | Экономия VRAM (Mixed Precision) |
--num_workers | Потоки данных | 4 | Зависит от количества ядер CPU |
--policy.push_to_hub | Публикация в Hub | false | Загрузка на HuggingFace (нужен repo_id) |
--save_checkpoint | Сохранение | true | Сохранение промежуточных чекпоинтов |
--save_freq | Частота сохранения | 10000 | Интервал сохранения чекпоинтов |
Мониторинг и отладка
Интеграция с Weights & Biases
# Детальная настройка W&B
lerobot-train \
--policy.type diffusion \
--dataset.repo_id your-name/your-dataset \
--batch_size 64 \
--steps 100000 \
--policy.push_to_hub false \
--wandb.enable true \
--wandb.project diffusion_policy_experiments \
--wandb.notes "Обучение Diffusion Policy с длинным горизонтом" \
# ... другие параметры
Мониторинг ключевых метрик
На какие метрики стоит обратить внимание:
- Diffusion Loss: общая ошибка диффузионной модели.
- MSE Loss: среднеквадратичная ошибка.
- Learning Rate: кривая изменения скорости обучения.
- Gradient Norm: норма градиента.
- Inference Time: время инференса.
- Sample Quality: качество сгенерированных образцов.
Визуализация процесса обучения
# visualization.py
import torch
import matplotlib.pyplot as plt
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
def visualize_diffusion_process(model_path, observation):
# Загрузка модели
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()
# Процесс диффузии для генерации последовательности действий
with torch.no_grad():
# Начальный шум
noise = torch.randn(1, policy.horizon, policy.action_dim, device="cuda")
# Процесс диффузионной выборки
actions_sequence = []
for t in range(policy.num_inference_steps):
# Предсказание шума
noise_pred = policy.unet(noise, t, observation)
# Обновление образца
noise = policy.scheduler.step(noise_pred, t, noise).prev_sample
# Сохранение промежуточных результатов
if t % 10 == 0:
actions_sequence.append(noise.cpu().numpy())
final_actions = noise.cpu().numpy()
# Визуализация
fig, axes = plt.subplots(2, 3, figsize=(15, 10))
for i, actions in enumerate(actions_sequence[:6]):
ax = axes[i//3, i%3]
ax.plot(actions[0, :, 0], label='Action Dim 0')
ax.plot(actions[0, :, 1], label='Action Dim 1')
ax.set_title(f'Diffusion Step {i*10}')
ax.legend()
plt.tight_layout()
plt.savefig('diffusion_process.png')
plt.show()
return final_actions
if __name__ == "__main__":
model_path = "outputs/train/diffusion_policy_finetuned/checkpoints/last"
# Мок-наблюдение
observation = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}
actions = visualize_diffusion_process(model_path, observation)
print(f"Форма сгенерированных действий: {actions.shape}")
Оценка модели
Офлайн-оценка
# offline_evaluation.py
import torch
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
from lerobot.datasets.lerobot_dataset import LeRobotDataset
def evaluate_diffusion_policy(model_path, dataset_path):
# Загрузка модели
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()
# Загрузка тестового набора
dataset = LeRobotDataset(dataset_path, split="test")
total_mse_loss = 0
total_mae_loss = 0
num_samples = 0
with torch.no_grad():
for batch in dataset:
# Предсказание модели
prediction = policy(batch)
# Расчет ошибок
target_actions = batch['action']
predicted_actions = prediction['action']
mse_loss = torch.mean((predicted_actions - target_actions) ** 2)
mae_loss = torch.mean(torch.abs(predicted_actions - target_actions))
total_mse_loss += mse_loss.item()
total_mae_loss += mae_loss.item()
num_samples += 1
avg_mse_loss = total_mse_loss / num_samples
avg_mae_loss = total_mae_loss / num_samples
print(f"Средний MSE Loss: {avg_mse_loss:.4f}")
print(f"Средний MAE Loss: {avg_mae_loss:.4f}")
return avg_mse_loss, avg_mae_loss
def evaluate_action_diversity(model_path, observation, num_samples=10):
# Оценка разнообразия действий
policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
policy.eval()
actions_list = []
with torch.no_grad():
for _ in range(num_samples):
prediction = policy(observation)
actions_list.append(prediction['action'].cpu().numpy())
actions_array = np.array(actions_list) # [num_samples, horizon, action_dim]
# Расчет стандартного отклонения действий
action_std = np.std(actions_array, axis=0) # [horizon, action_dim]
avg_std = np.mean(action_std)
print(f"Среднее стандартное отклонение действий: {avg_std:.4f}")
return avg_std, actions_array
if __name__ == "__main__":
model_path = "outputs/train/diffusion_policy_finetuned/checkpoints/last"
dataset_path = "path/to/your/test/dataset"
# Офлайн-оценка
evaluate_diffusion_policy(model_path, dataset_path)
# Оценка разнообразия
observation = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}
evaluate_action_diversity(model_path, observation)
Онлайн-оценка (в среде робота)
# robot_evaluation.py
import torch
import numpy as np
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
class DiffusionPolicyController:
def __init__(self, model_path, num_inference_steps=50):
self.policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()
self.num_inference_steps = num_inference_steps
self.action_queue = []
self.current_obs_history = []
def get_action(self, observations):
# Обновление истории наблюдений
self.current_obs_history.append(observations)
if len(self.current_obs_history) > self.policy.n_obs_steps:
self.current_obs_history.pop(0)
# Если очередь пуста или нужно перепланировать, генерируем новую последовател ьность
if len(self.action_queue) == 0 or self.should_replan():
with torch.no_grad():
# Подготовка входных данных
batch = self.prepare_observation_batch()
# Установка шагов инференса
self.policy.scheduler.set_timesteps(self.num_inference_steps)
# Генерация последовательности действий
prediction = self.policy(batch)
actions = prediction['action'].cpu().numpy()[0] # [horizon, action_dim]
# Обновление очереди действий
self.action_queue = list(actions[:self.policy.n_action_steps])
# Возвращаем следующее действие
return self.action_queue.pop(0)
def should_replan(self):
# Перепланировка, когда в очереди осталось меньше половины действий
return len(self.action_queue) < self.policy.n_action_steps // 2
def prepare_observation_batch(self):
batch = {}
# Обработка изображений
if "observation.images.cam_high" in self.current_obs_history[-1]:
images = []
for obs in self.current_obs_history:
image = obs["observation.images.cam_high"]
image_tensor = self.preprocess_image(image)
images.append(image_tensor)
# Если истории не хватает, дополняем первым кадром
while len(images) < self.policy.n_obs_steps:
images.insert(0, images[0])
batch["observation.images.cam_high"] = torch.stack(images).unsqueeze(0)
# Обработка состояний
if "observation.state" in self.current_obs_history[-1]:
states = []
for obs in self.current_obs_history:
state = torch.tensor(obs["observation.state"], dtype=torch.float32)
states.append(state)
# Дополняем при нехватке
while len(states) < self.policy.n_obs_steps:
states.insert(0, states[0])
batch["observation.state"] = torch.stack(states).unsqueeze(0)
return batch
def preprocess_image(self, image):
# Логика предобработки изображения
image_tensor = torch.tensor(image).permute(2, 0, 1).float() / 255.0
return image_tensor
# Пример использования
if __name__ == "__main__":
controller = DiffusionPolicyController(
model_path="outputs/train/diffusion_policy_finetuned/checkpoints/last",
num_inference_steps=50
)
# Момуляция цикла управления
for step in range(100):
# Получение текущих наблюдений
observations = {
"observation.images.cam_high": np.random.randint(0, 255, (224, 224, 3)),
"observation.state": np.random.randn(7)
}
# Получение действия
action = controller.get_action(observations)
# Выполнение действия
print(f"Шаг {step}: Действие = {action}")
# Отправка реальному роботу
# robot.execute_action(action)
Развертывание и оптимизация
Ускорение инференса
# fast_inference.py
import torch
from lerobot.policies.diffusion.modeling_diffusion import DiffusionPolicy
from diffusers import DDIMScheduler
class FastDiffusionInference:
def __init__(self, model_path, num_inference_steps=10):
self.policy = DiffusionPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()
# Использование планировщика DDIM для быстрой выборки
self.policy.scheduler = DDIMScheduler.from_config(self.policy.scheduler.config)
self.num_inference_steps = num_inference_steps
# Прогрев модели
self.warmup()
def warmup(self):
dummy_batch = {
"observation.images.cam_high": torch.randn(1, 2, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 2, 7, device="cuda")
}
with torch.no_grad():
for _ in range(5):
_ = self.predict(dummy_batch)
@torch.no_grad()
def predict(self, observations):
# Установка шагов инференса
self.policy.scheduler.set_timesteps(self.num_inference_steps)
# Быстрый инференс
prediction = self.policy(observations)
return prediction['action'].cpu().numpy()
if __name__ == "__main__":
fast_inference = FastDiffusionInference(
"outputs/train/diffusion_policy_finetuned/checkpoints/last",
num_inference_steps=10
)
# Тест скорости
import time
observations = {
"observation.images.cam_high": torch.randn(1, 2, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 2, 7, device="cuda")
}
start_time = time.time()
for _ in range(100):
action = fast_inference.predict(observations)
end_time = time.time()
avg_inference_time = (end_time - start_time) / 100
print(f"Среднее время инференса: {avg_inference_time:.4f} сек")
print(f"Частота инференса: {1/avg_inference_time:.2f} Гц")
Лучшие практики
Советы по сбору данных
- Плавность действий: обеспечьте плавность и непрерывность действий в демонстрациях.
- Разнообразие сценариев: собирайте данные с разными начальными и целевыми состояниями.
- Качественная разметка: следите за точностью аннотаций действий.
- Объем данных: диффузионные модели обычно требуют больше данных, чем ACT.
Советы по оптимизации обучения
- Планировщик ш ума: выбирайте стратегию шума, подходящую вашей задаче.
- Шаги инференса: найдите баланс между скоростью и качеством.
- Планировщик LR: используйте косинусный отжиг или ступенчатое снижение.
- Регуляризация: используйте Weight Decay для предотвращения переобучения.
Советы по оптимизации развертывания
- Быстрая выборка: используйте DDIM или другие скоростные методы самплинга.
- Сжатие модели: используйте дистилляцию знаний или квантование.
- Параллельный инференс: используйте параллельные вычисления на GPU.
- Кэширование: кэшируйте промежуточные результаты вычислений.
Часто задаваемые вопросы (FAQ)
В: В чем преимущества Diffusion Policy перед другими методами?
О: Основные преимущества:
- Генерация мультимодальных распределений: решение задач с несколькими вариантами выполнения.
- Высокое качество: более плавные и естественные движения.
- Устойчивость: лучшая работа при зашумленных входных данных.
- Выразительность: способность обучаться очень сложным траекториям.