sdnet
from __future__ import absolute_import from __future__ import division from __future__ import print_function import math import numpy as np import torch import torch.nn as nn import torch._utils import torch.nn.functional as F import torch.nn.init as init import torch.optim as optim from Lib.config import config import random import scipy.io as scio from torch.utils.data import TensorDataset, DataLoader import csv import matplotlib.pyplot as plt # 定义一个3x3卷积!kernel_initializer='he_normal','glorot_normal' def regularized_padded_conv(in_channels, out_channels, kernel_size, stride=1): conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding=kernel_size // 2, bias=False) # 使用 kaiming_normal_ 进行初始化 nn.init.kaiming_normal_(conv.weight, mode='fan_out', nonlinearity='leaky_relu') return conv ####################### 通道注意力机制 ########################## class ChannelAttention(nn.Module): def __init__(self, in_planes, ratio=16): super(ChannelAttention, self).__init__() self.avg_pool = nn.AdaptiveAvgPool2d((1, 1)) self.max_pool = nn.AdaptiveMaxPool2d((1, 1)) compressed_channels = in_planes // ratio self.conv1 = nn.Conv2d(in_planes, compressed_channels, kernel_size=1, stride=1, padding=0) self.conv2 = nn.Conv2d(compressed_channels, in_planes, kernel_size=1, stride=1, padding=0) self.leaky_relu = nn.LeakyReLU(negative_slope=0.1, inplace=True) def forward(self, inputs): avg = self.avg_pool(inputs) max = self.max_pool(inputs) avg = self.conv2(self.leaky_relu(self.conv1(avg))) max = self.conv2(self.leaky_relu(self.conv1(max))) out = avg + max out = torch.sigmoid(out) return out ########################### 空间注意力机制 ########################### class SpatialAttention(nn.Module): def __init__(self, kernel_size=7): super(SpatialAttention, self).__init__() self.conv1 = regularized_padded_conv(2, 1, kernel_size, stride=1) self.sigmoid = nn.Sigmoid() self.leaky_relu = nn.LeakyReLU(negative_slope=0.1, inplace=True) def forward(self, inputs): avg_out = torch.mean(inputs, dim=1, 
keepdim=True) max_out, _ = torch.max(inputs, dim=1, keepdim=True) out = torch.cat([avg_out, max_out], dim=1) out = self.conv1(out) out = self.sigmoid(out) return out ####################################csc layer########################################################### class elasnet_prox(nn.Module): r"""Applies the elastic net proximal operator, NOTS: it will degenerate to ell1_prox if mu=0.0 The elastic net proximal operator function is given as the following function \argmin_{x} \lambda ||x||_1 + \mu /2 ||x||_2^2 + 0.5 ||x - input||_2^2 Args: lambd: the :math:`\lambda` value on the ell_1 penalty term. Default: 0.5 mu: the :math:`\mu` value on the ell_2 penalty term. Default: 0.0 Shape: - Input: :math:`(N, *)` where `*` means, any number of additional dimensions - Output: :math:`(N, *)`, same shape as the input """ def __init__(self, lambd=0.5, mu=0.0): super(elasnet_prox, self).__init__() self.lambd = lambd self.scaling_mu = 1.0 / (1.0 + mu) def forward(self, input): return F.softshrink(input * self.scaling_mu, self.lambd * self.scaling_mu) def extra_repr(self): return '{} {}'.format(self.lambd, self.scaling_mu) class DictBlock(nn.Module): # c = argmin_c lmbd * ||c||_1 + mu/2 * ||c||_2^2 + 1 / 2 * ||x - weight (@conv) c||_2^2 def __init__(self, n_channel, dict_size, mu=0.0, lmbd=0.0, n_dict=1, non_negative=True, stride=1, kernel_size=3, padding=1, share_weight=True, square_noise=True, n_steps=10, step_size_fixed=True, step_size=0.1, w_norm=True, padding_mode="constant"): super(DictBlock, self).__init__() self.mu = mu self.lmbd = lmbd # LAMBDA self.n_dict = n_dict self.stride = stride self.kernel_size = (kernel_size, kernel_size) self.padding = padding self.padding_mode = padding_mode assert self.padding_mode in ['constant', 'reflect', 'replicate', 'circular'] self.groups = 1 self.n_steps = n_steps self.conv_transpose_output_padding = 0 if stride == 1 else 1 self.w_norm = w_norm self.non_negative = non_negative self.v_max = None self.v_max_error = 0. 
self.xsize = None self.zsize = None self.lmbd_ = None self.square_noise = square_noise self.weight = nn.Parameter(torch.Tensor(dict_size, self.n_dict * n_channel, kernel_size, kernel_size)) with torch.no_grad(): init.kaiming_uniform_(self.weight) self.nonlinear = elasnet_prox(self.lmbd * step_size, self.mu * step_size) self.register_buffer('step_size', torch.tensor(step_size, dtype=torch.float)) def fista_forward(self, x): for i in range(self.n_steps): weight = self.weight step_size = self.step_size if i == 0: c_pre = 0. c = step_size * F.conv2d(x.repeat(1, self.n_dict, 1, 1), weight, bias=None, stride=self.stride, padding=self.padding) c = self.nonlinear(c) elif i == 1: c_pre = c xp = F.conv_transpose2d(c, weight, bias=None, stride=self.stride, padding=self.padding, output_padding=self.conv_transpose_output_padding) r = x.repeat(1, self.n_dict, 1, 1) - xp if self.square_noise: gra = F.conv2d(r, weight, bias=None, stride=self.stride, padding=self.padding) else: w = r.view(r.size(0), -1) normw = w.norm(p=2, dim=1, keepdim=True).clamp_min(1e-12).expand_as(w).detach() w = (w / normw).view(r.size()) gra = F.conv2d(w, weight, bias=None, stride=self.stride, padding=self.padding) * 0.5 c = c + step_size * gra c = self.nonlinear(c) t = (math.sqrt(5.0) + 1.0) / 2.0 else: t_pre = t t = (math.sqrt(1.0 + 4.0 * t_pre * t_pre) + 1) / 2.0 a = (t_pre + t - 1.0) / t * c + (1.0 - t_pre) / t * c_pre c_pre = c xp = F.conv_transpose2d(c, weight, bias=None, stride=self.stride, padding=self.padding, output_padding=self.conv_transpose_output_padding) r = x.repeat(1, self.n_dict, 1, 1) - xp if self.square_noise: gra = F.conv2d(r, weight, bias=None, stride=self.stride, padding=self.padding) else: w = r.view(r.size(0), -1) normw = w.norm(p=2, dim=1, keepdim=True).clamp_min(1e-12).expand_as(w).detach() w = (w / normw).view(r.size()) gra = F.conv2d(w, weight, bias=None, stride=self.stride, padding=self.padding) * 0.5 c = a + step_size * gra c = self.nonlinear(c) if self.non_negative: c = 
F.leaky_relu(c, negative_slope=0.1) return c, weight def forward(self, x): if self.xsize is None: self.xsize = (x.size(-3), x.size(-2), x.size(-1)) print(self.xsize) else: assert self.xsize[-3] == x.size(-3) and self.xsize[-2] == x.size(-2) and self.xsize[-1] == x.size(-1) if self.w_norm: self.normalize_weight() c, weight = self.fista_forward(x) # Compute loss xp = F.conv_transpose2d(c, weight, bias=None, stride=self.stride, padding=self.padding, output_padding=self.conv_transpose_output_padding) r = x.repeat(1, self.n_dict, 1, 1) - xp r_loss = torch.sum(torch.pow(r, 2)) / self.n_dict c_loss = self.lmbd * torch.sum(torch.abs(c)) + self.mu / 2. * torch.sum(torch.pow(c, 2)) if self.zsize is None: self.zsize = (c.size(-3), c.size(-2), c.size(-1)) print(self.zsize) else: assert self.zsize[-3] == c.size(-3) and self.zsize[-2] == c.size(-2) and self.zsize[-1] == c.size(-1) if self.lmbd_ is None and config.MODEL.ADAPTIVELAMBDA: self.lmbd_ = self.lmbd * self.xsize[-3] * self.xsize[-2] * self.xsize[-1] / ( self.zsize[-3] * self.zsize[-2] * self.zsize[-1]) self.lmbd = self.lmbd_ print("======") print("xsize", self.xsize) print("zsize", self.zsize) print("new lmbd: ", self.lmbd) return c, (r_loss, c_loss) def update_stepsize(self): step_size = 0.9 / self.power_iteration(self.weight) self.step_size = self.step_size * 0. 
+ step_size self.nonlinear.lambd = self.lmbd * step_size self.nonlinear.scaling_mu = 1.0 / (1.0 + self.mu * step_size) def normalize_weight(self): with torch.no_grad(): w = self.weight.view(self.weight.size(0), -1) normw = w.norm(p=2, dim=1, keepdim=True).clamp_min(1e-12).expand_as(w) w = (w / normw).view(self.weight.size()) self.weight.data = w.data def power_iteration(self, weight): max_iteration = 50 v_max_error = 1.0e5 tol = 1.0e-5 k = 0 with torch.no_grad(): if self.v_max is None: c = weight.shape[0] v = torch.randn(size=(1, c, self.zsize[-2], self.zsize[-1])).to(weight.device) else: v = self.v_max.clone() while k < max_iteration and v_max_error > tol: tmp = F.conv_transpose2d( v, weight, bias=None, stride=self.stride, padding=self.padding, output_padding=self.conv_transpose_output_padding ) v_ = F.conv2d(tmp, weight, bias=None, stride=self.stride, padding=self.padding) v_ = F.normalize(v_.view(-1), dim=0, p=2).view(v.size()) v_max_error = torch.sum((v_ - v) ** 2) k += 1 v = v_ v_max = v.clone() Dv_max = F.conv_transpose2d( v_max, weight, bias=None, stride=self.stride, padding=self.padding, output_padding=self.conv_transpose_output_padding ) # Dv lambda_max = torch.sum(Dv_max ** 2).item() # vTDTDv / vTv, ignore the vTv since vTv = 1 self.v_max = v_max return lambda_max ################################# SDNet ################################################################ from Lib.config import config as _cfg cfg = _cfg class DictConv2d(nn.Module): def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, bias=True): super(DictConv2d, self).__init__() self.dn = DictBlock( in_channels, out_channels, stride=stride, kernel_size=kernel_size, padding=padding, mu=cfg['MODEL']['MU'], lmbd=cfg['MODEL']['LAMBDA'][0], square_noise=cfg['MODEL']['SQUARE_NOISE'], n_dict=cfg['MODEL']['EXPANSION_FACTOR'], non_negative=cfg['MODEL']['NONEGATIVE'], n_steps=cfg['MODEL']['NUM_LAYERS'], w_norm=cfg['MODEL']['WNORM'] ) self.rc = None self.r_loss = [] def 
get_rc(self): if self.rc is None: raise ValueError("should call forward first.") else: return self.rc def forward(self, x): out, rc = self.dn(x) self.rc = rc if self.training is False: self.r_loss.extend([self.rc[0].item() / len(x)] * len(x)) return out #########模型构建############### class SDNet_model(nn.Module): def __init__(self, dropout1, dropout2, num_classes=2): super(SDNet_model, self).__init__() # self.layer0 = nn.Sequential( # DictConv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False), # nn.BatchNorm2d(64), # nn.ReLU(inplace=True), # ) self.conv0 = nn.Conv2d(1, 64, kernel_size=(3, 3), padding=(1, 1)) self.bn0 = nn.BatchNorm2d(64) self.pool0 = nn.MaxPool2d(kernel_size=(2, 2)) self.conv1 = nn.Conv2d(64, 128, kernel_size=(3, 3), padding=(1, 1)) self.bn1 = nn.BatchNorm2d(128) self.pool1 = nn.MaxPool2d(kernel_size=(2, 2)) self.dropout1 = nn.Dropout2d(p=dropout1) self.layer0 = nn.Sequential( DictConv2d(128, 256, kernel_size=3, stride=1, padding=1, bias=False), nn.BatchNorm2d(256), nn.LeakyReLU(inplace=True), ) self.conv2 = nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1)) self.bn2 = nn.BatchNorm2d(256) self.ca = ChannelAttention(256) self.sa = SpatialAttention() self.conv3 = nn.Conv2d(256, 256, kernel_size=(3, 3), padding=(1, 1)) self.pool2 = nn.MaxPool2d(kernel_size=(2, 2)) self.dropout2 = nn.Dropout2d(p=dropout2) self.flatten = nn.Flatten() self.fc1 = nn.Linear(256 * 12 * 75, 512) self.fc2 = nn.Linear(512, 256) self.fc3 = nn.Linear(256, num_classes) self.leaky_relu = nn.LeakyReLU(negative_slope=0.1, inplace=True) self.sigmoid = nn.Sigmoid() def update_stepsize(self): for m in self.modules(): if isinstance(m, DictBlock): m.update_stepsize() def get_rc(self): rc_list = [] for m in self.modules(): if isinstance(m, DictConv2d): rc_list.append(m.get_rc()) return rc_list def forward(self, x): # x = self.layer0(x) x = self.conv0(x) x = self.bn0(x) x = self.pool0(x) x = self.conv1(x) x = self.bn1(x) x = self.pool1(x) x = self.dropout1(x) x = self.layer0(x) x 
= self.conv2(x) x = self.bn2(x) x = self.ca(x) * x x = self.sa(x) * x x = self.conv3(x) x = self.pool2(x) # print(x.shape) x = self.dropout2(x) x = self.flatten(x) # print(x.shape) x = self.leaky_relu(self.fc1(x)) x = self.fc2(x) x = self.leaky_relu(x) x = self.fc3(x) x = self.sigmoid(x) return x def SDCNN_model(num_classes, dropout1, dropout2): model = SDNet_model(num_classes=num_classes, dropout1=dropout1, dropout2=dropout2) return model randomSeed = 1 random.seed(randomSeed) torch.manual_seed(randomSeed) np.random.seed(randomSeed) def main(): # 数据导入 dataFile = r'C:\Users\sun\Desktop\SDNET\SDNet-main\data\python_energy_T.mat' data = scio.loadmat(dataFile) train_input = data['train_input'] train_output = data['train_output'] test_input = data['test_input'] test_output = data['test_output'] validate_input = data['validate_input'] validate_output = data['validate_output'] train_input = train_input.reshape(-1, 1, 100, 300).astype('float32') test_input = test_input.reshape(-1, 1, 100, 300).astype('float32') validate_input = validate_input.reshape(-1, 1, 100, 300).astype('float32') train_input = torch.from_numpy(train_input) train_output = torch.from_numpy(train_output) validate_input = torch.from_numpy(validate_input) validate_output = torch.from_numpy(validate_output) test_input = torch.from_numpy(test_input) test_output = torch.from_numpy(test_output) # 定义超参数搜索空间 epochs = range(50, 201) batch_sizes = [64, 128, 256] dropouts1 = [0.1, 0.3, 0.5] dropouts2 = [0.1, 0.3, 0.5] # 初始化最优超参数和最高准确度 best_hyperparams = {'epoch': None, 'batch_size': None, 'dropout1': None, 'dropout2': None} best_accuracy = 0.0 # 定义随机搜索算法的迭代次数 num_iterations = 10 # 随机搜索算法 for i in range(num_iterations): # 随机选择超参数组合 epoch = random.choice(epochs) batch_size = random.choice(batch_sizes) dropout1 = random.choice(dropouts1) dropout2 = random.choice(dropouts2) print(f"Iteration {i+1}/{num_iterations}: epoch={epoch}, batch_size={batch_size}, dropout1={dropout1}, dropout2={dropout2}") # 实例化模型、损失函数和优化器 
model = SDCNN_model(num_classes=2, dropout1=dropout1, dropout2=dropout2) criterion = nn.BCELoss() optimizer = optim.Adam(model.parameters(), lr=0.001) # 将数据转换为PyTorch DataLoader train_dataset = TensorDataset(train_input, torch.tensor(train_output).float()) valid_dataset = TensorDataset(validate_input, torch.tensor(validate_output).float()) train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True) valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=False) # 实例化学习率调度器 #diff 添加学习率调度器 scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1) # 训练模型 for e in range(epoch): model.train() for inputs, targets in train_loader: inputs, targets = inputs, targets optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, targets) loss.backward() optimizer.step() scheduler.step() # 评估模型 model.eval() correct = 0 total = 0 with torch.no_grad(): for inputs, targets in valid_loader: inputs, targets = inputs, targets outputs = model(inputs) predicted = torch.argmax(outputs, dim=1) total += targets.size(0) targets_index = torch.argmax(targets, dim=1) correct += (predicted == targets_index).sum().item() accuracy = 100 * correct / total print(f"Iteration {i+1}: Accuracy={accuracy:.2f}%") # 更新最优超参数和最高准确度 if accuracy > best_accuracy: best_hyperparams['epoch'] = epoch best_hyperparams['batch_size'] = batch_size best_hyperparams['dropout1'] = dropout1 best_hyperparams['dropout2'] = dropout2 best_accuracy = accuracy print(f"New best accuracy: {best_accuracy:.2f}% with hyperparameters {best_hyperparams}") # 使用找到的最佳超参数进行最终训练 best_epoch = best_hyperparams['epoch'] best_batch_size = best_hyperparams['batch_size'] best_dropout1 = best_hyperparams['dropout1'] best_dropout2 = best_hyperparams['dropout2'] def weights_init(m): if isinstance(m, (nn.Conv2d, nn.Linear)): nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='leaky_relu') if m.bias is not None: nn.init.constant_(m.bias, 0) # 重新实例化模型以确保权重是新的 model = 
SDCNN_model(num_classes=2, dropout1=best_dropout1, dropout2=best_dropout2) model.apply(weights_init) optimizer = optim.Adam(model.parameters(), lr=0.001) # 使用最佳批量大小创建数据加载器 train_loader = DataLoader(train_dataset, batch_size=best_batch_size, shuffle=True) valid_loader = DataLoader(valid_dataset, batch_size=best_batch_size, shuffle=False) # 实例化学习率调度器 #diff 添加学习率调度器 scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1) # 特征可视化准备 feature_maps = {} def get_activation(name): def hook(model, input, output): feature_maps[name] = output.detach() return hook # 注册钩子 #diff 注册前向钩子以提取特征图 for name, layer in model.named_modules(): if isinstance(layer, nn.Conv2d) or isinstance(layer, DictConv2d): layer.register_forward_hook(get_activation(name)) # 训练模型 for e in range(best_epoch): model.train() running_loss = 0.0 for inputs, targets in train_loader: inputs, targets = inputs, targets optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs.squeeze(), targets.squeeze()) loss.backward() optimizer.step() running_loss += loss.item() # 累加损失以计算平均损失 scheduler.step() print(f'Epoch {e + 1}/{best_epoch}, Loss: {running_loss / len(train_loader):.4f}') # 评估模型 model.eval() # 设置模型为评估模式 validation_loss = 0.0 with torch.no_grad(): for inputs, targets in valid_loader: inputs, targets = inputs, targets outputs = model(inputs) validation_loss += criterion(outputs.squeeze(), targets.squeeze()).item() print(f'Validation Loss: {validation_loss / len(valid_loader):.4f}') model.eval() with torch.no_grad(): sample_inputs = validate_input[:1] model(sample_inputs) def visualize_features(feature_maps, layer_names, num_images=5): for layer_name in layer_names: act = feature_maps.get(layer_name) if act is None: continue act = act.cpu().numpy() num_channels = act.shape[1] plt.figure(figsize=(20, 10)) for i in range(min(num_channels, 64)): plt.subplot(8, 8, i + 1) plt.imshow(act[0, i, :, :], cmap='viridis') plt.axis('off') plt.suptitle(f'Feature Maps of {layer_name}') 
plt.savefig(f'feature_maps_{layer_name}.png') plt.close() layers_to_visualize = ['conv0', 'conv1', 'DictConv2d', 'conv2', 'conv3'] visualize_features(feature_maps, layers_to_visualize) model.eval() with torch.no_grad(): predictions = model(test_input.float()) probabilities = predictions predicted_labels = torch.argmax(probabilities, dim=1) predict = predicted_labels.cpu().numpy() print(predict) with open(r'C:\Users\sun\Desktop\SDNET\SDNet-main\predict_label.csv', 'w', newline='') as pr_file: writer = csv.writer(pr_file) for label in predict: writer.writerow([label]) with open(r'C:\Users\sun\Desktop\SDNET\SDNet-main\pr.csv', 'w+') as pr_file: out = [f"{i[0]},{i[1]}" for i in probabilities] pr_file.write("\n".join(out)) # 调用函数保存预测结果 # save_predictions_to_csv(probabilities.cpu().numpy(), 'pr.csv') def save_model_complete(model, filename=r'C:\Users\sun\Desktop\SDNET\SDNet-main\sdnet_model.pth'): torch.save(model.state_dict(), filename) print(f"Complete model saved as {filename}") save_model_complete(model) if __name__ == '__main__': main()
# 原文地址:https://blog.csdn.net/yyfhq/article/details/143483728
# 免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!