ACT (Action Chunking Transformer) 模型微调
概述
ACT(Action Chunking Transformer)是一种专为精细操作任务设计的端到端模仿学习模型。该模型通过预测动作块(action chunks)来克服传统模仿学习中的复合误差问题,在低成本硬件上实现了高成功率的机器人操作。
核心特点
- 动作分块预测:一次预测多个连续动作,减少复合误差
- Transformer 架构:利用注意力机制处理序列数据
- 端到端训练:从 原始观察直接预测动作
- 高成功率:在精细操作任务上表现优异
- 硬件友好:可在消费级硬件上运行
先决条件
系统要求
- 操作系统:Linux(推荐 Ubuntu 20.04+)或 macOS
- Python 版本:3.8+
- GPU:NVIDIA GPU(推荐 RTX 3070 或更高),至少 6GB 显存
- 内存:至少 16GB RAM
- 存储空间:至少 30GB 可用空间
环境准备
1. 安装 LeRobot
# 克隆 LeRobot 仓库
git clone https://github.com/huggingface/lerobot.git
cd lerobot
# 创建虚拟环境
conda create -n lerobot python=3.10
conda activate lerobot
# 安装依赖
pip install -e .
2. 安装 ACT 特定依赖
# 安装额外的依赖包
pip install einops
pip install timm
pip install wandb
# 登录 Weights & Biases(可选)
wandb login
ACT 模型架构
核心组件
- 视觉编码器:处理多视角图像输入
- 状态编码器:处理机器人状态信息
- Transformer 解码器:生成动作序列
- 动作头:输出最终的动作预测
关键参数
- Chunk Size:每次预测的动作数量(通常 50-100)
- Context Length:历史观察的长度
- Hidden Dimension:Transformer 的隐藏维度
- Number of Heads:注意力头的数量
- Number of Layers:Transformer 层数
数据准备
LeRobot 格式数据
ACT 需要使用 LeRobot 格式的数据集,包含以下结构:
your_dataset/
├── data/
│ ├── chunk-001/
│ │ ├── observation.images.cam_high.png
│ │ ├── observation.images.cam_low.png
│ │ ├── observation.images.cam_left_wrist.png
│ │ ├── observation.images.cam_right_wrist.png
│ │ ├── observation.state.npy
│ │ ├── action.npy
│ │ └── ...
│ └── chunk-002/
│ └── ...
├── meta.json
├── stats.safetensors
└── videos/
├── episode_000000.mp4
└── ...
数据质量要求
- 最少 50 个 episode 用于基本训练
- 推荐 200+ episode 以获得最佳效果
- 每个 episode 应包含完整的任务执行
- 多视角图像(至少 2 个摄像头)
- 高质量的动作标注
微调训练
重要参数约束
ACT 模型的 n_action_steps
必须 ≤ chunk_size
。推荐两者设置为相同值 (如都设为 100)。
chunk_size
:模型一次预测的动作序列长度n_action_steps
:实际执行的动作步数
基本训练命令
# 设置环境变量
export HF_USER="your-huggingface-username"
export CUDA_VISIBLE_DEVICES=0
# 启动 ACT 训练
lerobot-train \
--policy.type act \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 8 \
--steps 50000 \
--output_dir outputs/train/act_finetuned \
--job_name act_finetuning \
--policy.device cuda \
--policy.chunk_size 100 \
--policy.n_action_steps 100 \
--policy.n_obs_steps 1 \
--policy.optimizer_lr 1e-5 \
--policy.optimizer_weight_decay 1e-4 \
--policy.push_to_hub false \
--save_checkpoint true \
--save_freq 10000 \
--wandb.enable true
高级训练配置
多摄像头配置
# 针对多摄像头设置的 ACT 训练
lerobot-train \
--policy.type act \
--dataset.repo_id ${HF_USER}/your_dataset \
--batch_size 4 \
--steps 100000 \
--output_dir outputs/train/act_multicam \
--job_name act_multicam_training \
--policy.device cuda \
--policy.chunk_size 100 \
--policy.n_action_steps 100 \
--policy.n_obs_steps 2 \
--policy.vision_backbone resnet18 \
--policy.dim_model 512 \
--policy.dim_feedforward 3200 \
--policy.n_encoder_layers 4 \
--policy.n_decoder_layers 1 \
--policy.n_heads 8 \
--policy.optimizer_lr 1e-5 \
--policy.optimizer_weight_decay 1e-4 \
--policy.push_to_hub false \
--save_checkpoint true \
--wandb.enable true
内存优化配置
# 针对显存较小的 GPU
lerobot-train \
--policy.type act \
--dataset.repo_id io-ai-data/lerobot_data \
--batch_size 2 \
--steps 75000 \
--output_dir outputs/train/act_memory_opt \
--job_name act_memory_optimized \
--policy.device cuda \
--policy.chunk_size 100 \
--policy.n_action_steps 100 \
--policy.n_obs_steps 1 \
--policy.vision_backbone resnet18 \
--policy.dim_model 256 \
--policy.optimizer_lr 1e-5 \
--policy.use_amp true \
--num_workers 2 \
--policy.push_to_hub false \
--save_checkpoint true \
--wandb.enable true
参数详细说明
核心参数
参数 | 含义 | 推荐值 | 说明 |
---|---|---|---|
--policy.type | 策略类型 | act | ACT 模型类型 |
--policy.pretrained_path | 预训练模型路径 | lerobot/act | LeRobot 官方 ACT 模型(可选) |
--dataset.repo_id | 数据集仓库 ID | ${HF_USER}/dataset | 你的 HuggingFace 数据集 |
--batch_size | 批处理大小 | 8 | 根据显存调整,RTX 3070 推荐 4-8 |
--steps | 训练步数 | 50000 | 精细任务推荐 50000-100000 步 |
--output_dir | 输出目录 | outputs/train/act_finetuned | 模型保存路径 |
--job_name | 任务名称 | act_finetuning | 用于日志和实验跟踪(可选) |
ACT 特定参数
参数 | 含义 | 推荐值 | 说明 |
---|---|---|---|
--policy.chunk_size | 动作块大小 | 100 | 每次预测的动作数量 |
--policy.n_action_steps | 执行动作步数 | 100 | 实际执行的动作数量 |
--policy.n_obs_steps | 观察步数 | 1 | 历史观察的数量 |
--policy.vision_backbone | 视觉骨干网络 | resnet18 | 图像特征提取网络 |
--policy.dim_model | 模型维度 | 512 | Transformer 主维度 |
--policy.dim_feedforward | 前馈网络维度 | 3200 | Transformer 前馈层维度 |
--policy.n_encoder_layers | 编码器层数 | 4 | Transformer 编码器层数 |
--policy.n_decoder_layers | 解码器层数 | 1 | Transformer 解码器层数 |
--policy.n_heads | 注意力头数 | 8 | 多头注意力的头数 |
--policy.use_vae | 使用VAE | true | 变分目标优化 |
训练参数
参数 | 含义 | 推荐值 | 说明 |
---|---|---|---|
--policy.optimizer_lr | 学习率 | 1e-5 | ACT 推荐较小的学习率 |
--policy.optimizer_weight_decay | 权重衰减 | 0.0 | 正则化参数 |
--policy.optimizer_lr_backbone | 骨干网络学习率 | 1e-5 | 视觉编码器学习率 |
--policy.use_amp | 混合精度 | true | 节省显存 |
--num_workers | 数据加载线程 | 4 | 根据CPU核心数调 整 |
--policy.push_to_hub | 推送到Hub | false | 是否上传模型到HuggingFace(需要repo_id) |
--save_checkpoint | 保存检查点 | true | 是否保存训练检查点 |
--save_freq | 保存频率 | 10000 | 检查点保存间隔 |
训练监控和调试
Weights & Biases 集成
# 详细的 W&B 配置
lerobot-train \
--policy.type act \
--dataset.repo_id your-name/your-dataset \
--batch_size 8 \
--steps 50000 \
--policy.push_to_hub false \
--wandb.enable true \
--wandb.project act_experiments \
--wandb.notes "ACT training with 4 cameras" \
# ... 其他参数
关键指标监控
训练过程中需要关注的指标:
- Total Loss:总体损失,应该稳定下降
- Action Loss:动作预测损失(L1/L2 损失)
- Learning Rate:学习率变化曲线
- Gradient Norm:梯度范数,监控梯度爆炸
- GPU Memory:显存使用情况
- Training Speed:每秒处理的样本数
训练日志分析
# log_analysis.py
import wandb
import matplotlib.pyplot as plt
def analyze_training_logs(project_name, run_name):
api = wandb.Api()
run = api.run(f"{project_name}/{run_name}")
# 获取训练指标
history = run.history()
# 绘制损失曲线
plt.figure(figsize=(12, 4))
plt.subplot(1, 3, 1)
plt.plot(history['step'], history['train/total_loss'])
plt.title('Total Loss')
plt.xlabel('Step')
plt.ylabel('Loss')
plt.subplot(1, 3, 2)
plt.plot(history['step'], history['train/action_loss'])
plt.title('Action Loss')
plt.xlabel('Step')
plt.ylabel('Loss')
plt.subplot(1, 3, 3)
plt.plot(history['step'], history['train/learning_rate'])
plt.title('Learning Rate')
plt.xlabel('Step')
plt.ylabel('LR')
plt.tight_layout()
plt.savefig('training_analysis.png')
plt.show()
if __name__ == "__main__":
analyze_training_logs("act_experiments", "act_v1_multicam")
模型评估
离线评估
# offline_evaluation.py
import torch
import numpy as np
from lerobot.policies.act.modeling_act import ACTPolicy
from lerobot.datasets.lerobot_dataset import LeRobotDataset
def evaluate_act_model(model_path, dataset_path):
# 加载模型
policy = ACTPolicy.from_pretrained(model_path, device="cuda")
policy.eval()
# 加载测试数据集
dataset = LeRobotDataset(dataset_path, split="test")
total_l1_loss = 0
total_l2_loss = 0
num_samples = 0
with torch.no_grad():
for batch in dataset:
# 模型预测
prediction = policy(batch)
# 计算损失
target_actions = batch['action']
predicted_actions = prediction['action']
l1_loss = torch.mean(torch.abs(predicted_actions - target_actions))
l2_loss = torch.mean((predicted_actions - target_actions) ** 2)
total_l1_loss += l1_loss.item()
total_l2_loss += l2_loss.item()
num_samples += 1
avg_l1_loss = total_l1_loss / num_samples
avg_l2_loss = total_l2_loss / num_samples
print(f"Average L1 Loss: {avg_l1_loss:.4f}")
print(f"Average L2 Loss: {avg_l2_loss:.4f}")
return avg_l1_loss, avg_l2_loss
if __name__ == "__main__":
model_path = "outputs/train/act_finetuned/checkpoints/last"
dataset_path = "path/to/your/test/dataset"
evaluate_act_model(model_path, dataset_path)
在线评估(机器人环境)
# robot_evaluation.py
import torch
import numpy as np
from lerobot.policies.act.modeling_act import ACTPolicy
class ACTRobotController:
def __init__(self, model_path, camera_names):
self.policy = ACTPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()
self.camera_names = camera_names
self.action_queue = []
def get_action(self, observations):
# 如果动作队列为空,预测新的动作块
if len(self.action_queue) == 0:
with torch.no_grad():
# 构建输入
batch = self.prepare_observation(observations)
# 预测动作块
prediction = self.policy(batch)
actions = prediction['action'].cpu().numpy()[0] # [chunk_size, action_dim]
# 将动作添加到队列
self.action_queue = list(actions)
# 返回队列中的下一个动作
return self.action_queue.pop(0)
def prepare_observation(self, observations):
batch = {}
# 处理图像观察
for cam_name in self.camera_names:
image_key = f"observation.images.{cam_name}"
if image_key in observations:
image = observations[image_key]
# 预处理图像(归一化、调整大小等)
image_tensor = self.preprocess_image(image)
batch[image_key] = image_tensor.unsqueeze(0)
# 处理状态观察
if "observation.state" in observations:
state = torch.tensor(observations["observation.state"], dtype=torch.float32)
batch["observation.state"] = state.unsqueeze(0)
return batch
def preprocess_image(self, image):
# 图像预处理逻辑
# 这里需要根据训练时的预处理方式进行相同的处理
image_tensor = torch.tensor(image).permute(2, 0, 1).float() / 255.0
return image_tensor
# 使用示例
if __name__ == "__main__":
controller = ACTRobotController(
model_path="outputs/train/act_finetuned/checkpoints/last",
camera_names=["cam_high", "cam_low", "cam_left_wrist", "cam_right_wrist"]
)
# 模拟机器人控制循环
for step in range(100):
# 获取当前观察(这里需要从实际机器人获取)
observations = {
"observation.images.cam_high": np.random.randint(0, 255, (480, 640, 3)),
"observation.images.cam_low": np.random.randint(0, 255, (480, 640, 3)),
"observation.state": np.random.randn(7)
}
# 获取动作
action = controller.get_action(observations)
# 执行动作(发送到机器人)
print(f"Step {step}: Action = {action}")
# 这里应该将动作发送给实际的机器人
# robot.execute_action(action)
部署和优化
模型量化
# quantization.py
import torch
from lerobot.policies.act.modeling_act import ACTPolicy
def quantize_act_model(model_path, output_path):
# 加载模型
policy = ACTPolicy.from_pretrained(model_path, device="cpu")
policy.eval()
# 动态量化
quantized_policy = torch.quantization.quantize_dynamic(
policy,
{torch.nn.Linear},
dtype=torch.qint8
)
# 保存量化模型
torch.save(quantized_policy.state_dict(), output_path)
print(f"Quantized model saved to {output_path}")
return quantized_policy
if __name__ == "__main__":
quantize_act_model(
"outputs/train/act_finetuned/checkpoints/last",
"outputs/act_quantized.pth"
)
推理优化
# optimized_inference.py
import torch
import torch.jit
from lerobot.policies.act.modeling_act import ACTPolicy
class OptimizedACTInference:
def __init__(self, model_path, use_jit=True):
self.policy = ACTPolicy.from_pretrained(model_path, device="cuda")
self.policy.eval()
if use_jit:
# 使用 TorchScript 优化
self.policy = torch.jit.script(self.policy)
# 预热模型
self.warmup()
def warmup(self):
# 使用虚拟数据预热模型
dummy_batch = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.images.cam_low": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}
with torch.no_grad():
for _ in range(10):
_ = self.policy(dummy_batch)
@torch.no_grad()
def predict(self, observations):
# 快速推理
prediction = self.policy(observations)
return prediction['action'].cpu().numpy()
if __name__ == "__main__":
inference = OptimizedACTInference(
"outputs/train/act_finetuned/checkpoints/last"
)
# 测试推理速度
import time
observations = {
"observation.images.cam_high": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.images.cam_low": torch.randn(1, 3, 224, 224, device="cuda"),
"observation.state": torch.randn(1, 7, device="cuda")
}
start_time = time.time()
for _ in range(100):
action = inference.predict(observations)
end_time = time.time()
avg_inference_time = (end_time - start_time) / 100
print(f"Average inference time: {avg_inference_time:.4f} seconds")
print(f"Inference frequency: {1/avg_inference_time:.2f} Hz")
最佳实践
数据收集建议
- 多视角数据:使用多个摄像头获取丰富的视觉信息
- 高质量演示:确保演示数据的一致性和准确性
- 任务多样性:包含不同的起始状态和目标配置
- 失败案例:适当包含失败案例以提高鲁棒性
训练优化建议
- 动作块大小:根据任务复杂度调整 chunk_size
- 学习率调度:使用余弦退火或阶梯式衰减
- 正则化:适当使用权重衰减和 dropout
- 数据增强:对图像进行适当的增强处理
部署优化建议
- 模型压缩:使用量化和剪枝技术减小模型大小
- 推理加速:使用 TensorRT 或 ONNX 进行推理优化
- 内存管理:合理管理动作队列和观察缓存
- 实时性保证:确保推理频率满足控制要求
常见问题 (FAQ)
Q: ACT 与其他模仿学习方法相比有什么优势?
A: ACT 的主要优势包括:
- 减少复合误差:通过预测动作块减少误差累积
- 提高成功率:在精细操作任务上表现优异
- 端到端训练:无需手工设计特征
- 多模态融合:有效融合视觉和状态信息
Q: 如何选择合适的 chunk_size?
A: chunk_size 的选择取决于任务特性:
- 快速任务:chunk_size = 10-30
- 中等任务:chunk_size = 50-100
- 慢速任务:chunk_size = 100-200