Перейти к основному содержимому

ACT (Action Chunking Transformer) 模型微调

概述

ACT(Action Chunking Transformer)是一种专为精细操作任务设计的端到端模仿学习模型。该模型通过预测动作块(action chunks)来克服传统模仿学习中的复合误差问题,在低成本硬件上实现了高成功率的机器人操作。

核心特点

  • 动作分块预测:一次预测多个连续动作,减少复合误差
  • Transformer 架构:利用注意力机制处理序列数据
  • 端到端训练:从原始观察直接预测动作
  • 高成功率:在精细操作任务上表现优异
  • 硬件友好:可在消费级硬件上运行

先决条件

系统要求

  • 操作系统:Linux(推荐 Ubuntu 20.04+)或 macOS
  • Python 版本:3.8+
  • GPU:NVIDIA GPU(推荐 RTX 3070 或更高),至少 6GB 显存
  • 内存:至少 16GB RAM
  • 存储空间:至少 30GB 可用空间

环境准备

1. 安装 LeRobot

# 克隆 LeRobot 仓库
git clone https://github.com/huggingface/lerobot.git
cd lerobot

# 创建虚拟环境
conda create -n lerobot python=3.10
conda activate lerobot

# 安装依赖
pip install -e .

2. 安装 ACT 特定依赖

# 安装额外的依赖包
pip install einops
pip install timm
pip install wandb

# 登录 Weights & Biases(可选)
wandb login

ACT 模型架构

核心组件

  1. 视觉编码器:处理多视角图像输入
  2. 状态编码器:处理机器人状态信息
  3. Transformer 解码器:生成动作序列
  4. 动作头:输出最终的动作预测

关键参数

  • Chunk Size:每次预测的动作数量(通常 50-100)
  • Context Length:历史观察的长度
  • Hidden Dimension:Transformer 的隐藏维度
  • Number of Heads:注意力头的数量
  • Number of Layers:Transformer 层数

数据准备

LeRobot 格式数据

ACT 需要使用 LeRobot 格式的数据集,包含以下结构:

your_dataset/
├── data/
│ ├── chunk-001/
│ │ ├── observation.images.cam_high.png
│ │ ├── observation.images.cam_low.png
│ │ ├── observation.images.cam_left_wrist.png
│ │ ├── observation.images.cam_right_wrist.png
│ │ ├── observation.state.npy
│ │ ├── action.npy
│ │ └── ...
│ └── chunk-002/
│ └── ...
├── meta.json
├── stats.safetensors
└── videos/
├── episode_000000.mp4
└── ...

数据质量要求

  • 最少 50 个 episode 用于基本训练
  • 推荐 200+ episode 以获得最佳效果
  • 每个 episode 应包含完整的任务执行
  • 多视角图像(至少 2 个摄像头)
  • 高质量的动作标注

微调训练

基本训练命令

# 设置环境变量
export HF_USER="your-huggingface-username"
export CUDA_VISIBLE_DEVICES=0

# 启动 ACT 训练
lerobot-train \
--policy.path=lerobot/act \
--dataset.repo_id=${HF_USER}/your_dataset \
--batch_size=8 \
--steps=50000 \
--output_dir=outputs/train/act_finetuned \
--job_name=act_finetuning \
--policy.device=cuda \
--policy.chunk_size=50 \
--policy.n_obs_steps=1 \
--policy.camera_names="[cam_high,cam_low,cam_left_wrist,cam_right_wrist]" \
--training.learning_rate=1e-5 \
--training.weight_decay=1e-4 \
--wandb.enable=true

高级训练配置

多摄像头配置

# 针对多摄像头设置的 ACT 训练
lerobot-train \
--policy.path=lerobot/act \
--dataset.repo_id=${HF_USER}/your_dataset \
--batch_size=4 \
--steps=100000 \
--output_dir=outputs/train/act_multicam \
--job_name=act_multicam_training \
--policy.device=cuda \
--policy.chunk_size=100 \
--policy.n_obs_steps=2 \
--policy.camera_names="[cam_high,cam_low,cam_left_wrist,cam_right_wrist]" \
--policy.vision_backbone=resnet18 \
--policy.hidden_dim=512 \
--policy.dim_feedforward=3200 \
--policy.n_layer=8 \
--policy.n_head=8 \
--training.learning_rate=1e-5 \
--training.weight_decay=1e-4 \
--training.lr_scheduler=cosine \
--wandb.enable=true

内存优化配置

# 针对显存较小的 GPU
lerobot-train \
--policy.path=lerobot/act \

--dataset.repo_id=io-ai-data/lerobot_data \
--batch_size=2 \
--steps=75000 \
--output_dir=outputs/train/act_memory_opt \
--job_name=act_memory_optimized \
--policy.device=cuda \
--policy.chunk_size=50 \
--policy.camera_names="[cam_high,cam_low]" \
--policy.vision_backbone=resnet18 \
--policy.hidden_dim=256 \
--training.learning_rate=1e-5 \
--training.gradient_accumulation_steps=4 \
--training.mixed_precision=fp16 \
--wandb.enable=true

参数详细说明

核心参数

参数含义推荐值说明
--policy.path预训练模型路径lerobot/actLeRobot 官方 ACT 模型
--dataset.repo_id数据集仓库 ID${HF_USER}/dataset你的 HuggingFace 数据集
--batch_size批处理大小8根据显存调整,RTX 3070 推荐 4-8
--steps训练步数50000精细任务推荐 50000-100000 步
--output_dir输出目录outputs/train/act_finetuned模型保存路径
--job_name任务名称act_finetuning用于日志和实验跟踪

ACT 特定参数

参数含义推荐值说明
--policy.chunk_size动作块大小50每次预测的动作数量
--policy.n_obs_steps观察步数1历史观察的数量
--policy.camera_names摄像头名称[cam_high,cam_low]使用的摄像头列表
--policy.vision_backbone视觉骨干网络resnet18图像特征提取网络
--policy.hidden_dim隐藏维度512Transformer 隐藏层维度
--policy.dim_feedforward前馈网络维度3200Transformer 前馈层维度
--policy.n_layerTransformer 层数8编码器/解码器层数
--policy.n_head注意力头数8多头注意力的头数

训练参数

参数含义推荐值说明
--training.learning_rate学习率1e-5ACT 推荐较小的学习率
--training.weight_decay权重衰减1e-4正则化参数
--training.lr_scheduler学习率调度器cosine余弦退火调度
--training.warmup_steps预热步数1000学习率预热
--training.gradient_accumulation_steps梯度累积1显存不足时增加
--training.mixed_precision混合精度fp16节省显存

训练监控和调试

Weights & Biases 集成

# 详细的 W&B 配置
lerobot-train \
--wandb.enable=true \
--wandb.project=act_experiments \
--wandb.run_name=act_v1_multicam \
--wandb.notes="ACT training with 4 cameras" \
--wandb.tags="[act,multicam,finetuning]" \
# ... 其他参数

关键指标监控

训练过程中需要关注的指标:

  • Total Loss:总体损失,应该稳定下降
  • Action Loss:动作预测损失(L1/L2 损失)
  • Learning Rate:学习率变化曲线
  • Gradient Norm:梯度范数,监控梯度爆炸
  • GPU Memory:显存使用情况
  • Training Speed:每秒处理的样本数

训练日志分析

# log_analysis.py
import wandb
import matplotlib.pyplot as plt

def analyze_training_logs(project_name, run_name):
api = wandb.Api()
run = api.run(f"{project_name}/{run_name}")

# 获取训练指标
history = run.history()

# 绘制损失曲线
plt.figure(figsize=(12, 4))

plt.subplot(1, 3, 1)
plt.plot(history['step'], history['train/total_loss'])
plt.title('Total Loss')
plt.xlabel('Step')
plt.ylabel('Loss')

plt.subplot(1, 3, 2)
plt.plot(history['step'], history['train/action_loss'])
plt.title('Action Loss')
plt.xlabel('Step')
plt.ylabel('Loss')

plt.subplot(1, 3, 3)
plt.plot(history['step'], history['train/learning_rate'])
plt.title('Learning Rate')
plt.xlabel('Step')
plt.ylabel('LR')

plt.tight_layout()
plt.savefig('training_analysis.png')
plt.show()

if __name__ == "__main__":
analyze_training_logs("act_experiments", "act_v1_multicam")

模型评估

离线评估

# offline_evaluation.py
import torch
import numpy as np
from lerobot.common.policies.act.modeling_act import ACTPolicy
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset

def evaluate_act_model(model_path, dataset_path):
# 加载模型
policy = ACTPolicy.from_pretrained(model_path, device="cuda")
policy.eval()

# 加载测试数据集
dataset = LeRobotDataset(dataset_path, split="test")

total_l1_loss = 0
total_l2_loss = 0
num_samples = 0

with torch.no_grad():
for batch in dataset:
# 模型预测
prediction = policy(batch)

# 计算损失
target_actions = batch['action']
predicted_actions = prediction['action']

l1_loss = torch.mean(torch.abs(predicted_actions - target_actions))
l2_loss = torch.mean((predicted_actions - target_actions) ** 2)

total_l1_loss += l1_loss.item()
total_l2_loss += l2_loss.item()
num_samples += 1

avg_l1_loss = total_l1_loss / num_samples
avg_l2_loss = total_l2_loss / num_samples

print(f"Average L1 Loss: {avg_l1_loss:.4f}")
print(f"Average L2 Loss: {avg_l2_loss:.4f}")

return avg_l1_loss, avg_l2_loss

if __name__ == "__main__":
model_path = "outputs/train/act_finetuned/checkpoints/last"
dataset_path = "path/to/your/test/dataset"
evaluate_act_model(model_path, dataset_path)

在线评估(机器人环境)

# robot_evaluation.py
import torch
import numpy as np
from lerobot.common.policies.act.modeling_act import ACTPolicy

class ACTRobotController:
def __init__(self, model_path, camera_names):
self.policy = ACTPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()
self.camera_names = camera_names
self.action_queue = []

def get_action(self, observations):
# 如果动作队列为空,预测新的动作块
if len(self.action_queue) == 0:
with torch.no_grad():
# 构建输入
batch = self.prepare_observation(observations)

# 预测动作块
prediction = self.policy(batch)
actions = prediction['action'].cpu().numpy()[0] # [chunk_size, action_dim]

# 将动作添加到队列
self.action_queue = list(actions)

# 返回队列中的下一个动作
return self.action_queue.pop(0)

def prepare_observation(self, observations):
batch = {}

# 处理图像观察
for cam_name in self.camera_names:
image_key = f"observation.images.{cam_name}"
if image_key in observations:
image = observations[image_key]
# 预处理图像(归一化、调整大小等)
image_tensor = self.preprocess_image(image)
batch[image_key] = image_tensor.unsqueeze(0)

# 处理状态观察
if "observation.state" in observations:
state = torch.tensor(observations["observation.state"], dtype=torch.float32)
batch["observation.state"] = state.unsqueeze(0)

return batch

def preprocess_image(self, image):
# 图像预处理逻辑
# 这里需要根据训练时的预处理方式进行相同的处理
image_tensor = torch.tensor(image).permute(2, 0, 1).float() / 255.0
return image_tensor

# 使用示例
if __name__ == "__main__":
controller = ACTRobotController(
model_path="outputs/train/act_finetuned/checkpoints/last",
camera_names=["cam_high", "cam_low", "cam_left_wrist", "cam_right_wrist"]
)

# 模拟机器人控制循环
for step in range(100):
# 获取当前观察(这里需要从实际机器人获取)
observations = {
"observation.images.cam_high": np.random.randint(0, 255, (480, 640, 3)),
"observation.images.cam_low": np.random.randint(0, 255, (480, 640, 3)),
"observation.state": np.random.randn(7)
}

# 获取动作
action = controller.get_action(observations)

# 执行动作(发送到机器人)
print(f"Step {step}: Action = {action}")

# 这里应该将动作发送给实际的机器人
# robot.execute_action(action)

部署和优化

模型量化

# quantization.py
import torch
from lerobot.common.policies.act.modeling_act import ACTPolicy

def quantize_act_model(model_path, output_path):
# 加载模型
policy = ACTPolicy.from_pretrained(model_path, device="cpu")
policy.eval()

# 动态量化
quantized_policy = torch.quantization.quantize_dynamic(
policy,
{torch.nn.Linear},
dtype=torch.qint8
)

# 保存量化模型
torch.save(quantized_policy.state_dict(), output_path)
print(f"Quantized model saved to {output_path}")

return quantized_policy

if __name__ == "__main__":
quantize_act_model(
"outputs/train/act_finetuned/checkpoints/last",
"outputs/act_quantized.pth"
)

推理优化

# optimized_inference.py
import torch
import torch.jit
from lerobot.common.policies.act.modeling_act import ACTPolicy

class OptimizedACTInference:
def __init__(self, model_path, use_jit=True):
self.policy = ACTPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()

if use_jit:
# 使用 TorchScript 优化
self.policy = torch.jit.script(self.policy)

# 预热模型
self.warmup()

def warmup(self):
# 使用虚拟数据预热模型
dummy_batch = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.images.cam_low": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}

with torch.no_grad():
for _ in range(10):
_ = self.policy(dummy_batch)

@torch.no_grad()
def predict(self, observations):
# 快速推理
prediction = self.policy(observations)
return prediction['action'].cpu().numpy()

if __name__ == "__main__":
inference = OptimizedACTInference(
"outputs/train/act_finetuned/checkpoints/last"
)

# 测试推理速度
import time

observations = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.images.cam_low": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}

start_time = time.time()
for _ in range(100):
action = inference.predict(observations)
end_time = time.time()

avg_inference_time = (end_time - start_time) / 100
print(f"Average inference time: {avg_inference_time:.4f} seconds")
print(f"Inference frequency: {1/avg_inference_time:.2f} Hz")

最佳实践

数据收集建议

  1. 多视角数据:使用多个摄像头获取丰富的视觉信息
  2. 高质量演示:确保演示数据的一致性和准确性
  3. 任务多样性:包含不同的起始状态和目标配置
  4. 失败案例:适当包含失败案例以提高鲁棒性

训练优化建议

  1. 动作块大小:根据任务复杂度调整 chunk_size
  2. 学习率调度:使用余弦退火或阶梯式衰减
  3. 正则化:适当使用权重衰减和 dropout
  4. 数据增强:对图像进行适当的增强处理

部署优化建议

  1. 模型压缩:使用量化和剪枝技术减小模型大小
  2. 推理加速:使用 TensorRT 或 ONNX 进行推理优化
  3. 内存管理:合理管理动作队列和观察缓存
  4. 实时性保证:确保推理频率满足控制要求

常见问题 (FAQ)

Q: ACT 与其他模仿学习方法相比有什么优势?

A: ACT 的主要优势包括:

  • 减少复合误差:通过预测动作块减少误差累积
  • 提高成功率:在精细操作任务上表现优异
  • 端到端训练:无需手工设计特征
  • 多模态融合:有效融合视觉和状态信息

Q: 如何选择合适的 chunk_size?

A: chunk_size 的选择取决于任务特性:

  • 快速任务:chunk_size = 10-30
  • 中等任务:chunk_size = 50-100
  • 慢速任务:chunk_size = 100-200
  • 一般建议从 50 开始尝试

Q: 训练需要多长时间?

A: 训练时间取决于多个因素:

  • 数据集大小:100 episodes 约需 4-8 小时(RTX 3070)
  • 模型复杂度:更大的模型需要更长时间
  • 硬件配置:更好的 GPU 可显著减少训练时间
  • 收敛要求:通常需要 50000-100000 步

Q: 如何处理多摄像头数据?

A: 多摄像头处理建议:

  • 摄像头选择:选择信息互补的视角
  • 特征融合:在特征层面进行融合
  • 注意力机制:让模型学习关注重要视角
  • 计算资源:注意多摄像头会增加计算负担

Q: 如何提升模型的泛化能力?

A: 提升泛化能力的方法:

  • 数据多样性:收集不同条件下的数据
  • 数据增强:使用图像和动作增强技术
  • 正则化:适当的权重衰减和 dropout
  • 域随机化:在仿真中使用域随机化技术
  • 多任务学习:在多个相关任务上联合训练

相关资源

更新日志

  • 2024-01: 初始版本发布
  • 2024-02: 添加多摄像头支持
  • 2024-03: 优化训练效率和推理速度
  • 2024-04: 添加模型压缩和部署优化