3. DDPG
1 DDPG
1.1 Original Paper
- Source
"Continuous Control with Deep Reinforcement Learning", Timothy P. Lillicrap et al., 2016
Paper link: https://scholar.google.com/scholar?hl=en&as_sdt=0,5&cluster=4133004576987558805
- DDPG algorithm
DDPG (Deep Deterministic Policy Gradient) is a reinforcement learning algorithm for continuous action spaces. It is a variant of the Actor-Critic family that combines ideas from DQN (Deep Q-Network) and DPG (Deterministic Policy Gradient). Its main strength is the ability to handle high-dimensional, continuous action spaces, which makes it well suited to complex control tasks such as robot control and autonomous driving.
1.2 Algorithm Framework
The core idea of DDPG is to use two neural networks, an Actor and a Critic, to learn the policy and the value function:
- Actor network: generates actions, outputting a deterministic action rather than a probability distribution. This deterministic output is what makes DDPG suitable for continuous action spaces.
- Critic network: evaluates actions, using a Q-value-style function to estimate the value of a given state-action pair.
As in DQN, DDPG relies on experience replay and target networks to stabilize training.
1.3 Algorithm Steps
- Network initialization: initialize the Actor network $\mu(s|\theta^\mu)$ and the Critic network $Q(s, a|\theta^Q)$; also create target networks $\mu'(s|\theta^{\mu'})$ and $Q'(s, a|\theta^{Q'})$ for the Actor and Critic, and set the target networks' parameters equal to those of the corresponding main networks.
- Experience replay: store each transition in a replay buffer and sample random minibatches from it during training; this breaks the correlation between consecutive samples and improves training stability.
- Target networks: update the target network parameters with a soft update rather than copying the main networks directly, which mitigates instability. The update rule is
$$\theta^{\mu'} \leftarrow \tau \theta^\mu + (1 - \tau)\,\theta^{\mu'}, \qquad \theta^{Q'} \leftarrow \tau \theta^Q + (1 - \tau)\,\theta^{Q'}$$
where $\tau$ is typically a small value such as 0.001.
- Critic training: apply the Bellman equation to the Critic by minimizing the loss
$$L = \frac{1}{N} \sum_i \left( y_i - Q(s_i, a_i|\theta^Q) \right)^2$$
where $y_i = r_i + \gamma\, Q'(s_{i+1}, \mu'(s_{i+1}|\theta^{\mu'})|\theta^{Q'})$ is the Critic's target value.
- Actor training: update the Actor's policy by maximizing the Critic's output for the actions it produces. The policy gradient is
$$\nabla_{\theta^\mu} J \approx \frac{1}{N} \sum_i \nabla_a Q(s, a|\theta^Q)\big|_{s=s_i,\, a=\mu(s_i)} \, \nabla_{\theta^\mu} \mu(s|\theta^\mu)\big|_{s=s_i}$$
- Exploration noise: because DDPG uses a deterministic policy, it tends to get stuck in local optima, so noise is added to the actions during training. An Ornstein-Uhlenbeck process is commonly used because it generates temporally correlated noise, which suits exploration in continuous action spaces (a minimal sketch follows this list).
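Note that the training script later in this post explores with uniformly random actions under an epsilon schedule rather than OU noise. Purely as an illustration, a minimal Ornstein-Uhlenbeck noise generator might look like the sketch below; the class name `OUNoise` and the parameter values (theta, sigma, dt) are assumptions for this example, not part of the original code.

```python
import numpy as np

class OUNoise:
    """Minimal Ornstein-Uhlenbeck process for temporally correlated exploration noise."""
    def __init__(self, action_dim, mu=0.0, theta=0.15, sigma=0.2, dt=1e-2):
        self.mu = mu * np.ones(action_dim)
        self.theta = theta  # strength of the pull back toward the mean
        self.sigma = sigma  # scale of the random perturbation
        self.dt = dt
        self.reset()

    def reset(self):
        # Restart from the long-run mean at the beginning of each episode
        self.state = np.copy(self.mu)

    def sample(self):
        # dx = theta * (mu - x) * dt + sigma * sqrt(dt) * N(0, 1)
        dx = self.theta * (self.mu - self.state) * self.dt \
             + self.sigma * np.sqrt(self.dt) * np.random.randn(len(self.state))
        self.state = self.state + dx
        return self.state
```

In use, the noise would be added to the deterministic action, e.g. `action = agent.select_action(state) + noise.sample()`, with `noise.reset()` called at the start of each episode.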
1.4 Pros and Cons of DDPG
- Pros:
  - Handles tasks with high-dimensional, continuous action spaces;
  - Performs well in specific domains such as robot control;
  - Target networks and experience replay make training more stable.
- Cons:
  - Noise control is tricky, and the policy can still fall into local optima;
  - Sensitive to hyperparameters such as the learning rates and $\tau$;
  - Computationally expensive, with relatively long training times.
1.5 Application Scenarios
DDPG is widely used in tasks that require precise control, for example:
- Robot control: precise manipulation with robotic arms, or trajectory control of mobile robots;
- Autonomous driving: continuous acceleration, braking, and steering of a vehicle;
- Game control: OpenAI Gym environments with continuous action spaces, such as Pendulum and MountainCarContinuous.
With DDPG, reinforcement learning can efficiently learn decision policies in high-dimensional, continuous action spaces, which has advanced progress on complex control tasks.
2 DDPG Coding
Pendulum-v1 is a classic reinforcement learning environment from the OpenAI Gym library. It simulates an inverted pendulum: the goal is to apply appropriate torque so that the pendulum stays in the upright position. The environment's interface is sketched below.
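As a quick sketch (assuming a Gym version that exposes Pendulum-v1), the observation is 3-dimensional and the single torque action is bounded to [-2, 2], which is where the `max_action = 2` used in the code below comes from:

```python
import gym

env = gym.make("Pendulum-v1")
print(env.observation_space)  # 3-dimensional observation: cos(theta), sin(theta), angular velocity
print(env.action_space)       # single continuous action (torque), bounded to [-2, 2]
env.close()
```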
2.1 Framework
2.2 DDPG Agent
```python
import random
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Hyperparameters
LR_ACTOR = 1e-4
LR_CRITIC = 1e-3
GAMMA = 0.99
BATCH_SIZE = 64
MEMORY_SIZE = 100000
TAU = 5e-3
HIDDEN_DIM = 64
# Actor and Critic networks
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action, hidden_dim=HIDDEN_DIM):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)
        self.max_action = max_action

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        # tanh squashes the output to [-1, 1]; scale it to the action bound
        action = torch.tanh(self.fc3(x)) * self.max_action
        return action


class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=HIDDEN_DIM):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        # Q(s, a): concatenate state and action as the input
        x = torch.cat([state, action], 1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        q_value = self.fc3(x)
        return q_value
# Experience replay buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=int(capacity))

    def add(self, state, action, reward, next_state, done):
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*batch)
        return np.concatenate(state), action, reward, np.concatenate(next_state), done

    def __len__(self):
        return len(self.buffer)
class DDPGAgent:
    def __init__(self, state_dim, action_dim, max_action):
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())

        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.actor_optimizer = optim.Adam(self.actor.parameters(), LR_ACTOR)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), LR_CRITIC)

        self.replay_buffer = ReplayBuffer(MEMORY_SIZE)
        self.max_action = max_action

    def select_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        action = self.actor(state).cpu().data.numpy().flatten()
        return action

    def train(self):
        if len(self.replay_buffer) < BATCH_SIZE:
            return
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(BATCH_SIZE)
        states = torch.FloatTensor(states).to(device)
        actions = torch.FloatTensor(np.vstack(actions)).to(device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(device)
        next_states = torch.FloatTensor(next_states).to(device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(device)

        # Critic update: minimize the MSE between the current Q and the Bellman target
        next_actions = self.actor_target(next_states)
        target_q = self.critic_target(next_states, next_actions.detach())  # no gradient through the target actor
        target_q = rewards + (1 - dones) * GAMMA * target_q
        current_q = self.critic(states, actions)  # current Q estimate
        critic_loss = nn.MSELoss()(current_q, target_q)
        self.critic_optimizer.zero_grad()  # clear the previous step's gradients
        critic_loss.backward()             # compute gradients
        self.critic_optimizer.step()       # update Critic parameters

        # Actor update: deterministic policy gradient
        actor_loss = -self.critic(states, self.actor(states)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()              # compute gradients
        self.actor_optimizer.step()        # update Actor parameters

        # Soft-update the target networks
        for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
            target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)
        for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
            target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)
```
2.3 Train DDPG
```python
import os.path
import random
import time
import gym
import numpy as np
import torch
from agent_ddpg import DDPGAgent

# Initialize the environment
env = gym.make(id='Pendulum-v1')
STATE_DIM = env.observation_space.shape[0]
ACTION_DIM = env.action_space.shape[0]
agent = DDPGAgent(STATE_DIM, ACTION_DIM, 2)  # max_action = 2 for Pendulum-v1

# Training parameters
NUM_EPISODE = 100
NUM_STEP = 200

# Epsilon schedule for injecting random exploratory actions
epsilon_start = 1.0    # initial epsilon: explore with high probability
epsilon_end = 0.02     # final epsilon: explore with low probability
epsilon_decay = 10000  # number of steps over which epsilon is annealed

# Training loop
REWARD_BUFFER = np.empty(shape=NUM_EPISODE)
for episode_i in range(NUM_EPISODE):
    state, others = env.reset()
    episode_reward = 0
    for step_i in range(NUM_STEP):
        epsilon = np.interp(x=episode_i * NUM_STEP + step_i, xp=[0, epsilon_decay], fp=[epsilon_start, epsilon_end])
        random_sample = random.random()
        if random_sample <= epsilon:
            # Explore: sample a random action
            action = np.random.uniform(-2, 2, size=ACTION_DIM)
        else:
            # Exploit: let the Actor network choose the action
            action = agent.select_action(state)

        # Step the environment and collect feedback
        next_state, reward, done, truncation, info = env.step(action)

        # Store the transition in the replay buffer
        agent.replay_buffer.add(state, action, reward, next_state, done)

        # Update the state and train the agent
        state = next_state
        episode_reward += reward
        agent.train()

        # End the episode early if the environment reports done
        if done:
            break

    REWARD_BUFFER[episode_i] = episode_reward
    # Print the total reward of each episode
    print(f"Episode {episode_i + 1}, Reward: {round(episode_reward, 2)}")

# Save the model parameters after training
current_path = os.path.dirname(os.path.realpath(__file__))
model_path = current_path + '/models/'
os.makedirs(model_path, exist_ok=True)  # create the directory if it is missing
timestamp = time.strftime("%Y%m%d%H%M%S")
torch.save(agent.actor.state_dict(), model_path + f"ddpg_actor_{timestamp}.pth")
torch.save(agent.critic.state_dict(), model_path + f"ddpg_critic_{timestamp}.pth")

env.close()
```
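The `REWARD_BUFFER` array collected above is not used further in the script. As an optional sketch (assuming matplotlib is installed), the learning curve could be plotted by appending something like the following to the end of the training script:

```python
import matplotlib.pyplot as plt

# Plot the per-episode return recorded in REWARD_BUFFER during training
plt.plot(np.arange(len(REWARD_BUFFER)), REWARD_BUFFER)
plt.xlabel("Episode")
plt.ylabel("Episode reward")
plt.title("DDPG on Pendulum-v1")
plt.show()
```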
2.4 Test DDPG
```python
import os
import gym
import numpy as np
import torch
import torch.nn as nn
import pygame

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
if torch.cuda.is_available():
    print(f"{device}:{torch.cuda.is_available()}")
    print("device_count:", torch.cuda.device_count())
    print("current_device:", torch.cuda.current_device())
    print("device_name:", torch.cuda.get_device_name(0))


class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action, hidden_dim=64):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)
        self.max_action = max_action

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        action = torch.tanh(self.fc3(x)) * self.max_action
        return action


def process_frame(frame):
    # Transpose the frame so width and height match pygame's display order
    frame = np.transpose(frame, (1, 0, 2))
    # Convert the numpy array to a pygame.Surface and scale it to the window size
    frame_surface = pygame.surfarray.make_surface(frame)
    return pygame.transform.scale(frame_surface, (screen_width, screen_height))


# Initialize the environment
env = gym.make(id='Pendulum-v1', render_mode="rgb_array")
STATE_DIM = env.observation_space.shape[0]
ACTION_DIM = env.action_space.shape[0]

current_path = os.path.dirname(os.path.realpath(__file__))
model = current_path + '/models/'
actor_para_path = model + "ddpg_actor_20241110091637.pth"
actor_agent = Actor(STATE_DIM, ACTION_DIM, 2).to(device)
# Load the trained model parameters for testing
actor_agent.load_state_dict(torch.load(actor_para_path, weights_only=True))

# Initialize pygame
pygame.init()
screen_width, screen_height = 600, 600
screen = pygame.display.set_mode((screen_width, screen_height))
clock = pygame.time.Clock()

# Test loop
NUM_EPISODE = 30
NUM_STEP = 200
for episode_i in range(NUM_EPISODE):
    state, others = env.reset()
    episode_reward = 0
    for step_i in range(NUM_STEP):
        state_tensor = torch.FloatTensor(state).unsqueeze(0).to(device)
        action = actor_agent(state_tensor).cpu().data.numpy().flatten()
        next_state, reward, done, truncation, info = env.step(action)
        state = next_state
        episode_reward += reward
        print(f"step: {step_i}, action: {action}")

        frame = env.render()  # returns the current frame as a numpy array
        frame_surface = process_frame(frame)
        screen.blit(frame_surface, (0, 0))  # blit expects a Surface object
        pygame.display.flip()
        clock.tick(60)  # fps

    print(f"Episode {episode_i + 1}, Reward: {round(episode_reward, 2)}")

pygame.quit()
env.close()
```
2.5 Results
Original article: https://blog.csdn.net/u014217137/article/details/143660518