Deep Learning Algorithms: Informer for Time Series Forecasting, Part 6 (Data Processing, Self-Attention, Adaptive Learning Rate Adjustment, Early Stopping)
1. Data Processing
Suppose we take 25 hours of data and use the first 24 time steps as the encoder input, with temperature as the only feature. The decoder input is built differently: the last 12 time steps of the encoder input are concatenated with the 25th time step to form the decoder input.
Note:
- If inverse normalization is used, the last 12 encoder time steps are normalized and then concatenated with the un-normalized 25th time step.
- If inverse normalization is not used, both the last 12 encoder time steps and the 25th time step are normalized before being concatenated (see the sketch after this list).
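A minimal NumPy sketch of how the decoder input is assembled in the two cases; the array names (`data_raw`, `data_normalized`) and the 24/12/1 split are illustrative assumptions rather than part of the original code:

```python
import numpy as np

# Hypothetical 25 hourly temperature readings with a single feature.
seq_len, label_len, pred_len = 24, 12, 1
data_raw = np.random.randn(25, 1)                       # un-normalized values
mean, std = data_raw[:seq_len].mean(0), data_raw[:seq_len].std(0)
data_normalized = (data_raw - mean) / std               # normalized values

r_begin = seq_len - label_len                           # index 12
r_end = seq_len + pred_len                              # index 25

# inverse=True: normalized label part + raw 25th time step
dec_in_inverse = np.concatenate(
    [data_normalized[r_begin:seq_len], data_raw[seq_len:r_end]], axis=0)

# inverse=False: both parts normalized
dec_in = data_normalized[r_begin:r_end]
print(dec_in_inverse.shape, dec_in.shape)               # (13, 1) (13, 1)
```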
```python
import torch


class StandardScaler():
    """Standardize data with the mean/std learned from the training split."""
    def __init__(self):
        self.mean = 0.
        self.std = 1.

    def fit(self, data):
        # Per-feature mean and std (data is a NumPy array).
        self.mean = data.mean(0)
        self.std = data.std(0)

    def transform(self, data):
        # Works for both NumPy arrays and torch tensors.
        mean = torch.from_numpy(self.mean).type_as(data).to(data.device) if torch.is_tensor(data) else self.mean
        std = torch.from_numpy(self.std).type_as(data).to(data.device) if torch.is_tensor(data) else self.std
        return (data - mean) / std

    def inverse_transform(self, data):
        mean = torch.from_numpy(self.mean).type_as(data).to(data.device) if torch.is_tensor(data) else self.mean
        std = torch.from_numpy(self.std).type_as(data).to(data.device) if torch.is_tensor(data) else self.std
        if data.shape[-1] != mean.shape[-1]:
            # When only the target column is predicted, keep only its statistics.
            mean = mean[-1:]
            std = std[-1:]
        return (data * std) + mean
```
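A quick usage sketch of StandardScaler, assuming it is fit only on the training slice (as `__read_data__` does below); the toy values are made up for illustration:

```python
import numpy as np

scaler = StandardScaler()
train_values = np.array([[10.0], [12.0], [14.0], [16.0]])    # toy training data
scaler.fit(train_values)                                     # store train mean/std

scaled = scaler.transform(np.array([[18.0]]))                # uses training statistics
restored = scaler.inverse_transform(scaled)                  # back to original units
print(scaled, restored)                                      # restored is [[18.]]
```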
```python
import os

import numpy as np
import pandas as pd
from torch.utils.data import Dataset

# time_features comes from utils/timefeatures.py in the Informer repository;
# StandardScaler is the class defined above.
from utils.timefeatures import time_features


class Dataset_ETT_hour(Dataset):
    def __init__(self, root_path, flag='train', size=None,
                 features='S', data_path='ETTh1.csv',
                 target='OT', scale=True, inverse=False, timeenc=0, freq='h', cols=None):
        # size: [seq_len, label_len, pred_len]
        # info
        if size is None:
            self.seq_len = 24*4*4
            self.label_len = 24*4
            self.pred_len = 24*4
        else:
            self.seq_len = size[0]
            self.label_len = size[1]
            self.pred_len = size[2]
        # init
        assert flag in ['train', 'test', 'val']
        type_map = {'train': 0, 'val': 1, 'test': 2}
        self.set_type = type_map[flag]

        self.features = features
        self.target = target
        self.scale = scale
        self.inverse = inverse
        self.timeenc = timeenc
        self.freq = freq

        self.root_path = root_path
        self.data_path = data_path
        self.__read_data__()

    def __read_data__(self):
        self.scaler = StandardScaler()
        df_raw = pd.read_csv(os.path.join(self.root_path,
                                          self.data_path))
        # Optional: restrict to a small subset of the raw data.
        # df_raw = df_raw[df_raw['date'] < '2016-10-01']
        border1s = [0, 12*30*24 - self.seq_len, 12*30*24 + 4*30*24 - self.seq_len]
        border2s = [12*30*24, 12*30*24 + 4*30*24, 12*30*24 + 8*30*24]
        # Alternative borders for a small-sample experiment:
        # border1s = [0, 2*30*24 - self.seq_len, 2*30*24 + 0.5*30*24 - self.seq_len]
        # border2s = [2*30*24, 2*30*24 + 0.5*30*24, 2*30*24 + 1*30*24]
        border1 = int(border1s[self.set_type])
        border2 = int(border2s[self.set_type])

        if self.features == 'M' or self.features == 'MS':
            cols_data = df_raw.columns[1:]
            df_data = df_raw[cols_data]
        elif self.features == 'S':
            df_data = df_raw[[self.target]]

        # Standardization: every split is scaled with the mean/std of the training split.
        if self.scale:
            train_data = df_data[border1s[0]:border2s[0]]
            self.scaler.fit(train_data.values)
            data = self.scaler.transform(df_data.values)
        else:
            data = df_data.values

        df_stamp = df_raw[['date']][border1:border2]
        df_stamp['date'] = pd.to_datetime(df_stamp.date)
        # Time-feature encodings: hour of day / day of week / day of month / day of year.
        data_stamp = time_features(df_stamp, timeenc=self.timeenc, freq=self.freq)

        self.data_x = data[border1:border2]
        if self.inverse:
            self.data_y = df_data.values[border1:border2]
        else:
            self.data_y = data[border1:border2]
        self.data_stamp = data_stamp

    def __getitem__(self, index):
        s_begin = index
        s_end = s_begin + self.seq_len
        r_begin = s_end - self.label_len
        r_end = r_begin + self.label_len + self.pred_len

        seq_x = self.data_x[s_begin:s_end]
        # With inverse normalization, the label_len part uses normalized data and the
        # pred_len part uses un-normalized data; they are concatenated as the decoder input.
        if self.inverse:
            seq_y = np.concatenate([self.data_x[r_begin:r_begin + self.label_len],
                                    self.data_y[r_begin + self.label_len:r_end]], 0)
        else:
            seq_y = self.data_y[r_begin:r_end]
        seq_x_mark = self.data_stamp[s_begin:s_end]
        seq_y_mark = self.data_stamp[r_begin:r_end]

        return seq_x, seq_y, seq_x_mark, seq_y_mark

    def __len__(self):
        return len(self.data_x) - self.seq_len - self.pred_len + 1

    def inverse_transform(self, data):
        return self.scaler.inverse_transform(data)
```
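A sketch of how this dataset is typically consumed through a PyTorch DataLoader. The paths, size arguments, and batch size are illustrative assumptions, and running it requires the ETTh1.csv file plus the repository's time_features helper:

```python
from torch.utils.data import DataLoader

# Hypothetical paths and window sizes, chosen only for illustration.
train_set = Dataset_ETT_hour(root_path='./data/ETT/', flag='train',
                             size=[96, 48, 24], features='S',
                             data_path='ETTh1.csv', target='OT')
train_loader = DataLoader(train_set, batch_size=32, shuffle=True, drop_last=True)

for seq_x, seq_y, seq_x_mark, seq_y_mark in train_loader:
    # seq_x: (32, 96, 1); seq_y: (32, 72, 1) since label_len + pred_len = 72
    # seq_x_mark / seq_y_mark carry the time-feature encodings
    break
```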
2. Self-Attention
The theory of self-attention is covered extensively elsewhere, so only the code is shown here.
```python
from math import sqrt

import numpy as np
import torch
import torch.nn as nn

# TriangularCausalMask is defined in utils/masking.py of the Informer repository;
# a sketch of it is shown after this block.


class FullAttention(nn.Module):
    def __init__(self, mask_flag=True, factor=5, scale=None, attention_dropout=0.1, output_attention=False):
        super(FullAttention, self).__init__()
        self.scale = scale
        self.mask_flag = mask_flag
        self.output_attention = output_attention
        self.dropout = nn.Dropout(attention_dropout)

    def forward(self, queries, keys, values, attn_mask):
        # B: batch, L: query length, S: key length, H: heads, E/D: head dims
        B, L, H, E = queries.shape
        _, S, _, D = values.shape
        scale = self.scale or 1. / sqrt(E)

        # Raw attention scores of shape (B, H, L, S).
        scores = torch.einsum("blhe,bshe->bhls", queries, keys)
        if self.mask_flag:
            if attn_mask is None:
                # Causal mask: each position attends only to itself and earlier positions.
                attn_mask = TriangularCausalMask(B, L, device=queries.device)
            scores.masked_fill_(attn_mask.mask, -np.inf)

        A = self.dropout(torch.softmax(scale * scores, dim=-1))
        V = torch.einsum("bhls,bshd->blhd", A, values)

        if self.output_attention:
            return (V.contiguous(), A)
        else:
            return (V.contiguous(), None)
```
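FullAttention relies on TriangularCausalMask from utils/masking.py in the Informer repository; the sketch below reproduces it roughly so the shape check that follows is self-contained, with the tensor sizes chosen arbitrarily:

```python
import torch


class TriangularCausalMask():
    """Boolean upper-triangular mask: True entries (future positions) are blocked."""
    def __init__(self, B, L, device="cpu"):
        mask_shape = [B, 1, L, L]
        with torch.no_grad():
            self._mask = torch.triu(torch.ones(mask_shape, dtype=torch.bool), diagonal=1).to(device)

    @property
    def mask(self):
        return self._mask


# Shape check with arbitrary sizes: batch 2, sequence length 48, 8 heads, 64 dims per head.
attn = FullAttention(mask_flag=True, attention_dropout=0.1, output_attention=False)
q = k = v = torch.randn(2, 48, 8, 64)
out, _ = attn(q, k, v, attn_mask=None)
print(out.shape)    # torch.Size([2, 48, 8, 64])
```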
3. Adaptive Learning Rate Adjustment
This function adjusts the learning rate with one of two strategies:
- type1: halve the learning rate after every epoch
- type2: at specific epochs, set the learning rate to a predefined value
At the end of each epoch, the optimizer's learning rate is updated according to the current epoch and the chosen strategy. For example, with an initial learning rate of 1e-4, type1 yields 1e-4, 5e-5, 2.5e-5, ... over successive epochs. A usage sketch follows the function definition below.
```python
def adjust_learning_rate(optimizer, epoch, args):
    # lr = args.learning_rate * (0.2 ** (epoch // 2))
    if args.lradj == 'type1':
        # type1: halve the learning rate after every epoch.
        lr_adjust = {epoch: args.learning_rate * (0.5 ** ((epoch - 1) // 1))}
    elif args.lradj == 'type2':
        # type2: at specific epochs the learning rate is set to a predefined value,
        # e.g. 5e-5 at epoch 2, 1e-5 at epoch 4, and so on.
        lr_adjust = {
            2: 5e-5, 4: 1e-5, 6: 5e-6, 8: 1e-6,
            10: 5e-7, 15: 1e-7, 20: 5e-8
        }
    # If the current epoch is a key of lr_adjust, set the learning rate of every
    # parameter group in the optimizer to the corresponding value and report it.
    if epoch in lr_adjust.keys():
        lr = lr_adjust[epoch]
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr
        print('Updating learning rate to {}'.format(lr))
```
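A sketch of how adjust_learning_rate is called at the end of each training epoch; args is assumed to be an argparse-style namespace with learning_rate and lradj fields, and the model here is just a placeholder:

```python
from types import SimpleNamespace

import torch

model = torch.nn.Linear(7, 1)                                  # placeholder model
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
args = SimpleNamespace(learning_rate=1e-4, lradj='type1')

for epoch in range(1, 6):
    # ... training and validation for this epoch would run here ...
    adjust_learning_rate(optimizer, epoch, args)
# With type1 the learning rate halves each epoch: 1e-4, 5e-5, 2.5e-5, ...
```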
4. Early Stopping
The goal is to stop training early once the validation loss stops improving significantly, which helps prevent overfitting. The early-stopping object is called at the end of each epoch to check whether the validation loss has improved:
- If it has improved, the model is saved and the counter is reset.
- If it has not improved, the counter is incremented; once the counter reaches the configured patience, training stops. A usage sketch follows the class definition below.
```python
import numpy as np
import torch


class EarlyStopping:
    """
    Stop training early once the validation loss stops improving, to prevent overfitting.
    The object is called at the end of each epoch to check whether the validation loss improved:
    if it did, the model is saved and the counter is reset;
    if it did not, the counter is incremented, and once it reaches the configured patience
    training is stopped.

    patience: maximum number of epochs without improvement before training stops
    verbose: whether to print a message when the validation loss improves
    counter: number of consecutive epochs without improvement
    best_score: best validation score seen so far
    early_stop: whether training should stop
    val_loss_min: smallest validation loss seen so far
    delta: minimum change in validation loss that counts as an improvement
    """
    def __init__(self, patience=7, verbose=False, delta=0):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf
        self.delta = delta

    def __call__(self, val_loss, model, path):
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
            self.counter = 0

    def save_checkpoint(self, val_loss, model, path):
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        torch.save(model.state_dict(), path + '/' + 'checkpoint.pth')
        self.val_loss_min = val_loss
```
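A sketch of the training-loop pattern this class expects; the validation losses and the checkpoint directory are made up for illustration:

```python
import os

import torch

model = torch.nn.Linear(7, 1)                              # placeholder model
early_stopping = EarlyStopping(patience=3, verbose=True)
checkpoint_dir = './checkpoints'                           # hypothetical path
os.makedirs(checkpoint_dir, exist_ok=True)

fake_val_losses = [1.0, 0.8, 0.79, 0.81, 0.82, 0.83]       # made-up validation losses
for val_loss in fake_val_losses:
    early_stopping(val_loss, model, checkpoint_dir)
    if early_stopping.early_stop:
        print("Early stopping")
        break
```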
Original article: https://blog.csdn.net/weixin_43336108/article/details/139860283