
3.DDPG

1 DDPG

1.1 Original Paper

[Figure: the original DDPG paper]

  • DDPG algorithm

[Figure: DDPG algorithm]

DDPG (Deep Deterministic Policy Gradient) is a reinforcement learning algorithm for continuous action spaces. It is a variant of the Actor-Critic family that combines ideas from DQN (Deep Q-Network) and DPG (Deterministic Policy Gradient). Its main strength is the ability to handle high-dimensional, continuous action spaces, which makes it well suited to complex control tasks such as robot control and autonomous driving.

1.2 Algorithm Framework

The core idea of DDPG is to use two neural networks (an Actor and a Critic) to learn the policy and the value function:

  • Actor network: generates actions. It outputs a single deterministic action rather than a probability distribution, which makes DDPG suitable for continuous action spaces.
  • Critic network: evaluates how good an action is, using a Q-value to score a given state-action pair.

Like DQN, DDPG uses experience replay and target networks to stabilize the training process.
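
As a minimal sketch of this two-network structure (the layer sizes and the max_action scaling below are illustrative assumptions; the full versions used for training appear in section 2.2): the Actor maps a state to one action vector, while the Critic maps a (state, action) pair to a single scalar Q-value.

import torch
import torch.nn as nn

class Actor(nn.Module):
    # Deterministic policy: state -> one action vector (no probability distribution)
    def __init__(self, state_dim, action_dim, max_action, hidden_dim=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, action_dim), nn.Tanh(),
        )
        self.max_action = max_action

    def forward(self, state):
        # tanh bounds the output to [-1, 1]; scale it to the environment's action range
        return self.max_action * self.net(state)

class Critic(nn.Module):
    # Action-value function: (state, action) -> scalar Q-value
    def __init__(self, state_dim, action_dim, hidden_dim=64):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim + action_dim, hidden_dim), nn.ReLU(),
            nn.Linear(hidden_dim, 1),
        )

    def forward(self, state, action):
        return self.net(torch.cat([state, action], dim=1))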

1.3 Algorithm Flow

  • Initialize the networks: initialize the Actor network $\mu(s|\theta^\mu)$ and the Critic network $Q(s, a|\theta^Q)$; at the same time, create target networks $\mu'(s|\theta^{\mu'})$ and $Q'(s, a|\theta^{Q'})$ for the Actor and Critic, and set the target networks' parameters equal to those of the corresponding main networks.

  • Experience replay: store each transition in a replay buffer and sample mini-batches from it at random during training. This breaks the correlation between consecutive samples and improves training stability.

  • Target networks: update the target networks' parameters with a soft update rather than copying them directly from the main networks, which mitigates instability. The update rule is:
    $\theta^{\mu'} \leftarrow \tau \theta^{\mu} + (1 - \tau)\,\theta^{\mu'}$
    $\theta^{Q'} \leftarrow \tau \theta^{Q} + (1 - \tau)\,\theta^{Q'}$

    where $\tau$ is usually a small value, such as 0.001.

  • Critic training: train the Critic using the Bellman equation, with the objective of minimizing
    $L = \frac{1}{N} \sum_i \left( y_i - Q(s_i, a_i|\theta^Q) \right)^2$
    where $y_i = r_i + \gamma\, Q'(s_{i+1}, \mu'(s_{i+1}|\theta^{\mu'})|\theta^{Q'})$ is the Critic's target value.

  • Actor training: update the Actor's policy by maximizing the Critic's output. The policy gradient is
    $\nabla_{\theta^\mu} J \approx \frac{1}{N} \sum_i \nabla_a Q(s, a|\theta^Q)\big|_{s=s_i,\, a=\mu(s_i)}\, \nabla_{\theta^\mu} \mu(s|\theta^\mu)\big|_{s=s_i}$

  • Exploration noise: because the policy is deterministic, DDPG can easily get stuck in local optima, so noise is added to the actions for exploration. An Ornstein-Uhlenbeck (OU) process is commonly used, since it produces temporally correlated noise that suits exploration in continuous action spaces (a minimal sketch follows this list).
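
Note that the training script in section 2.3 actually explores with an ε-greedy scheme that mixes in uniformly random actions; for completeness, here is a minimal sketch of an OU noise process that could be added to the Actor's output instead. The class name OUNoise and the theta/sigma/dt values are illustrative defaults, not taken from the original post.

import numpy as np

class OUNoise:
    """Temporally correlated exploration noise for continuous actions."""
    def __init__(self, action_dim, mu=0.0, theta=0.15, sigma=0.2, dt=1e-2):
        self.mu = mu * np.ones(action_dim)
        self.theta, self.sigma, self.dt = theta, sigma, dt
        self.reset()

    def reset(self):
        # Restart the process at its mean at the start of each episode
        self.x = np.copy(self.mu)

    def sample(self):
        # dx = theta * (mu - x) * dt + sigma * sqrt(dt) * N(0, 1)
        dx = self.theta * (self.mu - self.x) * self.dt \
             + self.sigma * np.sqrt(self.dt) * np.random.randn(len(self.x))
        self.x = self.x + dx
        return self.x

# Example usage inside a training loop (hypothetical):
#   noise = OUNoise(action_dim=1)
#   action = np.clip(agent.select_action(state) + noise.sample(), -2, 2)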

1.4 Pros and Cons of DDPG

  • Advantages

    • Handles tasks with high-dimensional, continuous action spaces;
    • Performs well in particular domains such as robot control;
    • Target networks and experience replay make training more stable.
  • Disadvantages

    • Controlling the exploration noise is relatively involved, and the policy can get stuck in local optima;
    • Sensitive to hyperparameters such as the learning rates and $\tau$;
    • Computationally expensive, with long training times.

1.5 Application Scenarios

DDPG is widely used in tasks that require precise control, for example:

  • Robot control: precise manipulation with robotic arms, or trajectory control for mobile robots;
  • Autonomous driving: continuous acceleration, braking, and steering adjustments;
  • Game control: OpenAI Gym environments with continuous action spaces (such as Pendulum and MountainCarContinuous).

With DDPG, reinforcement learning can learn decision-making policies efficiently in high-dimensional, continuous action spaces, which has advanced work on complex control tasks.

2 DDPG Coding

Pendulum-v1 is a classic reinforcement learning environment that is part of the OpenAI Gym library. It simulates an inverted pendulum: the goal is to apply appropriate torques to keep the pole in the upright position.
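
As a quick sanity check (assuming the Gym ≥ 0.26 API that the training script below also uses), the environment's spaces can be inspected like this; the observation is 3-dimensional (cos θ, sin θ, angular velocity) and the single action is a torque in [-2, 2], which is where the max_action = 2 used later comes from.

import gym

env = gym.make("Pendulum-v1")
print(env.observation_space)   # Box(3,): [cos(theta), sin(theta), angular velocity]
print(env.action_space)        # Box(1,): torque
print(env.action_space.high)   # [2.]
state, _ = env.reset()
next_state, reward, done, truncated, info = env.step(env.action_space.sample())
env.close()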

2.1 Framework

[Figure: framework diagram]

2.2 DDPG Agent

import random

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from collections import deque

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Hyperparameters
LR_ACTOR = 1e-4
LR_CRITIC = 1e-3
GAMMA = 0.99
BATCH_SIZE = 64
MEMORY_SIZE = 100000
TAU = 5e-3
HIDDEN_DIM = 64


# Actor and Critic networks
class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action, hidden_dim=HIDDEN_DIM):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)
        self.max_action = max_action

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        action = torch.tanh(self.fc3(x)) * self.max_action
        return action


class Critic(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=HIDDEN_DIM):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], 1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        q_value = self.fc3(x)
        return q_value


# Experience replay buffer
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=int(capacity))

    def add(self, state, action, reward, next_state, done):
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*batch)
        return np.concatenate(state), action, reward, np.concatenate(next_state), done

    def __len__(self):
        return len(self.buffer)


class DDPGAgent:
    def __init__(self, state_dim, action_dim, max_action):
        self.actor = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target = Actor(state_dim, action_dim, max_action).to(device)
        self.actor_target.load_state_dict(self.actor.state_dict())

        self.critic = Critic(state_dim, action_dim).to(device)
        self.critic_target = Critic(state_dim, action_dim).to(device)
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.actor_optimizer = optim.Adam(self.actor.parameters(), LR_ACTOR)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), LR_CRITIC)

        self.replay_buffer = ReplayBuffer(MEMORY_SIZE)
        self.max_action = max_action

    def select_action(self, state):
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        action = self.actor(state).cpu().data.numpy().flatten()
        return action

    def train(self):
        if len(self.replay_buffer) < BATCH_SIZE:
            return

        states, actions, rewards, next_states, dones = self.replay_buffer.sample(BATCH_SIZE)

        states = torch.FloatTensor(states).to(device)
        actions = torch.FloatTensor(np.vstack(actions)).to(device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(device)
        next_states = torch.FloatTensor(next_states).to(device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(device)

        # Critic update: minimize the MSE between the current Q-value and the Bellman target
        next_actions = self.actor_target(next_states)
        target_q = self.critic_target(next_states, next_actions.detach())
        target_q = (rewards + (1 - dones) * GAMMA * target_q).detach()  # treat the target as a constant

        # current Q-value estimate for the sampled state-action pairs
        current_q = self.critic(states, actions)
        critic_loss = nn.MSELoss()(current_q, target_q)

        self.critic_optimizer.zero_grad()  # clear gradients from the previous step
        critic_loss.backward()  # backpropagate the critic loss
        self.critic_optimizer.step()  # update the critic parameters

        # Actor update: policy gradient, i.e. maximize Q(s, mu(s)) by minimizing its negative
        actor_loss = -self.critic(states, self.actor(states)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()  # backpropagate the actor loss
        self.actor_optimizer.step()  # update the actor parameters

        # Target networks update
        for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
            target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)

        for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
            target_param.data.copy_(TAU * param.data + (1 - TAU) * target_param.data)

2.3 Train DDPG

import os.path
import random
import time
import gym
import numpy as np
import torch
from agent_ddpg import DDPGAgent


# initialize env
env = gym.make(id='Pendulum-v1')
STATE_DIM = env.observation_space.shape[0]
ACTION_DIM = env.action_space.shape[0]

agent = DDPGAgent(STATE_DIM, ACTION_DIM, 2)  # max_action = 2, the torque limit of Pendulum-v1

# Training parameters
NUM_EPISODE = 100
NUM_STEP = 200

# ε-greedy exploration: with probability epsilon a random action is taken instead of the Actor's output
epsilon_start = 1.0  # initial epsilon, high exploration probability
epsilon_end = 0.02  # final epsilon, low exploration probability
epsilon_decay = 10000  # number of environment steps over which epsilon is linearly annealed

# Training loop
REWARD_BUFFER = np.empty(shape=NUM_EPISODE)
for episode_i in range(NUM_EPISODE):
    state, _ = env.reset()
    episode_reward = 0

    for step_i in range(NUM_STEP):
        epsilon = np.interp(x=episode_i * NUM_STEP + step_i, xp=[0, epsilon_decay], fp=[epsilon_start, epsilon_end])
        random_sample = random.random()
        if random_sample <= epsilon:
            # Exploration: take a uniformly random action
            action = np.random.uniform(-2, 2, size=ACTION_DIM)
        else:
            # Exploitation: let the Actor network choose the action
            action = agent.select_action(state)
        # Step the environment and observe the result
        next_state, reward, done, truncation, info = env.step(action)
        # Store the transition in the replay buffer
        agent.replay_buffer.add(state, action, reward, next_state, done)
        # Update the state
        state = next_state
        episode_reward += reward
        agent.train()

        # End the episode early when done
        if done:
            break

    REWARD_BUFFER[episode_i] = episode_reward
    # Print the total reward of each episode
    print(f"Episode {episode_i + 1}, Reward: {round(episode_reward, 2)}")

# Save the model parameters after training

# Create the output directory if it does not exist
current_path = os.path.dirname(os.path.realpath(__file__))
model_path = current_path + '/models/'
os.makedirs(model_path, exist_ok=True)
timestamp = time.strftime("%Y%m%d%H%M%S")
torch.save(agent.actor.state_dict(), model_path+f"ddpg_actor_{timestamp}.pth")
torch.save(agent.critic.state_dict(),  model_path+f"ddpg_critic_{timestamp}.pth")

env.close()



2.4 Test DDPG

import os
import gym
import numpy as np
import torch
import torch.nn as nn
import pygame

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

if torch.cuda.is_available():
    print(f"{device}:{torch.cuda.is_available()}")
    print("device_count:", torch.cuda.device_count())
    print("current_device:", torch.cuda.current_device())
    print("device_name:", torch.cuda.get_device_name(0))


class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action, hidden_dim=64):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)
        self.max_action = max_action

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))
        action = torch.tanh(self.fc3(x)) * self.max_action
        return action

    def process_frame(self, frame):
        # Note: relies on the module-level screen_width / screen_height defined further below
        frame = np.transpose(frame, (1, 0, 2))  # transpose so width and height match pygame's display order
        frame_surface = pygame.surfarray.make_surface(frame)  # convert the numpy array into a pygame.Surface
        return pygame.transform.scale(frame_surface, (screen_width, screen_height))


# initialize env
env = gym.make(id='Pendulum-v1',render_mode="rgb_array")
STATE_DIM = env.observation_space.shape[0]
ACTION_DIM = env.action_space.shape[0]

current_path = os.path.dirname(os.path.realpath(__file__))
model = current_path + '/models/'
actor_para_path = model + "ddpg_actor_20241110091637.pth"

actor_agent = Actor(STATE_DIM, ACTION_DIM, 2).to(device)
# Load the trained model parameters (used for testing)
actor_agent.load_state_dict(torch.load(actor_para_path, weights_only=True))

# initialize pygame
pygame.init()
screen_width, screen_height = 600, 600
screen = pygame.display.set_mode((screen_width, screen_height))
clock = pygame.time.Clock()

# Test loop
NUM_EPISODE = 30
NUM_STEP = 200

for episode_i in range(NUM_EPISODE):
    state, _ = env.reset()
    episode_reward = 0

    for step_i in range(NUM_STEP):
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        action = actor_agent(state).cpu().data.numpy().flatten()
        next_state, reward, done, truncation, info = env.step(action)
        state = next_state
        episode_reward += reward
        print(f"step: {step_i}, action: {action}")

        frame = env.render()  # returns the current frame as a numpy array (render_mode="rgb_array")
        frame_surface = actor_agent.process_frame(frame)
        screen.blit(frame_surface, (0, 0))  # blit expects a pygame.Surface object
        pygame.display.flip()
        clock.tick(60)  # limit rendering to 60 fps
    print(f"Episode {episode_i + 1}, Reward: {round(episode_reward, 2)}")

pygame.quit()
env.close()

2.5 Results

[Figure: results]


Original article: https://blog.csdn.net/u014217137/article/details/143660518
