ACT (Action Chunking Transformer) 模型微调
概述
ACT(Action Chunking Transformer)是一种专为精细操作任务设计的端到端模仿学习模型。该模型通过预测动作块(action chunks)来克服传统模仿学习中的复合误 差问题,在低成本硬件上实现了高成功率的机器人操作。
核心特点
- 动作分块预测:一次预测多个连续动作,减少复合误差
- Transformer 架构:利用注意力机制处理序列数据
- 端到端训练:从原始观察直接预测动作
- 高成功率:在精细操作任务上表现优异
- 硬件友好:可在消费级硬件上运行
先决条件
系统要求
- 操作系统:Linux(推荐 Ubuntu 20.04+)或 macOS
- Python 版本:3.8+
- GPU:NVIDIA GPU(推荐 RTX 3070 或更高),至少 6GB 显存
- 内存:至少 16GB RAM
- 存储空间:至少 30GB 可用空间
环境准备
1. 安装 LeRobot
# 克隆 LeRobot 仓库
git clone https://github.com/huggingface/lerobot.git
cd lerobot
# 创建虚拟环境
conda create -n lerobot python=3.10
conda activate lerobot
# 安装依赖
pip install -e .
2. 安装 ACT 特定依赖
# 安装额外的依赖包
pip install einops
pip install timm
pip install wandb
# 登录 Weights & Biases(可选)
wandb login
ACT 模型架构
核心组件
- 视觉编码器:处理多视角图像输入
- 状态编码器:处理机器人状态信息
- Transformer 解码器:生成动作序列
- 动作头:输出最终的动作预测
关键参数
- Chunk Size:每次预测的动作数量(通常 50-100)
- Context Length:历史观察的长度
- Hidden Dimension:Transformer 的隐藏维度
- Number of Heads:注意力头的数量
- Number of Layers:Transformer 层数
数据准备
LeRobot 格式数据
ACT 需要使用 LeRobot 格式的数据集,包含以下结构:
your_dataset/
├── data/
│ ├── chunk-001/
│ │ ├── observation.images.cam_high.png
│ │ ├── observation.images.cam_low.png
│ │ ├── observation.images.cam_left_wrist.png
│ │ ├── observation.images.cam_right_wrist.png
│ │ ├── observation.state.npy
│ │ ├── action.npy
│ │ └── ...
│ └── chunk-002/
│ └── ...
├── meta.json
├── stats.safetensors
└── videos/
├── episode_000000.mp4
└── ...
数据质量要求
- 最少 50 个 episode 用于基本训练
- 推荐 200+ episode 以获得最佳效果
- 每个 episode 应包含完整的任务执行
- 多视角图像(至少 2 个摄像头)
- 高质量的动作标注
微调训练
基本训练命令
# 设置环境变量
export HF_USER="your-huggingface-username"
export CUDA_VISIBLE_DEVICES=0
# 启动 ACT 训练
lerobot-train \
--policy.path=lerobot/act \
--dataset.repo_id=${HF_USER}/your_dataset \
--batch_size=8 \
--steps=50000 \
--output_dir=outputs/train/act_finetuned \
--job_name=act_finetuning \
--policy.device=cuda \
--policy.chunk_size=50 \
--policy.n_obs_steps=1 \
--policy.camera_names="[cam_high,cam_low,cam_left_wrist,cam_right_wrist]" \
--training.learning_rate=1e-5 \
--training.weight_decay=1e-4 \
--wandb.enable=true
高级训练配置
多摄像头配置
# 针对多摄像头设置的 ACT 训练
lerobot-train \
--policy.path=lerobot/act \
--dataset.repo_id=${HF_USER}/your_dataset \
--batch_size=4 \
--steps=100000 \
--output_dir=outputs/train/act_multicam \
--job_name=act_multicam_training \
--policy.device=cuda \
--policy.chunk_size=100 \
--policy.n_obs_steps=2 \
--policy.camera_names="[cam_high,cam_low,cam_left_wrist,cam_right_wrist]" \
--policy.vision_backbone=resnet18 \
--policy.hidden_dim=512 \
--policy.dim_feedforward=3200 \
--policy.n_layer=8 \
--policy.n_head=8 \
--training.learning_rate=1e-5 \
--training.weight_decay=1e-4 \
--training.lr_scheduler=cosine \
--wandb.enable=true
内存优化配置
# 针对显存较小的 GPU
lerobot-train \
--policy.path=lerobot/act \
--dataset.repo_id=io-ai-data/lerobot_data \
--batch_size=2 \
--steps=75000 \
--output_dir=outputs/train/act_memory_opt \
--job_name=act_memory_optimized \
--policy.device=cuda \
--policy.chunk_size=50 \
--policy.camera_names="[cam_high,cam_low]" \
--policy.vision_backbone=resnet18 \
--policy.hidden_dim=256 \
--training.learning_rate=1e-5 \
--training.gradient_accumulation_steps=4 \
--training.mixed_precision=fp16 \
--wandb.enable=true
参数详细说明
核心参数
参数 | 含义 | 推荐值 | 说明 |
---|---|---|---|
--policy.path | 预训练模型路径 | lerobot/act | LeRobot 官方 ACT 模型 |
--dataset.repo_id | 数据集仓库 ID | ${HF_USER}/dataset | 你的 HuggingFace 数据集 |
--batch_size | 批处理大小 | 8 | 根据显存调整,RTX 3070 推荐 4-8 |
--steps | 训练步数 | 50000 | 精细任务推荐 50000-100000 步 |
--output_dir | 输出目录 | outputs/train/act_finetuned | 模型保存路径 |
--job_name | 任务名称 | act_finetuning | 用于日志和实验跟踪 |
ACT 特定参数
参数 | 含义 | 推荐值 | 说明 |
---|---|---|---|
--policy.chunk_size | 动作块大小 | 50 | 每次预测的动作数量 |
--policy.n_obs_steps | 观察步数 | 1 | 历史观察的数量 |
--policy.camera_names | 摄像头名称 | [cam_high,cam_low] | 使用的摄像头列表 |
--policy.vision_backbone | 视觉骨干网络 | resnet18 | 图像特征提取网络 |
--policy.hidden_dim | 隐藏维度 | 512 | Transformer 隐藏层维度 |
--policy.dim_feedforward | 前馈网络维度 | 3200 | Transformer 前馈层维度 |
--policy.n_layer | Transformer 层数 | 8 | 编码器/解码器层数 |
--policy.n_head | 注意力头数 | 8 | 多头注意力的头数 |
训练参数
参数 | 含义 | 推荐值 | 说明 |
---|---|---|---|
--training.learning_rate | 学习率 | 1e-5 | ACT 推荐较小的学习率 |
--training.weight_decay | 权重衰减 | 1e-4 | 正则化参数 |
--training.lr_scheduler | 学习率调度器 | cosine | 余弦退火调度 |
--training.warmup_steps | 预热步数 | 1000 | 学习率预热 |
--training.gradient_accumulation_steps | 梯度累积 | 1 | 显存不足时增加 |
--training.mixed_precision | 混合精度 | fp16 | 节省显存 |
训练监控和调试
Weights & Biases 集成
# 详细的 W&B 配置
lerobot-train \
--wandb.enable=true \
--wandb.project=act_experiments \
--wandb.run_name=act_v1_multicam \
--wandb.notes="ACT training with 4 cameras" \
--wandb.tags="[act,multicam,finetuning]" \
# ... 其他参数
关键指标监控
训练过程中需要关注的指标:
- Total Loss:总体损失,应该稳定下降
- Action Loss:动作预测损失(L1/L2 损失)
- Learning Rate:学习率变化曲线
- Gradient Norm:梯度范数,监控梯度爆炸
- GPU Memory:显存使用情况
- Training Speed:每秒处理的样本数
训练日志分析
# log_analysis.py
import wandb
import matplotlib.pyplot as plt
def analyze_training_logs(project_name, run_name):
api = wandb.Api()
run = api.run(f"{project_name}/{run_name}")
# 获取训练指标
history = run.history()
# 绘制损失曲线
plt.figure(figsize=(12, 4))
plt.subplot(1, 3, 1)
plt.plot(history['step'], history['train/total_loss'])
plt.title('Total Loss')
plt.xlabel('Step')
plt.ylabel('Loss')
plt.subplot(1, 3, 2)
plt.plot(history['step'], history['train/action_loss'])
plt.title('Action Loss')
plt.xlabel('Step')
plt.ylabel('Loss')
plt.subplot(1, 3, 3)
plt.plot(history['step'], history['train/learning_rate'])
plt.title('Learning Rate')
plt.xlabel('Step')
plt.ylabel('LR')
plt.tight_layout()
plt.savefig('training_analysis.png')
plt.show()
if __name__ == "__main__":
analyze_training_logs("act_experiments", "act_v1_multicam")
模型评估
离线评估
# offline_evaluation.py
import torch
import numpy as np
from lerobot.common.policies.act.modeling_act import ACTPolicy
from lerobot.common.datasets.lerobot_dataset import LeRobotDataset
def evaluate_act_model(model_path, dataset_path):
# 加载模型
policy = ACTPolicy.from_pretrained(model_path, device="cuda")
policy.eval()
# 加载测试数据集
dataset = LeRobotDataset(dataset_path, split="test")
total_l1_loss = 0
total_l2_loss = 0
num_samples = 0
with torch.no_grad():
for batch in dataset:
# 模型预测
prediction = policy(batch)
# 计算损失
target_actions = batch['action']
predicted_actions = prediction['action']
l1_loss = torch.mean(torch.abs(predicted_actions - target_actions))
l2_loss = torch.mean((predicted_actions - target_actions) ** 2)
total_l1_loss += l1_loss.item()
total_l2_loss += l2_loss.item()
num_samples += 1
avg_l1_loss = total_l1_loss / num_samples
avg_l2_loss = total_l2_loss / num_samples
print(f"Average L1 Loss: {avg_l1_loss:.4f}")
print(f"Average L2 Loss: {avg_l2_loss:.4f}")
return avg_l1_loss, avg_l2_loss
if __name__ == "__main__":
model_path = "outputs/train/act_finetuned/checkpoints/last"
dataset_path = "path/to/your/test/dataset"
evaluate_act_model(model_path, dataset_path)
在线评估(机器人环境)
# robot_evaluation.py
import torch
import numpy as np
from lerobot.common.policies.act.modeling_act import ACTPolicy
class ACTRobotController:
def __init__(self, model_path, camera_names):
self.policy = ACTPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()
self.camera_names = camera_names
self.action_queue = []
def get_action(self, observations):
# 如果动作队列为空,预测新的动作块
if len(self.action_queue) == 0:
with torch.no_grad():
# 构建输入
batch = self.prepare_observation(observations)
# 预测动作块
prediction = self.policy(batch)
actions = prediction['action'].cpu().numpy()[0] # [chunk_size, action_dim]
# 将动作添加到队列
self.action_queue = list(actions)
# 返回队列中的下一个动作
return self.action_queue.pop(0)
def prepare_observation(self, observations):
batch = {}
# 处理图像观察
for cam_name in self.camera_names:
image_key = f"observation.images.{cam_name}"
if image_key in observations:
image = observations[image_key]
# 预处理图像(归一化、调整大小等)
image_tensor = self.preprocess_image(image)
batch[image_key] = image_tensor.unsqueeze(0)
# 处理状态观察
if "observation.state" in observations:
state = torch.tensor(observations["observation.state"], dtype=torch.float32)
batch["observation.state"] = state.unsqueeze(0)
return batch
def preprocess_image(self, image):
# 图像预处理逻辑
# 这里需要根据训练时的预处理方式进行相同的处理
image_tensor = torch.tensor(image).permute(2, 0, 1).float() / 255.0
return image_tensor
# 使用示例
if __name__ == "__main__":
controller = ACTRobotController(
model_path="outputs/train/act_finetuned/checkpoints/last",
camera_names=["cam_high", "cam_low", "cam_left_wrist", "cam_right_wrist"]
)
# 模拟机器人控制循环
for step in range(100):
# 获取当前观察(这里需要从实际机器人获取)
observations = {
"observation.images.cam_high": np.random.randint(0, 255, (480, 640, 3)),
"observation.images.cam_low": np.random.randint(0, 255, (480, 640, 3)),
"observation.state": np.random.randn(7)
}
# 获取动作
action = controller.get_action(observations)
# 执行动作(发送到机器人)
print(f"Step {step}: Action = {action}")
# 这里应该将动作发送给实际的机器人
# robot.execute_action(action)
部署和优化
模型量化
# quantization.py
import torch
from lerobot.common.policies.act.modeling_act import ACTPolicy
def quantize_act_model(model_path, output_path):
# 加载模型
policy = ACTPolicy.from_pretrained(model_path, device="cpu")
policy.eval()
# 动态量化
quantized_policy = torch.quantization.quantize_dynamic(
policy,
{torch.nn.Linear},
dtype=torch.qint8
)
# 保存量化模型
torch.save(quantized_policy.state_dict(), output_path)
print(f"Quantized model saved to {output_path}")
return quantized_policy
if __name__ == "__main__":
quantize_act_model(
"outputs/train/act_finetuned/checkpoints/last",
"outputs/act_quantized.pth"
)
推理优化
# optimized_inference.py
import torch
import torch.jit
from lerobot.common.policies.act.modeling_act import ACTPolicy
class OptimizedACTInference:
def __init__(self, model_path, use_jit=True):
self.policy = ACTPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()
if use_jit:
# 使用 TorchScript 优化
self.policy = torch.jit.script(self.policy)
# 预热模型
self.warmup()
def warmup(self):
# 使用虚拟数据预热模型
dummy_batch = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.images.cam_low": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}
with torch.no_grad():
for _ in range(10):
_ = self.policy(dummy_batch)
@torch.no_grad()
def predict(self, observations):
# 快速推理
prediction = self.policy(observations)
return prediction['action'].cpu().numpy()
if __name__ == "__main__":
inference = OptimizedACTInference(
"outputs/train/act_finetuned/checkpoints/last"
)
# 测试推理速度
import time
observations = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.images.cam_low": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}
start_time = time.time()
for _ in range(100):
action = inference.predict(observations)
end_time = time.time()
avg_inference_time = (end_time - start_time) / 100
print(f"Average inference time: {avg_inference_time:.4f} seconds")
print(f"Inference frequency: {1/avg_inference_time:.2f} Hz")
最佳实践
数据收集建议
- 多视角数据:使用多个摄像头获取丰富的视觉信息
- 高质量演示:确保演示数据的一致性和准确性
- 任务多样性:包含不同的起始状态和目标配置
- 失败案例:适当包含失败案例以提高鲁棒性
训练优化建议
- 动作块大小:根据任务复杂度调整 chunk_size
- 学习率调度:使用余弦退火或阶梯式衰减
- 正则化:适当使用权重衰减和 dropout
- 数据增强:对图像进行适当的增强处理
部署优化建议
- 模型压缩:使用量化和剪枝技术减小模型大小
- 推理加速:使用 TensorRT 或 ONNX 进行推理优化
- 内存管理:合理管理动作队列和观察缓存
- 实时性保证:确保推理频率满足控制要求
常见问题 (FAQ)
Q: ACT 与其他模仿学习方法相比有什么优势?
A: ACT 的主要优势包括:
- 减少复合误差:通过预测动作块减少误差累积
- 提高成功率:在精细操作任务上表现优异
- 端到端训练:无需手工设计特征
- 多模态融合:有效融合视觉和状态信息
Q: 如何选择合适的 chunk_size?
A: chunk_size 的选择取决于任务特性:
- 快速任务:chunk_size = 10-30
- 中等任务:chunk_size = 50-100
- 慢速任务:chunk_size = 100-200
- 一般建议从 50 开始尝试
Q: 训练需要多长时间?
A: 训练时间取决于多个因素:
- 数据集大小:100 episodes 约需 4-8 小时(RTX 3070)
- 模型复杂度:更大的模型需要更长时间
- 硬件配置:更好的 GPU 可显著减少训练时间
- 收敛要求:通常需要 50000-100000 步
Q: 如何处理多摄像头数据?
A: 多摄像头处理建议:
- 摄像头选择:选择信息互补的视角
- 特征融合:在特征层面进行融合
- 注意力机制:让模型学习关注重要视角
- 计算资源:注意多摄像头会增加计算负担
Q: 如何提升模型的泛化能力?
A: 提升泛化能力的方法:
- 数据多样性:收集不同条件下的数据
- 数据增强:使用图像和动作增强技术
- 正则化:适当的权重衰减和 dropout
- 域随机化:在仿真中使用域随机化技术
- 多任务学习:在多个相关任务上联合训练
相关资源
更新日志
- 2024-01: 初始版本发布
- 2024-02: 添加多摄像头支持
- 2024-03: 优化训练效率和推理速度
- 2024-04: 添加模型压缩和部署优化