SSA (Sparrow Search Algorithm)-LSTM: optimizing the number of neurons, the dropout rate, and the batch_size of a neural network
Date: 2022-08-30 12:00:00
1. Abstract
This article explains how to use the SSA (Sparrow Search Algorithm) to optimize an LSTM network's hyperparameters: the number of neurons in each layer, the dropout rate, and the batch_size.
Main idea:
- SSA parameters: the objective (fitness) function, the number of particles, the search dimension, and the number of iterations
- LSTM parameters: the number of neurons in the first LSTM layer, the number of neurons in the second LSTM layer, the dropout rate, and the batch_size (the four search dimensions decode as shown in the sketch after this list)
- Start the search: update the positions of the producers (explorers); find the largest, i.e. worst, fitness value and its position X; update the positions of the scroungers (followers); a small alarm value means no predator is nearby, while a large alarm value means a predator threatens the flock and the sparrows must forage elsewhere
- Train the final model with the global best parameters found by SSA
- Plot and display the results with plt.show()
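
For concreteness, a 4-dimensional SSA position vector X decodes into the LSTM hyperparameters exactly as in the `training` function of section 4; the example values below are illustrative:

```python
import numpy as np

# Example position vector produced by SSA; the values are illustrative.
X = np.array([64.3, 32.7, 0.213456, 100.9])

neurons1 = int(X[0])        # units in the first LSTM layer
neurons2 = int(X[1])        # units in the second LSTM layer
dropout = round(X[2], 6)    # dropout rate
batch_size = int(X[3])      # mini-batch size used in model.fit
print(neurons1, neurons2, dropout, batch_size)  # -> 64 32 0.213456 100
```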
2. Data description
The data is a simple univariate time series: a daily USD/CNY exchange-rate series read from huilv.csv, with a Datetime column and a USD/CNY column.
3. Related techniques
The Sparrow Search Algorithm (SSA) is a recent swarm-intelligence optimization algorithm proposed in 2020, inspired mainly by the foraging and anti-predation behavior of sparrows.
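
As a quick, self-contained illustration of how the SSA class from the listing in section 4 is driven, the following minimal sketch minimizes a toy quadratic objective; the objective function and bounds here are illustrative and unrelated to the exchange-rate task:

```python
import numpy as np

def sphere(x):
    # Toy objective: sum of squares, minimum 0 at the origin (illustrative only).
    return float(np.sum(np.square(x)))

# SSA is the class defined in the full code listing of section 4.
ssa = SSA(sphere, n_dim=2, pop_size=20, max_iter=30, lb=[-5, -5], ub=[5, 5])
best_x, best_y = ssa.run()
print('best position:', best_x, 'best fitness:', best_y)
```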
4. Complete code and steps
Main program entry point:
```python
import os

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Activation, Dense, Dropout, LSTM
from tensorflow.keras.models import Sequential


class SSA():
    def __init__(self, func, n_dim=None, pop_size=20, max_iter=50, lb=-512, ub=512, verbose=False):
        self.func = func
        self.n_dim = n_dim  # dimension of particles, i.e. the number of variables of func
        self.pop = pop_size  # number of particles
        P_percent = 0.2  # producers make up 20% of the population
        D_percent = 0.1  # scouts (vigilant sparrows) make up 10% of the population
        self.pNum = round(self.pop * P_percent)  # number of producers
        self.warn = round(self.pop * D_percent)  # number of scouts
        self.max_iter = max_iter  # max number of iterations
        self.verbose = verbose  # whether to print the result of each iteration

        self.lb, self.ub = np.array(lb) * np.ones(self.n_dim), np.array(ub) * np.ones(self.n_dim)
        assert self.n_dim == len(self.lb) == len(self.ub), 'dim == len(lb) == len(ub) is not True'
        assert np.all(self.ub > self.lb), 'upper-bound must be greater than lower-bound'

        self.X = np.random.uniform(low=self.lb, high=self.ub, size=(self.pop, self.n_dim))
        self.Y = [self.func(self.X[i]) for i in range(len(self.X))]  # y = f(x) for all particles
        self.pbest_x = self.X.copy()  # personal best location of every particle in history
        self.pbest_y = [np.inf for i in range(self.pop)]  # personal best fitness of every particle in history
        self.gbest_x = self.pbest_x.mean(axis=0).reshape(1, -1)  # global best location of all particles
        self.gbest_y = np.inf  # global best fitness of all particles
        self.gbest_y_hist = []  # gbest_y of every iteration
        self.update_pbest()
        self.update_gbest()

        # record verbose values
        self.record_mode = False
        self.record_value = {'X': [], 'V': [], 'Y': []}
        self.best_x, self.best_y = self.gbest_x, self.gbest_y  # kept for backward compatibility
        self.idx_max = 0
        self.x_max = self.X[self.idx_max, :]
        self.y_max = self.Y[self.idx_max]

    def cal_y(self, start, end):
        # calculate y for every x in X[start:end]
        for i in range(start, end):
            self.Y[i] = self.func(self.X[i])

    def update_pbest(self):
        """Update the personal best of every particle."""
        for i in range(len(self.Y)):
            if self.pbest_y[i] > self.Y[i]:
                self.pbest_x[i] = self.X[i]
                self.pbest_y[i] = self.Y[i]

    def update_gbest(self):
        """Update the global best."""
        idx_min = self.pbest_y.index(min(self.pbest_y))
        if self.gbest_y > self.pbest_y[idx_min]:
            self.gbest_x = self.X[idx_min, :].copy()
            self.gbest_y = self.pbest_y[idx_min]

    def find_worst(self):
        # locate the particle with the largest (worst) fitness value
        self.idx_max = self.Y.index(max(self.Y))
        self.x_max = self.X[self.idx_max, :]
        self.y_max = self.Y[self.idx_max]

    def update_finder(self):
        # position update of the producers (explorers)
        r2 = np.random.rand(1)  # alarm value
        self.idx = sorted(enumerate(self.Y), key=lambda x: x[1])
        self.idx = [self.idx[i][0] for i in range(len(self.idx))]
        if r2 < 0.8:  # small alarm value: no predator around
            for i in range(self.pNum):
                r1 = np.random.rand(1)
                # random perturbation of the decision variables
                self.X[self.idx[i], :] = self.X[self.idx[i], :] * np.exp(-(i) / (r1 * self.max_iter))
                self.X = np.clip(self.X, self.lb, self.ub)  # clip variables that exceed the bounds
        elif r2 >= 0.8:  # large alarm value: a predator threatens the flock, forage elsewhere
            for i in range(self.pNum):
                Q = np.random.rand(1)  # can be replaced by np.random.normal(loc=0, scale=1.0, size=1)
                # Q is a random number; np.ones((1, n_dim)) plays the role of the 1 x d matrix L
                self.X[self.idx[i], :] = self.X[self.idx[i], :] + Q * np.ones((1, self.n_dim))
                self.X = np.clip(self.X, self.lb, self.ub)  # clip variables that exceed the bounds
        self.cal_y(0, self.pNum)

    def update_follower(self):
        # position update of the scroungers (followers)
        for ii in range(self.pop - self.pNum):
            i = ii + self.pNum
            A = np.floor(np.random.rand(1, self.n_dim) * 2) * 2 - 1
            best_idx = self.Y[0:self.pNum].index(min(self.Y[0:self.pNum]))
            bestXX = self.X[best_idx, :]
            if i > self.pop / 2:
                Q = np.random.rand(1)
                self.X[self.idx[i], :] = Q * np.exp((self.x_max - self.X[self.idx[i], :]) / np.square(i))
            else:
                self.X[self.idx[i], :] = bestXX + np.dot(
                    np.abs(self.X[self.idx[i], :] - bestXX),
                    1 / (A.T * np.dot(A, A.T))) * np.ones((1, self.n_dim))
        self.X = np.clip(self.X, self.lb, self.ub)  # clip variables that exceed the bounds
        self.cal_y(self.pNum, self.pop)

    def detect(self):
        # position update of the scouts (sparrows aware of the danger)
        arrc = np.arange(self.pop)
        c = np.random.permutation(arrc)  # random permutation of indices
        b = [self.idx[i] for i in c[0:self.warn]]
        e = 10e-10
        for j in range(len(b)):
            if self.Y[b[j]] > self.gbest_y:
                self.X[b[j], :] = self.gbest_y + np.random.rand(1, self.n_dim) * np.abs(self.X[b[j], :] - self.gbest_y)
            else:
                self.X[b[j], :] = self.X[b[j], :] + (2 * np.random.rand(1) - 1) * np.abs(
                    self.X[b[j], :] - self.x_max) / (self.func(self.X[b[j]]) - self.y_max + e)
            self.X = np.clip(self.X, self.lb, self.ub)  # clip variables that exceed the bounds
            self.Y[b[j]] = self.func(self.X[b[j]])

    def run(self, max_iter=None):
        self.max_iter = max_iter or self.max_iter
        for iter_num in range(self.max_iter):
            self.update_finder()    # update the producer (explorer) positions
            self.find_worst()       # find the worst fitness value and its X
            self.update_follower()  # update the follower positions
            self.update_pbest()
            self.update_gbest()
            self.detect()
            self.update_pbest()
            self.update_gbest()
            self.gbest_y_hist.append(self.gbest_y)
        self.best_x, self.best_y = self.gbest_x, self.gbest_y
        return self.best_x, self.best_y


np.random.seed(666)
matplotlib.rcParams['agg.path.chunksize'] = 0
matplotlib.rcParams.update(matplotlib.rc_params())
src = 'D:\项目\PSO-LSTM\数据2\\'
src1 = 'D:\项目\PSO-LSTM\数据2\\'
os.chdir(r'D:\项目\PSO-LSTM\数据2')
filename = 'lstm4_pso_'
batch_size = 128
epochs = 2
steps = 10
scalerx = MinMaxScaler(feature_range=(0, 1))
scalery = MinMaxScaler(feature_range=(0, 1))


def process_data():
    # read the raw exchange-rate data and split it into train / test sets
    dataset = pd.read_csv("huilv.csv")
    dataset['date'] = dataset['Datetime'].map(lambda date: date.split('/')[2])
    dataset['USD/CNY'] = scalerx.fit_transform(dataset['USD/CNY'].values.reshape(-1, 1))
    dataset['date'] = scalerx.fit_transform(dataset['date'].values.reshape(-1, 1))
    # scale the target as well
    dataset['USD/CNY'] = scalery.fit_transform(dataset['USD/CNY'].values.reshape(-1, 1))
    X = dataset[['date', 'USD/CNY']]
    y = dataset['USD/CNY']
    # roughly the last 20% of the rows go to the test set
    X_train = X.iloc[:228, :]
    y_train = y.iloc[:228]
    X_test = X.iloc[228:, :]
    y_test = y.iloc[228:]
    return X_train, y_train, X_test, y_test


def create_dataset(X, y, seq_len):
    features = []
    targets = []  # labels
    for i in range(0, len(X) - seq_len, 1):  # the final 1 is the stride: slide one step at a time
        data = X.iloc[i:i + seq_len]  # sequence data (inclusive start, exclusive end)
        label = y.iloc[i + seq_len]   # label
        # collect into features and targets
        features.append(data)
        targets.append(label)
    trainX = np.array(features).astype('float64')
    return trainX, np.array(targets).reshape(-1, 1)


def build_model(neurons1, neurons2, dropout):
    X_train, y_train, X_test, y_test = process_data()
    X_train, y_train = create_dataset(X_train, y_train, steps)
    X_test, y_test = create_dataset(X_test, y_test, steps)
    nb_features = X_train.shape[2]
    input1 = X_train.shape[1]
    model1 = Sequential()
    model1.add(LSTM(input_shape=(input1, nb_features), units=neurons1, return_sequences=True))
    model1.add(Dropout(dropout))
    model1.add(LSTM(units=neurons2, return_sequences=False))
    model1.add(Dropout(dropout))
    model1.add(Dense(units=1))
    model1.add(Activation("linear"))
    model1.compile(loss='mse', optimizer='Adam', metrics=['mae'])
    return model1, X_train, y_train, X_test, y_test


def training(X):
    # fitness function for SSA: decode the position vector into hyperparameters,
    # train briefly and return the test-set MSE
    neurons1 = int(X[0])
    neurons2 = int(X[1])
    dropout = round(X[2], 6)
    batch_size = int(X[3])
    print(X)
    model, X_train, y_train, X_test, y_test = build_model(neurons1, neurons2, dropout)
    model.fit(
        X_train, y_train,
        batch_size=batch_size,
        epochs=1,
        validation_split=0.1,
        verbose=1,
        callbacks=[EarlyStopping(monitor='val_loss', patience=22, restore_best_weights=True)])
    pred = model.predict(X_test)
    temp_mse = mean_squared_error(y_test, pred)
    return temp_mse
```
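
Following the steps in the abstract, the fitness function `training` is handed to SSA, the model is then retrained with the global best parameters, and the predictions are plotted with plt.show(). Below is a minimal driver sketch of those steps; the search bounds, population size, iteration count, and the use of the global `epochs` for the final fit are illustrative assumptions rather than the post's original settings:

```python
# Minimal driver sketch (assumed values); dimensions are [neurons1, neurons2, dropout, batch_size].
if __name__ == '__main__':
    low_params = [16, 16, 0.05, 16]    # assumed lower bounds of the search space
    up_params = [128, 128, 0.5, 256]   # assumed upper bounds of the search space

    # SSA parameters: fitness function, search dimension, number of particles, iterations
    ssa = SSA(training, n_dim=4, pop_size=10, max_iter=5, lb=low_params, ub=up_params)
    best_params, best_mse = ssa.run()
    print('global best parameters:', best_params, 'fitness (MSE):', best_mse)

    # retrain the model with the global best parameters found by SSA
    neurons1, neurons2 = int(best_params[0]), int(best_params[1])
    dropout, best_batch = round(best_params[2], 6), int(best_params[3])
    model, X_train, y_train, X_test, y_test = build_model(neurons1, neurons2, dropout)
    model.fit(X_train, y_train, batch_size=best_batch, epochs=epochs,
              validation_split=0.1, verbose=1,
              callbacks=[EarlyStopping(monitor='val_loss', patience=22, restore_best_weights=True)])

    # predict on the test set and plot the result
    pred = model.predict(X_test)
    plt.plot(y_test, label='true')
    plt.plot(pred, label='pred')
    plt.legend()
    plt.show()
```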