使用DDPG 训练强化智能体玩FlapBird
早年间有个叫FlapBird的让人很抓狂的游戏大家应该都还记得吧,刚好在github上搜到一个python写的版本FlapPyBird ,一直以来我都在用强化训练agent玩小游戏,那么这次就是你了,试试把这个游戏也拿下。
首先浏览了下游戏主程序
async def start(self):
while True:
self.background = Background(self.config)
self.floor = Floor(self.config)
self.player = Player(self.config)
self.welcome_message = WelcomeMessage(self.config)
self.game_over_message = GameOver(self.config)
self.pipes = Pipes(self.config)
self.score = Score(self.config)
await self.splash()
await self.play()
await self.game_over()
每次循环中,游戏的各个组件被实例化:
Background
:背景对象,可能用于绘制游戏背景。Floor
:地面对象,可能用于显示游戏中的地面。Player
:玩家对象,代表游戏中的玩家。WelcomeMessage
:欢迎信息,可能在游戏开始时显示。GameOver
:游戏结束信息,可能在游戏结束时显示。Pipes
:管道对象,可能是游戏中的障碍物。Score
:得分对象,跟踪玩家的得分。
其中play代码
async def play(self):
self.score.reset()
self.player.set_mode(PlayerMode.NORMAL)
while True:
if self.player.collided(self.pipes, self.floor):
return
for i, pipe in enumerate(self.pipes.upper):
if self.player.crossed(pipe):
self.score.add()
for event in pygame.event.get():
self.check_quit_event(event)
if self.is_tap_event(event):
self.player.flap()
self.background.tick()
self.floor.tick()
self.pipes.tick()
self.score.tick()
self.player.tick()
pygame.display.update()
await asyncio.sleep(0)
self.config.tick()
-
重置分数和玩家模式:
self.score.reset() self.player.set_mode(PlayerMode.NORMAL)
self.score.reset()
:重置分数,准备开始新的游戏。self.player.set_mode(PlayerMode.NORMAL)
:将玩家的状态设置为正常模式,可能是恢复到初始状态。
-
游戏主循环:进入一个无限循环,表示游戏的持续运行,直到某个条件触发退出。
-
检测碰撞:
if self.player.collided(self.pipes, self.floor): return
- 使用
self.player.collided(self.pipes, self.floor)
检查玩家是否与管道或地面发生碰撞。如果发生碰撞,返回并结束游戏,跳出循环。
- 使用
-
更新得分:
for i, pipe in enumerate(self.pipes.upper): if self.player.crossed(pipe): self.score.add()
- 遍历上方的管道(
self.pipes.upper
),检查玩家是否成功跨越了这些管道。 self.player.crossed(pipe)
:如果玩家跨越了管道,调用self.score.add()
增加分数。
- 遍历上方的管道(
-
处理事件:
for event in pygame.event.get(): self.check_quit_event(event) if self.is_tap_event(event): self.player.flap()
- 使用
pygame.event.get()
获取当前事件队列。 self.check_quit_event(event)
:检查是否有退出事件(如关闭窗口)。self.is_tap_event(event)
:检查是否检测到点击事件,如果是,就调用self.player.flap()
使玩家角色进行“拍击”动作,通常用于让角色向上移动。
- 使用
-
更新游戏状态:
self.background.tick() self.floor.tick() self.pipes.tick() self.score.tick() self.player.tick()
- 调用各个组件的
tick()
方法,更新它们的状态。这可能包括动画更新、位置变化等。
- 调用各个组件的
-
更新显示:
pygame.display.update()
- 更新游戏显示,渲染所有的游戏组件到屏幕上。
-
异步等待:
await asyncio.sleep(0)
- 这个调用使得游戏在每次循环结束后异步等待,允许其他协程运行。这对于确保游戏的流畅性和响应性非常重要。
-
配置更新:
self.config.tick()
- 调用配置的
tick()
方法,可能用于更新游戏配置或状态。
- 调用配置的
各个游戏组件对象都继承自entity, 在tick方法里加点日志
def tick(self) -> None:
self.draw()
rect = self.rect
if self.config.debug:
pygame.draw.rect(self.config.screen, (255, 0, 0), rect, 1)
# write x and y at top of rect
font = pygame.font.SysFont("Arial", 13, True)
entity_position_info = {
'x': f'{self.x:.1f}',
'y': f'{self.y:.1f}',
'w': f'{self.w:.1f}',
'h': f'{self.h:.1f}'
}
if self.get_type() == 'Pipe':
print('Pipe', entity_position_info)
if self.get_type() == 'Player':
print('Player', entity_position_info)
把管道和笨鸟的特征信息打印下
up.size 2
lower.size 2
-------up pipe-------
Pipe {'x': '84.0', 'y': '-151.0', 'w': '52.0', 'h': '320.0'}
-------low pipe-------
Pipe {'x': '84.0', 'y': '289.0', 'w': '52.0', 'h': '320.0'}
-------up pipe-------
Pipe {'x': '266.0', 'y': '-210.0', 'w': '52.0', 'h': '320.0'}
-------low pipe-------
Pipe {'x': '266.0', 'y': '230.0', 'w': '52.0', 'h': '320.0'}
Player {'x': '57.0', 'y': '386.5', 'w': '34.0', 'h': '24.0'}
这个游戏对人的考验在于抓住管道中间窗口靠近的时机,点击触屏让小鸟飞起来跳过空隙,所以状态空间在我想来,应该只需要关注笨鸟的位置和笨鸟前面的管道缺口位置的坐标
所以环境类和它的状态空间计算可以这么写
import asyncio
import time
import gym
import numpy as np
from gym import spaces
class FlappyEnv(gym.Env):
def __init__(self, flappy_game):
super(FlappyEnv, self).__init__()
# 我只关注笨鸟和它前面的管道口坐标的位置
self.state_dim = 9
self.flappy_game = flappy_game
# 定义动作空间(0: 不跳,1: 跳)
self.action_space = spaces.Discrete(1)
# 定义状态空间,可以根据你的游戏状态定义
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(self.state_dim,), dtype=np.float32)
# 定义局编号
self.current_game_num = 0
# 定义需要跳过的pip
self.up_pipe, self.low_pipe = None, None
def reset(self):
# 重置游戏状态
import threading
thread = threading.Thread(target=self.start_flappy_game)
thread.start()
self.current_game_num == self.flappy_game.game_num
return self.get_state()
def start_flappy_game(self):
asyncio.run(self.flappy_game.start())
def step(self, action):
# 执行动作
threshold = 0.5
if action > threshold and hasattr(self.flappy_game, 'player'): # 跳
self.flappy_game.player.flap()
time.sleep(0.5)
# 获取新的状态、奖励和是否结束
state, up_pipe, low_pipe = self.get_state()
reward = self.get_reward()
done = self.is_game_over()
if done:
reward -= 10
else:
if self.up_pipe and self.low_pipe:
if self.up_pipe.number != up_pipe.number:
reward += 20
self.up_pipe, self.low_pipe = up_pipe, low_pipe
else:
self.up_pipe, self.low_pipe = up_pipe, low_pipe
# 如果飞的位置 超过up 了要扣分
# print('state', state) state [ 57 208 84 -215 84 225]
if state[5] > state[1] > state[3]:
reward += 3
if self.current_game_num < self.flappy_game.game_num:
self.current_game_num = self.flappy_game.game_num
reward -= 10
return state, reward, done, {}
def get_state(self):
# 找到笨鸟前面的管道up和low各一个
up_pipe, low_pipe = self.find_closest_pipes()
# 返回当前状态的表示
player = self.get_play()
return np.array([
(player.x + player.w) if player else 0,
player.y if player else 0,
(player.y + player.h)if player else 0,
up_pipe.x if up_pipe else 0,
(up_pipe.x + up_pipe.w) if up_pipe else 0,
(up_pipe.y + up_pipe.h) if up_pipe else 0,
low_pipe.x if low_pipe else 0,
(low_pipe.x + low_pipe.w)if low_pipe else 0,
low_pipe.y if low_pipe else 0
]), up_pipe, low_pipe
def get_play(self):
if hasattr(self.flappy_game, 'player'):
return self.flappy_game.player
else:
return None
def find_closest_pipes(self):
closest_pipes = None
min_diff = float('inf') # 初始化最低差值为正无穷
# 遍历上下管道
if hasattr(self.flappy_game, 'pipes'):
upper_pipes = self.flappy_game.pipes.upper
lower_pipes = self.flappy_game.pipes.lower
for i in range(len(upper_pipes)):
up_pipe = upper_pipes[i]
low_pipe = lower_pipes[i]
if (up_pipe.x + up_pipe.w) > self.flappy_game.player.x and \
(low_pipe.x + low_pipe.w) > self.flappy_game.player.x:
# 计算上管道和下管道的 x 坐标差值
diff = abs(up_pipe.x - low_pipe.x)
if diff < min_diff:
min_diff = diff
closest_pipes = (up_pipe, low_pipe)
return closest_pipes # 返回找到的最接近的管道组
else:
return None, None
def get_reward(self):
# 定义奖励机制
if hasattr(self.flappy_game, 'player') and hasattr(self.flappy_game, 'pipes') \
and hasattr(self.flappy_game, 'floor'):
if self.flappy_game.player.collided(self.flappy_game.pipes, self.flappy_game.floor):
return -1 # 碰撞时惩罚
return 2
else:
return 0
def is_game_over(self):
if hasattr(self.flappy_game, 'player') and hasattr(self.flappy_game, 'pipes'):
return self.flappy_game.player.collided(self.flappy_game.pipes, self.flappy_game.floor)
else:
return False
状态空间的想法是把rect 的接触面都算到状态空间内,即笨鸟向前方向的rect的两各顶点坐标,上下管道在缺口处的各两个点的坐标,由于笨鸟顶点和x坐标不变,上下管道的y左边不变,状态空间可以简化为9个维度
[
(player.x + player.w) if player else 0,
player.y if player else 0,
(player.y + player.h)if player else 0,
up_pipe.x if up_pipe else 0,
(up_pipe.x + up_pipe.w) if up_pipe else 0,
(up_pipe.y + up_pipe.h) if up_pipe else 0,
low_pipe.x if low_pipe else 0,
(low_pipe.x + low_pipe.w)if low_pipe else 0,
low_pipe.y if low_pipe else 0
]
奖励函数按照向前没有碰撞进行加分
训练过程在获取next_state的时候睡眠若干毫秒,再取下一刻的状态和计算reward
在Flappy 游戏端做了如果game over就重新开始的操作,所以step方法里我加了如果上一局结束则动作扣分的逻辑,当越过了管道那它前面的管道,最前面的管道发生变化后增加奖励
def step(self, action):
# 执行动作
threshold = 0.5
if action > threshold and hasattr(self.flappy_game, 'player'): # 跳
self.flappy_game.player.flap()
time.sleep(0.5)
# 获取新的状态、奖励和是否结束
state, up_pipe, low_pipe = self.get_state()
reward = self.get_reward()
done = self.is_game_over()
if done:
reward -= 10
else:
if self.up_pipe and self.low_pipe:
if self.up_pipe.number != up_pipe.number:
reward += 20
self.up_pipe, self.low_pipe = up_pipe, low_pipe
else:
self.up_pipe, self.low_pipe = up_pipe, low_pipe
if self.current_game_num < self.flappy_game.game_num:
self.current_game_num = self.flappy_game.game_num
reward -= 10
return state, reward, done, {}
DDPG 的模型代码
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
class Actor(nn.Module):
def __init__(self, state_dim, action_dim):
super(Actor, self).__init__()
self.fc1 = nn.Linear(state_dim, 256)
self.fc2 = nn.Linear(256, 256)
self.fc3 = nn.Linear(256, action_dim)
def forward(self, x):
x = torch.relu(self.fc1(x))
x = torch.relu(self.fc2(x))
return torch.tanh(self.fc3(x))
class Critic(nn.Module):
def __init__(self, state_dim, action_dim):
super(Critic, self).__init__()
self.fc1 = nn.Linear(state_dim + action_dim, 128)
self.fc2 = nn.Linear(128, 64)
self.fc3 = nn.Linear(64, 1)
def forward(self, state, action):
x = torch.relu(self.fc1(torch.cat([state, action], 1)))
x = torch.relu(self.fc2(x))
return self.fc3(x)
class DDPG:
def __init__(self, state_dim, action_dim):
# 检查是否有可用的 GPU
self.device = "cuda" if torch.cuda.is_available() else "cpu"
print('device', self.device)
self.actor = Actor(state_dim, action_dim).to(self.device)
self.critic = Critic(state_dim, action_dim).to(self.device)
self.actor_target = Actor(state_dim, action_dim).to(self.device)
self.critic_target = Critic(state_dim, action_dim).to(self.device)
self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)
self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)
# 复制权重
self.actor_target.load_state_dict(self.actor.state_dict())
self.critic_target.load_state_dict(self.critic.state_dict())
self.replay_buffer = []
self.max_buffer_size = 100000
self.batch_size = 64
self.gamma = 0.99
self.tau = 0.005
def select_action(self, state):
state = torch.FloatTensor(state).to(self.device).unsqueeze(0)
return self.actor(state).detach().cpu().numpy()[0]
def update(self):
if len(self.replay_buffer) < self.batch_size:
return
# 随机选择一批样本
batch = np.random.choice(len(self.replay_buffer), self.batch_size, replace=False)
state, action, reward, next_state, done = zip(*[self.replay_buffer[i] for i in batch])
state = torch.FloatTensor(state).to(self.device)
action = torch.FloatTensor(action).to(self.device)
reward = torch.FloatTensor(reward).unsqueeze(1).to(self.device)
next_state = torch.FloatTensor(next_state).to(self.device)
done = torch.FloatTensor(done).unsqueeze(1).to(self.device)
# 更新 Critic
target_action = self.actor_target(next_state)
target_q = self.critic_target(next_state, target_action)
expected_q = reward + (1 - done) * self.gamma * target_q
critic_loss = nn.MSELoss()(self.critic(state, action), expected_q.detach())
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# 更新 Actor
actor_loss = -self.critic(state, self.actor(state)).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
# 更新 Target 网络
for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
def add_experience(self, experience):
if len(self.replay_buffer) >= self.max_buffer_size:
self.replay_buffer.pop(0)
self.replay_buffer.append(experience)
Actor 和 Critic
深度确定性策略梯度(DDPG)算法中,Actor 和 Critic 扮演着不同但互补的角色
Actor 的角色
-
策略学习:Actor 网络负责生成智能体的策略,也就是给定当前状态下选择的动作。它根据当前的状态输出一个动作(在连续动作空间中,通常是一个实数向量)。
-
动作选择:Actor 通过策略网络输出动作,并可能添加探索噪声(如 Ornstein-Uhlenbeck 噪声)来鼓励探索。
-
输入:Actor 接受环境的状态作为输入,并输出一个动作。通常使用非线性激活函数(如 Tanh)来处理输出,以保持动作在特定范围内(例如 [−1,1][-1, 1][−1,1])。
2. Critic 的角色
-
价值评估:Critic 网络负责评估 Actor 所选择的动作的价值。它根据当前状态和动作输入,输出一个 Q 值(即状态-动作值),表示在给定状态下执行某个动作后预期的回报。
-
更新策略:Critic 的输出用于计算损失函数,指导 Actor 网络的更新。通过最大化 Critic 输出的 Q 值,Actor 能够学习到更好的策略。
-
输入:Critic 接受状态和动作的拼接作为输入,并输出一个标量 Q 值。
3. 互动机制
-
Actor-Critic 更新:
- 在每个训练步骤中,Actor 根据当前的状态生成动作,然后 Critic 评估这个动作的价值。
- Critic 计算 Q 值并根据经验更新其参数。
- Actor 利用 Critic 评估的 Q 值来优化其策略,通常是通过反向传播来调整网络参数。
在 DDPG(深度确定性策略梯度)算法中,有许多重要的参数需要设置,其中两个常见的参数是 gamma
和 tau
。下面是这两个参数的详细含义及其在算法中的作用。
1. gamma
(折扣因子)
-
含义:
gamma
是一个介于 0 和 1 之间的浮点数,用于表示未来奖励的重要性。它决定了在计算未来奖励时,当前奖励与未来奖励的权重。 -
作用:
- 折扣未来奖励:在 Q 值的计算中,
gamma
用于折扣未来的奖励。更高的gamma
值意味着智能体会更加重视未来的奖励,而较低的值则意味着更关注当前的奖励。 - 公式:若一个状态的奖励为
r_t
,下一状态的 Q 值为Q(s_{t+1}, a_{t+1})
,则目标 Q 值的计算可以表示为: - 影响:选择较高的
gamma
值可以促使智能体采取更具长远考虑的策略,但也可能导致学习过程变得更加复杂。
- 折扣未来奖励:在 Q 值的计算中,
2. tau
(软更新参数)
-
含义:
tau
是一个较小的浮点数,通常在 0 到 1 之间,用于控制目标网络的更新速度。 -
作用:
- 软更新目标网络:在 DDPG 中,目标网络(
actor_target
和critic_target
)的参数不是直接复制自主网络,而是通过软更新的方式逐渐接近。这有助于提高训练的稳定性。 - 更新公式:目标网络的参数更新通常使用以下公式: 其中
theta_local
是主网络的参数,theta_target
是目标网络的参数。 - 影响:选择较小的
tau
值(如 0.005)意味着目标网络更新缓慢,从而保持训练的稳定性,防止在训练过程中因参数变化过快而造成的不稳定。
- 软更新目标网络:在 DDPG 中,目标网络(
update 方法更新网络:
def update(self):
if len(self.replay_buffer) < self.batch_size:
return
# 随机选择一批样本
batch = np.random.choice(len(self.replay_buffer), self.batch_size, replace=False)
state, action, reward, next_state, done = zip(*[self.replay_buffer[i] for i in batch])
state = torch.FloatTensor(state).to(self.device)
action = torch.FloatTensor(action).to(self.device)
reward = torch.FloatTensor(reward).unsqueeze(1).to(self.device)
next_state = torch.FloatTensor(next_state).to(self.device)
done = torch.FloatTensor(done).unsqueeze(1).to(self.device)
# 更新 Critic
target_action = self.actor_target(next_state)
target_q = self.critic_target(next_state, target_action)
expected_q = reward + (1 - done) * self.gamma * target_q
critic_loss = nn.MSELoss()(self.critic(state, action), expected_q.detach())
self.critic_optimizer.zero_grad()
critic_loss.backward()
self.critic_optimizer.step()
# 更新 Actor
actor_loss = -self.critic(state, self.actor(state)).mean()
self.actor_optimizer.zero_grad()
actor_loss.backward()
self.actor_optimizer.step()
# 更新 Target 网络
for target_param, param in zip(self.actor_target.parameters(), self.actor.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
for target_param, param in zip(self.critic_target.parameters(), self.critic.parameters()):
target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
- 首先检查重放缓冲区是否足够大以进行采样。
- 随机选择一批经验样本。
- 计算目标 Q 值,更新 Critic 网络的损失,并进行反向传播。
- 更新 Actor 网络,最大化 Critic 输出的 Q 值。
- 使用软更新方法更新目标网络的权重。
add_experience 方法
- 添加经验:将经验元组(状态、动作、奖励、下一个状态、是否结束)添加到重放缓冲区。
- 如果缓冲区已满,则移除最旧的经验。
整个 DDPG 类实现了深度强化学习中的 Actor-Critic 方法,利用两个神经网络来分别表示策略(Actor)和价值(Critic)。通过重放缓冲区存储经验并进行批量学习。
动作空间噪声
def get_ou_noise(self):
# 生成 Ornstein-Uhlenbeck 噪声
x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + self.sigma * np.sqrt(
self.dt) * np.random.randn(self.action_dim)
self.x_prev = x
return x
def select_action(self, state):
state = torch.FloatTensor(state).to(self.device).unsqueeze(0)
action = self.actor(state).detach().cpu().numpy()[0]
# 添加噪声
noise = self.get_ou_noise()
action = action + noise
# 限制动作范围,例如在 [0, 1] 之间
action = np.clip(action, 0, 1)
return action
-
噪声生成:这个方法生成一个 Ornstein-Uhlenbeck 噪声样本,常用于控制任务中的探索,以增加时间相关性。
-
参数:
self.x_prev
: 上一时刻的噪声值,初始化为零,保持噪声的状态。self.mu
: 噪声的均值,通常设为零向量(np.zeros(self.action_dim)
)。self.theta
: 控制噪声回归到均值的速度,值越大,回归越快。self.sigma
: 噪声的标准差,控制噪声的幅度。self.dt
: 时间步长,通常设为一个小值(例如1e-2
)。
计算新的噪声值 x
,公式如下:
-
其中:
- 第一项是上一时刻的噪声值。
- 第二项是回归部分,确保噪声向均值
mu
回归。 - 第三项是随机噪声项,使用标准正态分布生成。
-
更新状态:将当前的噪声值
x
赋值给self.x_prev
,以便在下一次调用时使用。 -
返回值:返回生成的噪声
x
。
加入DDPG训练代码
def train(load_model=False):
env = FlappyEnv(flappy_game=Flappy())
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n
# 输入状态空间和动作空间维度
ddpg = DDPG(state_dim, action_dim)
# 加载模型
save_model_path = 'ddpg_model.pth'
if load_model and os.path.exists(save_model_path):
ddpg.load(save_model_path)
print("Loaded model from", save_model_path)
num_episodes = 10000
state, _, _ = env.reset()
for episode in range(num_episodes):
# 计算总的奖励值
total_reward = 0
for num_step in range(1000):
action_values = ddpg.select_action(state)
next_state, reward, done, _ = env.step(action_values[0])
# 增加到经验区
ddpg.add_experience((state, action_values, reward, next_state, float(done)))
ddpg.update()
state = next_state
total_reward += reward
print(f'Episode {episode}, num_step {num_step}, '
f'action: {action_values}, '
f'state: {next_state}, '
f'current Reward: {reward}')
if done:
break
# 每epoch保存一次模型
if episode % 100 == 0:
ddpg.save(save_model_path)
print(f"Episode {episode}, total_step: {num_step}, saved model to", save_model_path)
运行训练,训练日志输出
new game begin, game_num: 37
Episode 5, num_step 13, action: [0.54722494], state: [ 91 236 260 439 491 153 439 491 273], current Reward: 2
Episode 5, num_step 14, action: [0.56834648], state: [ 91 199 223 359 411 153 359 411 273], current Reward: 2
Episode 5, num_step 15, action: [0.60419988], state: [ 91 170 194 284 336 153 284 336 273], current Reward: 2
Episode 5, num_step 16, action: [0.59114947], state: [ 91 145 169 204 256 153 204 256 273], current Reward: 2
Episode 5, num_step 17, action: [0.59423856], state: [ 91. 116. 140. 311. 363. 130. 311. 363. 250.], current Reward: 12
new game begin, game_num: 38
Episode 5, num_step 18, action: [0.56833228], state: [ 91 209 233 409 461 164 409 461 284], current Reward: 2
Episode 5, num_step 19, action: [0.57073671], state: [ 91 179 203 334 386 164 334 386 284], current Reward: 2
Episode 5, num_step 20, action: [0.56179526], state: [ 91 155 179 254 306 164 254 306 284], current Reward: 2
Episode 5, num_step 21, action: [0.54514305], state: [ 91 125 149 179 231 164 179 231 284], current Reward: 2
Episode 5, num_step 22, action: [0.54333423], state: [ 91 101 125 99 151 164 99 151 284], current Reward: 2
基本符合动作和奖惩的关系,让训练跑起来
在训练过程里发现始终没有拿到跨过管道的奖励,一直没有迈过第一层管道,似乎陷入了局部最优
调试了下发现是奖励函数在一个地方没有控制好,我本意是想判断如果跨过了管道就给奖励,结果环境里的逻辑一旦失败就重启博弈,管道都刷新了也符合这个判断,所以出现一直往管道上撞击,毕竟malkvo 链上得到得结论是那个方向得奖励最大
做了下修改,避免失败了还给高分奖励
if self.up_pipe and self.low_pipe:
if self.up_pipe.number != up_pipe.number \
and self.up_pipe in self.flappy_game.pipes.upper \
and self.low_pipe in self.flappy_game.pipes.lower:
# 此处需要修改bug,重新开始博弈后也会掉进这个逻辑里
reward += 10
self.up_pipe, self.low_pipe = up_pipe, low_pipe
DDPG 里加上模型保存逻辑
def save(self, filename):
torch.save({
'actor_state_dict': self.actor.state_dict(),
'critic_state_dict': self.critic.state_dict(),
'actor_target_state_dict': self.actor_target.state_dict(),
'critic_target_state_dict': self.critic_target.state_dict(),
'actor_optimizer_state_dict': self.actor_optimizer.state_dict(),
'critic_optimizer_state_dict': self.critic_optimizer.state_dict(),
}, filename)
def load(self, filename):
checkpoint = torch.load(filename)
self.actor.load_state_dict(checkpoint['actor_state_dict'])
self.critic.load_state_dict(checkpoint['critic_state_dict'])
self.actor_target.load_state_dict(checkpoint['actor_target_state_dict'])
self.critic_target.load_state_dict(checkpoint['critic_target_state_dict'])
self.actor_optimizer.load_state_dict(checkpoint['actor_optimizer_state_dict'])
self.critic_optimizer.load_state_dict(checkpoint['critic_optimizer_state_dict'])
查了下用Q-Learn的几篇帖子里说训练会几个典型阶段:
迭代5万次,通过管道获取的奖励少,小鸟一直向上飞(直接摆烂… …),几乎一个管道通过不了;
迭代50万次,偶尔可以通过一两个管道;
迭代100万次,可以通过4、5个管道;
迭代150万次,可以通过超过15个管道;
迭代250万次,可以一直通过管道,极少数会失误; 迭代300万次,小鸟一直向前飞… …
训练代码加上每100 epoch保存一次模型,最后训练完成使用matplotlib绘制奖励曲线
# 绘制奖励线图
plt.plot(rewards)
plt.title("Training Rewards Over Time")
plt.xlabel("Episode")
plt.ylabel("Total Reward")
plt.savefig("training_rewards.png", format='png')
后面试验结束后附图
代码提交在github:
https://github.com/chenrui2200/flapbird_ddpg_train
原文地址:https://blog.csdn.net/u011564831/article/details/143820321
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!