自学内容网 自学内容网

使用DDPG 训练强化智能体玩FlapBird

早年间有个叫FlapBird的让人很抓狂的游戏大家应该都还记得吧,刚好在github上搜到一个python写的版本FlapPyBird ,一直以来我都在用强化训练agent玩小游戏,那么这次就是你了,试试把这个游戏也拿下。

首先浏览了下游戏主程序

    async def start(self):
        while True:
            self.background = Background(self.config)
            self.floor = Floor(self.config)
            self.player = Player(self.config)
            self.welcome_message = WelcomeMessage(self.config)
            self.game_over_message = GameOver(self.config)
            self.pipes = Pipes(self.config)
            self.score = Score(self.config)
            await self.splash()
            await self.play()
            await self.game_over()

每次循环中,游戏的各个组件被实例化:

  • Background:背景对象,可能用于绘制游戏背景。
  • Floor:地面对象,可能用于显示游戏中的地面。
  • Player:玩家对象,代表游戏中的玩家。
  • WelcomeMessage:欢迎信息,可能在游戏开始时显示。
  • GameOver:游戏结束信息,可能在游戏结束时显示。
  • Pipes:管道对象,可能是游戏中的障碍物。
  • Score:得分对象,跟踪玩家的得分。

其中play代码

    async def play(self):
        self.score.reset()
        self.player.set_mode(PlayerMode.NORMAL)

        while True:
            if self.player.collided(self.pipes, self.floor):
                return

            for i, pipe in enumerate(self.pipes.upper):
                if self.player.crossed(pipe):
                    self.score.add()

            for event in pygame.event.get():
                self.check_quit_event(event)
                if self.is_tap_event(event):
                    self.player.flap()

            self.background.tick()
            self.floor.tick()
            self.pipes.tick()
            self.score.tick()
            self.player.tick()

            pygame.display.update()
            await asyncio.sleep(0)
            self.config.tick()
  • 重置分数和玩家模式

    self.score.reset()
    self.player.set_mode(PlayerMode.NORMAL)
    
    • self.score.reset():重置分数,准备开始新的游戏。
    • self.player.set_mode(PlayerMode.NORMAL):将玩家的状态设置为正常模式,可能是恢复到初始状态。
  • 游戏主循环:进入一个无限循环,表示游戏的持续运行,直到某个条件触发退出。

  • 检测碰撞

    if self.player.collided(self.pipes, self.floor):
        return
    
    • 使用 self.player.collided(self.pipes, self.floor) 检查玩家是否与管道或地面发生碰撞。如果发生碰撞,返回并结束游戏,跳出循环。
  • 更新得分

    for i, pipe in enumerate(self.pipes.upper):
        if self.player.crossed(pipe):
            self.score.add()
    
    • 遍历上方的管道(self.pipes.upper),检查玩家是否成功跨越了这些管道。
    • self.player.crossed(pipe):如果玩家跨越了管道,调用 self.score.add() 增加分数。
  • 处理事件

    for event in pygame.event.get():
        self.check_quit_event(event)
        if self.is_tap_event(event):
            self.player.flap()
    
    • 使用 pygame.event.get() 获取当前事件队列。
    • self.check_quit_event(event):检查是否有退出事件(如关闭窗口)。
    • self.is_tap_event(event):检查是否检测到点击事件,如果是,就调用 self.player.flap() 使玩家角色进行“拍击”动作,通常用于让角色向上移动。
  • 更新游戏状态

    self.background.tick()
    self.floor.tick()
    self.pipes.tick()
    self.score.tick()
    self.player.tick()
    
    • 调用各个组件的 tick() 方法,更新它们的状态。这可能包括动画更新、位置变化等。
  • 更新显示

    pygame.display.update()
    
    • 更新游戏显示,渲染所有的游戏组件到屏幕上。
  • 异步等待

    await asyncio.sleep(0)
    
    • 这个调用使得游戏在每次循环结束后异步等待,允许其他协程运行。这对于确保游戏的流畅性和响应性非常重要。
  • 配置更新

    self.config.tick()
    
    • 调用配置的 tick() 方法,可能用于更新游戏配置或状态。

各个游戏组件对象都继承自entity,  在tick方法里加点日志

    def tick(self) -> None:
        self.draw()
        rect = self.rect
        if self.config.debug:
            pygame.draw.rect(self.config.screen, (255, 0, 0), rect, 1)
            # write x and y at top of rect
            font = pygame.font.SysFont("Arial", 13, True)
            entity_position_info = {
                'x': f'{self.x:.1f}',
                'y': f'{self.y:.1f}',
                'w': f'{self.w:.1f}',
                'h': f'{self.h:.1f}'
            }
            
            if self.get_type() == 'Pipe':
                print('Pipe', entity_position_info)

            if self.get_type() == 'Player':
                print('Player', entity_position_info)

把管道和笨鸟的特征信息打印下

up.size 2
lower.size 2
-------up pipe-------
Pipe {'x': '84.0', 'y': '-151.0', 'w': '52.0', 'h': '320.0'}
-------low pipe-------
Pipe {'x': '84.0', 'y': '289.0', 'w': '52.0', 'h': '320.0'}
-------up pipe-------
Pipe {'x': '266.0', 'y': '-210.0', 'w': '52.0', 'h': '320.0'}
-------low pipe-------
Pipe {'x': '266.0', 'y': '230.0', 'w': '52.0', 'h': '320.0'}
Player {'x': '57.0', 'y': '386.5', 'w': '34.0', 'h': '24.0'}

这个游戏对人的考验在于抓住管道中间窗口靠近的时机,点击触屏让小鸟飞起来跳过空隙,所以状态空间在我想来,应该只需要关注笨鸟的位置和笨鸟前面的管道缺口位置的坐标

所以环境类和它的状态空间计算可以这么写

import asyncio
import time

import gym
import numpy as np
from gym import spaces

class FlappyEnv(gym.Env):
    def __init__(self, flappy_game):
        super(FlappyEnv, self).__init__()
        # 我只关注笨鸟和它前面的管道口坐标的位置
        self.state_dim = 9
        self.flappy_game = flappy_game

        # 定义动作空间(0: 不跳,1: 跳)
        self.action_space = spaces.Discrete(1)

        # 定义状态空间,可以根据你的游戏状态定义
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.state_dim,), dtype=np.float32)

        # 定义局编号
        self.current_game_num = 0
        # 定义需要跳过的pip
        self.up_pipe, self.low_pipe = None, None

    def reset(self):
        # 重置游戏状态
        import threading
        thread = threading.Thread(target=self.start_flappy_game)
        thread.start()
        self.current_game_num == self.flappy_game.game_num
        return self.get_state()

    def start_flappy_game(self):
        asyncio.run(self.flappy_game.start())

    def step(self, action):
        # 执行动作
        threshold = 0.5
        if action > threshold and hasattr(self.flappy_game, 'player'):  # 跳
            self.flappy_game.player.flap()

        time.sleep(0.5)
        # 获取新的状态、奖励和是否结束
        state, up_pipe, low_pipe = self.get_state()
        reward = self.get_reward()
        done = self.is_game_over()
        if done:
            reward -= 10
        else:
            if self.up_pipe and self.low_pipe:
                if self.up_pipe.number != up_pipe.number:
                    reward += 20
                    self.up_pipe, self.low_pipe = up_pipe, low_pipe
            else:
                self.up_pipe, self.low_pipe = up_pipe, low_pipe
            # 如果飞的位置 超过up 了要扣分
            # print('state', state) state [  57  208   84 -215   84  225]
            if state[5] > state[1] > state[3]:
                reward += 3

            if self.current_game_num < self.flappy_game.game_num:
                self.current_game_num = self.flappy_game.game_num
                reward -= 10

        return state, reward, done, {}

    def get_state(self):
        # 找到笨鸟前面的管道up和low各一个
        up_pipe, low_pipe = self.find_closest_pipes()
        # 返回当前状态的表示
        player = self.get_play()
        return np.array([
            (player.x + player.w) if player else 0,
            player.y if player else 0,
            (player.y + player.h)if player else 0,
            up_pipe.x if up_pipe else 0,
            (up_pipe.x + up_pipe.w) if up_pipe else 0,
            (up_pipe.y + up_pipe.h) if up_pipe else 0,
            low_pipe.x if low_pipe else 0,
            (low_pipe.x + low_pipe.w)if low_pipe else 0,
            low_pipe.y if low_pipe else 0
        ]), up_pipe, low_pipe

    def get_play(self):
        if hasattr(self.flappy_game, 'player'):
            return self.flappy_game.player
        else:
            return None
    def find_closest_pipes(self):
        closest_pipes = None
        min_diff = float('inf')  # 初始化最低差值为正无穷

        # 遍历上下管道
        if hasattr(self.flappy_game, 'pipes'):
            upper_pipes = self.flappy_game.pipes.upper
            lower_pipes = self.flappy_game.pipes.lower

            for i in range(len(upper_pipes)):
                up_pipe = upper_pipes[i]
                low_pipe = lower_pipes[i]
                if (up_pipe.x + up_pipe.w) > self.flappy_game.player.x and \
                        (low_pipe.x + low_pipe.w) > self.flappy_game.player.x:
                    # 计算上管道和下管道的 x 坐标差值
                    diff = abs(up_pipe.x - low_pipe.x)
                    if diff < min_diff:
                        min_diff = diff
                        closest_pipes = (up_pipe, low_pipe)

            return closest_pipes  # 返回找到的最接近的管道组
        else:
            return None, None

    def get_reward(self):
        # 定义奖励机制
        if hasattr(self.flappy_game, 'player') and hasattr(self.flappy_game, 'pipes') \
                and hasattr(self.flappy_game, 'floor'):
            if self.flappy_game.player.collided(self.flappy_game.pipes, self.flappy_game.floor):
                return -1  # 碰撞时惩罚
            return 2
        else:
            return 0

    def is_game_over(self):
        if hasattr(self.flappy_game, 'player') and hasattr(self.flappy_game, 'pipes'):
            return self.flappy_game.player.collided(self.flappy_game.pipes, self.flappy_game.floor)
        else:
            return False

状态空间的想法是把rect 的接触面都算到状态空间内,即笨鸟向前方向的rect的两各顶点坐标,上下管道在缺口处的各两个点的坐标,由于笨鸟顶点和x坐标不变,上下管道的y左边不变,状态空间可以简化为9个维度

[
            (player.x + player.w) if player else 0,
            player.y if player else 0,
            (player.y + player.h)if player else 0,
            up_pipe.x if up_pipe else 0,
            (up_pipe.x + up_pipe.w) if up_pipe else 0,
            (up_pipe.y + up_pipe.h) if up_pipe else 0,
            low_pipe.x if low_pipe else 0,
            (low_pipe.x + low_pipe.w)if low_pipe else 0,
            low_pipe.y if low_pipe else 0
]

奖励函数按照向前没有碰撞进行加分

训练过程在获取next_state的时候睡眠若干毫秒,再取下一刻的状态和计算reward

在Flappy 游戏端做了如果game over就重新开始的操作,所以step方法里我加了如果上一局结束则动作扣分的逻辑,当越过了管道那它前面的管道,最前面的管道发生变化后增加奖励

    def step(self, action):
        # 执行动作
        threshold = 0.5
        if action > threshold and hasattr(self.flappy_game, 'player'):  # 跳
            self.flappy_game.player.flap()

        time.sleep(0.5)
        # 获取新的状态、奖励和是否结束
        state, up_pipe, low_pipe = self.get_state()
        reward = self.get_reward()
        done = self.is_game_over()
        if done:
            reward -= 10
        else:
            if self.up_pipe and self.low_pipe:
                if self.up_pipe.number != up_pipe.number:
                    reward += 20
                    self.up_pipe, self.low_pipe = up_pipe, low_pipe
            else:
                self.up_pipe, self.low_pipe = up_pipe, low_pipe

            if self.current_game_num < self.flappy_game.game_num:
                self.current_game_num = self.flappy_game.game_num
                reward -= 10

        return state, reward, done, {}

DDPG 的模型代码

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim


class Actor(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return torch.tanh(self.fc3(x))


class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 1)

    def forward(self, state, action):
        x = torch.relu(self.fc1(torch.cat([state, action], 1)))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)


class DDPG:
    def __init__(self, state_dim, action_dim):
        # 检查是否有可用的 GPU
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print('device', self.device)

        self.actor = Actor(state_dim, action_dim).to(self.device)
        self.critic = Critic(state_dim, action_dim).to(self.device)
        self.actor_target = Actor(state_dim, action_dim).to(self.device)
        self.critic_target = Critic(state_dim, action_dim).to(self.device)

        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)

        # 复制权重
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.replay_buffer = []
        self.max_buffer_size = 100000
        self.batch_size = 64
        self.gamma = 0.99
        self.tau = 0.005

    def select_action(self, state):
        state = torch.FloatTensor(state).to(self.device).unsqueeze(0)
        return self.actor(state).detach().cpu().numpy()[0]

    def update(self):
        if len(self.replay_buffer) < self.batch_size:
            return

        # 随机选择一批样本
        batch = np.random.choice(len(self.replay_buffer), self.batch_size, replace=False)
        state, action, reward, next_state, done = zip(*[self.replay_buffer[i] for i in batch])

        state = torch.FloatTensor(state).to(self.device)
        action = torch.FloatTensor(action).to(self.device)
        reward = torch.FloatTensor(reward).unsqueeze(1).to(self.device)
        next_state = torch.FloatTensor(next_state).to(self.device)
        done = torch.FloatTensor(done).unsqueeze(1).to(self.device)

        # 更新 Critic
        target_action = self.actor_target(next_state)
        target_q = self.critic_target(next_state, target_action)
        expected_q = reward + (1 - done) * self.gamma * target_q
        critic_loss = nn.MSELoss()(self.critic(state, action), expected_q.detach())

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # 更新 Actor
        actor_loss = -self.critic(state, self.actor(state)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # 更新 Target 网络
        for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

    def add_experience(self, experience):
        if len(self.replay_buffer) >= self.max_buffer_size:
            self.replay_buffer.pop(0)
        self.replay_buffer.append(experience)

Actor 和 Critic

深度确定性策略梯度(DDPG)算法中,ActorCritic 扮演着不同但互补的角色

Actor 的角色

  • 策略学习:Actor 网络负责生成智能体的策略,也就是给定当前状态下选择的动作。它根据当前的状态输出一个动作(在连续动作空间中,通常是一个实数向量)。

  • 动作选择:Actor 通过策略网络输出动作,并可能添加探索噪声(如 Ornstein-Uhlenbeck 噪声)来鼓励探索。

  • 输入:Actor 接受环境的状态作为输入,并输出一个动作。通常使用非线性激活函数(如 Tanh)来处理输出,以保持动作在特定范围内(例如 [−1,1][-1, 1][−1,1])。

2. Critic 的角色

  • 价值评估:Critic 网络负责评估 Actor 所选择的动作的价值。它根据当前状态和动作输入,输出一个 Q 值(即状态-动作值),表示在给定状态下执行某个动作后预期的回报。

  • 更新策略:Critic 的输出用于计算损失函数,指导 Actor 网络的更新。通过最大化 Critic 输出的 Q 值,Actor 能够学习到更好的策略。

  • 输入:Critic 接受状态和动作的拼接作为输入,并输出一个标量 Q 值。

3. 互动机制

  • Actor-Critic 更新

    • 在每个训练步骤中,Actor 根据当前的状态生成动作,然后 Critic 评估这个动作的价值。
    • Critic 计算 Q 值并根据经验更新其参数。
    • Actor 利用 Critic 评估的 Q 值来优化其策略,通常是通过反向传播来调整网络参数。

在 DDPG(深度确定性策略梯度)算法中,有许多重要的参数需要设置,其中两个常见的参数是 gammatau。下面是这两个参数的详细含义及其在算法中的作用。

1. gamma(折扣因子)

  • 含义gamma 是一个介于 0 和 1 之间的浮点数,用于表示未来奖励的重要性。它决定了在计算未来奖励时,当前奖励与未来奖励的权重。

  • 作用

    • 折扣未来奖励:在 Q 值的计算中,gamma 用于折扣未来的奖励。更高的 gamma 值意味着智能体会更加重视未来的奖励,而较低的值则意味着更关注当前的奖励。
    • 公式:若一个状态的奖励为 r_t,下一状态的 Q 值为 Q(s_{t+1}, a_{t+1}),则目标 Q 值的计算可以表示为:

    • 影响:选择较高的 gamma 值可以促使智能体采取更具长远考虑的策略,但也可能导致学习过程变得更加复杂。

2. tau(软更新参数)

  • 含义tau 是一个较小的浮点数,通常在 0 到 1 之间,用于控制目标网络的更新速度。

  • 作用

    • 软更新目标网络:在 DDPG 中,目标网络(actor_targetcritic_target)的参数不是直接复制自主网络,而是通过软更新的方式逐渐接近。这有助于提高训练的稳定性。
    • 更新公式:目标网络的参数更新通常使用以下公式:

      其中 theta_local 是主网络的参数,theta_target 是目标网络的参数。
    • 影响:选择较小的 tau 值(如 0.005)意味着目标网络更新缓慢,从而保持训练的稳定性,防止在训练过程中因参数变化过快而造成的不稳定。

update 方法更新网络

    def update(self):
        if len(self.replay_buffer) < self.batch_size:
            return

        # 随机选择一批样本
        batch = np.random.choice(len(self.replay_buffer), self.batch_size, replace=False)
        state, action, reward, next_state, done = zip(*[self.replay_buffer[i] for i in batch])

        state = torch.FloatTensor(state).to(self.device)
        action = torch.FloatTensor(action).to(self.device)
        reward = torch.FloatTensor(reward).unsqueeze(1).to(self.device)
        next_state = torch.FloatTensor(next_state).to(self.device)
        done = torch.FloatTensor(done).unsqueeze(1).to(self.device)

        # 更新 Critic
        target_action = self.actor_target(next_state)
        target_q = self.critic_target(next_state, target_action)
        expected_q = reward + (1 - done) * self.gamma * target_q
        critic_loss = nn.MSELoss()(self.critic(state, action), expected_q.detach())

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # 更新 Actor
        actor_loss = -self.critic(state, self.actor(state)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # 更新 Target 网络
        for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)

        for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
  • 首先检查重放缓冲区是否足够大以进行采样。
  • 随机选择一批经验样本。
  • 计算目标 Q 值,更新 Critic 网络的损失,并进行反向传播。
  • 更新 Actor 网络,最大化 Critic 输出的 Q 值。
  • 使用软更新方法更新目标网络的权重。

add_experience 方法

  • 添加经验:将经验元组(状态、动作、奖励、下一个状态、是否结束)添加到重放缓冲区。
  • 如果缓冲区已满,则移除最旧的经验。

整个 DDPG 类实现了深度强化学习中的 Actor-Critic 方法,利用两个神经网络来分别表示策略(Actor)和价值(Critic)。通过重放缓冲区存储经验并进行批量学习。

动作空间噪声

    def get_ou_noise(self):
        # 生成 Ornstein-Uhlenbeck 噪声
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.sigma * np.sqrt(
            self.dt) * np.random.randn(self.action_dim)
        self.x_prev = x
        return x

    def select_action(self, state):
        state = torch.FloatTensor(state).to(self.device).unsqueeze(0)
        action = self.actor(state).detach().cpu().numpy()[0]

        # 添加噪声
        noise = self.get_ou_noise()
        action = action + noise

        # 限制动作范围,例如在 [0, 1] 之间
        action = np.clip(action, 0, 1)
        return action
  • 噪声生成:这个方法生成一个 Ornstein-Uhlenbeck 噪声样本,常用于控制任务中的探索,以增加时间相关性。

  • 参数

    • self.x_prev: 上一时刻的噪声值,初始化为零,保持噪声的状态。
    • self.mu: 噪声的均值,通常设为零向量(np.zeros(self.action_dim))。
    • self.theta: 控制噪声回归到均值的速度,值越大,回归越快。
    • self.sigma: 噪声的标准差,控制噪声的幅度。
    • self.dt: 时间步长,通常设为一个小值(例如 1e-2)。

计算新的噪声值 x,公式如下:

  • 其中:

    • 第一项是上一时刻的噪声值。
    • 第二项是回归部分,确保噪声向均值 mu 回归。
    • 第三项是随机噪声项,使用标准正态分布生成。
  • 更新状态:将当前的噪声值 x 赋值给 self.x_prev,以便在下一次调用时使用。

  • 返回值:返回生成的噪声 x

加入DDPG训练代码

def train(load_model=False):
    env = FlappyEnv(flappy_game=Flappy())
    state_dim = env.observation_space.shape[0]
    action_dim = env.action_space.n
    # 输入状态空间和动作空间维度
    ddpg = DDPG(state_dim, action_dim)

    # 加载模型
    save_model_path = 'ddpg_model.pth'
    if load_model and os.path.exists(save_model_path):
        ddpg.load(save_model_path)
        print("Loaded model from", save_model_path)

    num_episodes = 10000
    state, _, _ = env.reset()

    for episode in range(num_episodes):
        # 计算总的奖励值
        total_reward = 0

        for num_step in range(1000):
            action_values = ddpg.select_action(state)
            next_state, reward, done, _ = env.step(action_values[0])
            # 增加到经验区
            ddpg.add_experience((state, action_values, reward, next_state, float(done)))
            ddpg.update()
            state = next_state
            total_reward += reward

            print(f'Episode {episode}, num_step {num_step}, '
                  f'action: {action_values}, '
                  f'state: {next_state}, '
                  f'current Reward: {reward}')

            if done:
                break

        # 每epoch保存一次模型
        if episode % 100 == 0:
            ddpg.save(save_model_path)
            print(f"Episode {episode}, total_step: {num_step}, saved model to", save_model_path)

运行训练,训练日志输出

new game begin, game_num: 37
Episode 5, num_step 13, action: [0.54722494], state: [ 91 236 260 439 491 153 439 491 273], current Reward: 2
Episode 5, num_step 14, action: [0.56834648], state: [ 91 199 223 359 411 153 359 411 273], current Reward: 2
Episode 5, num_step 15, action: [0.60419988], state: [ 91 170 194 284 336 153 284 336 273], current Reward: 2
Episode 5, num_step 16, action: [0.59114947], state: [ 91 145 169 204 256 153 204 256 273], current Reward: 2
Episode 5, num_step 17, action: [0.59423856], state: [ 91. 116. 140. 311. 363. 130. 311. 363. 250.], current Reward: 12
new game begin, game_num: 38
Episode 5, num_step 18, action: [0.56833228], state: [ 91 209 233 409 461 164 409 461 284], current Reward: 2
Episode 5, num_step 19, action: [0.57073671], state: [ 91 179 203 334 386 164 334 386 284], current Reward: 2
Episode 5, num_step 20, action: [0.56179526], state: [ 91 155 179 254 306 164 254 306 284], current Reward: 2
Episode 5, num_step 21, action: [0.54514305], state: [ 91 125 149 179 231 164 179 231 284], current Reward: 2
Episode 5, num_step 22, action: [0.54333423], state: [ 91 101 125  99 151 164  99 151 284], current Reward: 2

基本符合动作和奖惩的关系,让训练跑起来

在训练过程里发现始终没有拿到跨过管道的奖励,一直没有迈过第一层管道,似乎陷入了局部最优

调试了下发现是奖励函数在一个地方没有控制好,我本意是想判断如果跨过了管道就给奖励,结果环境里的逻辑一旦失败就重启博弈,管道都刷新了也符合这个判断,所以出现一直往管道上撞击,毕竟malkvo 链上得到得结论是那个方向得奖励最大

做了下修改,避免失败了还给高分奖励

        if self.up_pipe and self.low_pipe:
            if self.up_pipe.number != up_pipe.number \
                    and self.up_pipe in self.flappy_game.pipes.upper \
                    and self.low_pipe in self.flappy_game.pipes.lower:
                # 此处需要修改bug,重新开始博弈后也会掉进这个逻辑里
                reward += 10
                self.up_pipe, self.low_pipe = up_pipe, low_pipe

DDPG 里加上模型保存逻辑


    def save(self, filename):
        torch.save({
            'actor_state_dict': self.actor.state_dict(),
            'critic_state_dict': self.critic.state_dict(),
            'actor_target_state_dict': self.actor_target.state_dict(),
            'critic_target_state_dict': self.critic_target.state_dict(),
            'actor_optimizer_state_dict': self.actor_optimizer.state_dict(),
            'critic_optimizer_state_dict': self.critic_optimizer.state_dict(),
        }, filename)

    def load(self, filename):
        checkpoint = torch.load(filename)
        self.actor.load_state_dict(checkpoint['actor_state_dict'])
        self.critic.load_state_dict(checkpoint['critic_state_dict'])
        self.actor_target.load_state_dict(checkpoint['actor_target_state_dict'])
        self.critic_target.load_state_dict(checkpoint['critic_target_state_dict'])
        self.actor_optimizer.load_state_dict(checkpoint['actor_optimizer_state_dict'])
        self.critic_optimizer.load_state_dict(checkpoint['critic_optimizer_state_dict'])

查了下用Q-Learn的几篇帖子里说训练会几个典型阶段:

迭代5万次,通过管道获取的奖励少,小鸟一直向上飞(直接摆烂… …),几乎一个管道通过不了;
迭代50万次,偶尔可以通过一两个管道;
迭代100万次,可以通过4、5个管道;
迭代150万次,可以通过超过15个管道;
迭代250万次,可以一直通过管道,极少数会失误; 迭代300万次,小鸟一直向前飞… …

训练代码加上每100 epoch保存一次模型,最后训练完成使用matplotlib绘制奖励曲线

# 绘制奖励线图
    plt.plot(rewards)
    plt.title("Training Rewards Over Time")
    plt.xlabel("Episode")
    plt.ylabel("Total Reward")
    plt.savefig("training_rewards.png", format='png')

后面试验结束后附图

代码提交在github:

https://github.com/chenrui2200/flapbird_ddpg_train


原文地址:https://blog.csdn.net/u011564831/article/details/143820321

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!