
SSA (Sparrow Search Algorithm)-LSTM: optimizing the number of neurons, dropout, and batch_size

Date: 2022-08-30 12:00:00

1. Abstract

This article explains how to use the Sparrow Search Algorithm (SSA) to optimize an LSTM's hyperparameters: the number of neurons in each layer, the dropout rate, and the batch_size.
Main steps:

  1. SSA parameters: the objective function, number of particles, search dimension, and number of iterations
  2. LSTM parameters: number of neurons in the first layer, number of neurons in the second layer, dropout rate, and batch_size
  3. Run the search: update the producers' (explorers') positions, where a small alarm value means no predator is present and a large alarm value means a predator threatens the flock and it must forage elsewhere; find the maximum (worst) fitness value and its X; then update the joiners' (followers') positions
  4. Train the model with the global best parameters found by SSA
  5. plt.show()

2. Data Description

The time-series data is simple: the USD/CNY exchange rate read from huilv.csv in the code below.
[Figure omitted]

3. Related Techniques

The Sparrow Search Algorithm (SSA) is a recent swarm-intelligence optimization algorithm, proposed in 2020 and inspired mainly by the foraging and anti-predation behavior of sparrows.
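For reference, the producer (explorer) update that `update_finder` implements in the code below can be stated as (a restatement of the standard SSA rule; notation follows the code's comments):

$$
X_i^{t+1} =
\begin{cases}
X_i^t \cdot \exp\!\left(\dfrac{-i}{\alpha \cdot T}\right), & R_2 < ST \\
X_i^t + Q \cdot \mathbf{L}, & R_2 \ge ST
\end{cases}
$$

where $R_2$ is the alarm value (`r2` in the code), $ST = 0.8$ is the safety threshold, $\alpha \in (0, 1]$ is a uniform random number (`r1`), $T$ is the maximum number of iterations, $Q$ is a random step (normally distributed in the original paper, uniform in the code below), and $\mathbf{L}$ is a $1 \times d$ row vector of ones.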

4. Full Code and Steps

Main program entry point:

import os

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras.layers import Activation, Dense, Dropout, LSTM
from tensorflow.keras.models import Sequential


class SSA():
    def __init__(self, func, n_dim=None, pop_size=20, max_iter=50, lb=-512, ub=512, verbose=False):
        self.func = func
        self.n_dim = n_dim  # dimension of particles, which is the number of variables of func
        self.pop = pop_size  # number of particles
        P_percent = 0.2  # producers make up 20% of the population
        D_percent = 0.1  # scouts (alarm sparrows) make up 10% of the population
        self.pNum = round(self.pop * P_percent)  # number of producers (20% of the population)
        self.warn = round(self.pop * D_percent)  # number of scouts (10% of the population)

        self.max_iter = max_iter  # max iter
        self.verbose = verbose  # print the result of each iter or not

        self.lb, self.ub = np.array(lb) * np.ones(self.n_dim), np.array(ub) * np.ones(self.n_dim)
        assert self.n_dim == len(self.lb) == len(self.ub), 'dim == len(lb) == len(ub) is not True'
        assert np.all(self.ub > self.lb), 'upper-bound must be greater than lower-bound'

        self.X = np.random.uniform(low=self.lb, high=self.ub, size=(self.pop, self.n_dim))

        self.Y = [self.func(self.X[i]) for i in range(len(self.X))]  # y = f(x) for all particles
        self.pbest_x = self.X.copy()  # personal best location of every particle in history
        self.pbest_y = [np.inf for i in range(self.pop)]  # best image of every particle in history
        self.gbest_x = self.pbest_x.mean(axis=0).reshape(1, -1)  # global best location for all particles
        self.gbest_y = np.inf  # global best y for all particles
        self.gbest_y_hist = []  # gbest_y of every iteration
        self.update_pbest()
        self.update_gbest()
        #
        # record verbose values
        self.record_mode = False
        self.record_value = {'X': [], 'V': [], 'Y': []}
        self.best_x, self.best_y = self.gbest_x, self.gbest_y  # history reasons, will be deprecated
        self.idx_max = 0
        self.x_max = self.X[self.idx_max, :]
        self.y_max = self.Y[self.idx_max]

    def cal_y(self, start, end):
        # calculate y for every x in X
        for i in range(start, end):
            self.Y[i] = self.func(self.X[i])
        # return self.Y

    def update_pbest(self):
        ''' personal best '''
        for i in range(len(self.Y)):
            if self.pbest_y[i] > self.Y[i]:
                self.pbest_x[i] = self.X[i]
                self.pbest_y[i] = self.Y[i]

    def update_gbest(self):
        idx_min = self.pbest_y.index(min(self.pbest_y))
        if self.gbest_y > self.pbest_y[idx_min]:
            self.gbest_x = self.X[idx_min, :].copy()
            self.gbest_y = self.pbest_y[idx_min]

    def find_worst(self):
        self.idx_max = self.Y.index(max(self.Y))
        self.x_max = self.X[self.idx_max, :]
        self.y_max = self.Y[self.idx_max]

    def update_finder(self):
        r2 = np.random.rand(1)  # alarm value
        self.idx = sorted(enumerate(self.Y), key=lambda x: x[1])
        self.idx = [self.idx[i][0] for i in range(len(self.idx))]
        # producer (explorer) position update
        if r2 < 0.8:  # small alarm value: no predator nearby
            for i in range(self.pNum):
                r1 = np.random.rand(1)
                self.X[self.idx[i], :] = self.X[self.idx[i], :] * np.exp(-(i) / (r1 * self.max_iter))  # random perturbation of the variables
                self.X = np.clip(self.X, self.lb, self.ub)  # clip variables that exceed the bounds
        elif r2 >= 0.8:  # large alarm value: a predator threatens the flock, so forage elsewhere
            for i in range(self.pNum):
                Q = np.random.rand(1)  # could also be replaced with np.random.normal(loc=0, scale=1.0, size=1)
                self.X[self.idx[i], :] = self.X[self.idx[i], :] + Q * np.ones(
                    (1, self.n_dim))  # Q is the random step; np.ones gives the 1 x d matrix L
                self.X = np.clip(self.X, self.lb, self.ub)  # clip variables that exceed the bounds
        self.cal_y(0, self.pNum)

    def update_follower(self):
        # joiner (follower) position update
        for ii in range(self.pop - self.pNum):
            i = ii + self.pNum
            A = np.floor(np.random.rand(1, self.n_dim) * 2) * 2 - 1
            best_idx = self.Y[0:self.pNum].index(min(self.Y[0:self.pNum]))
            bestXX = self.X[best_idx, :]
            if i > self.pop / 2:
                Q = np.random.rand(1)
                self.X[self.idx[i], :] = Q * np.exp((self.x_max - self.X[self.idx[i], :]) / np.square(i))
            else:
                self.X[self.idx[i], :] = bestXX + np.dot(np.abs(self.X[self.idx[i], :] - bestXX),
                                                         1 / (A.T * np.dot(A, A.T))) * np.ones((1, self.n_dim))
        self.X = np.clip(self.X, self.lb, self.ub)  # clip variables that exceed the bounds
        self.cal_y(self.pNum, self.pop)

    def detect(self):
        arrc = np.arange(self.pop)
        c = np.random.permutation(arrc)  # random permutation of the indices
        b = [self.idx[i] for i in c[0: self.warn]]
        e = 10e-10
        for j in range(len(b)):
            if self.Y[b[j]] > self.gbest_y:
                # move toward the global best position (the original code added the
                # scalar fitness self.gbest_y here, which is a bug; the SSA scout
                # rule uses the global best position)
                self.X[b[j], :] = self.gbest_x + np.random.rand(1, self.n_dim) * np.abs(self.X[b[j], :] - self.gbest_x)
            else:
                self.X[b[j], :] = self.X[b[j], :] + (2 * np.random.rand(1) - 1) * np.abs(
                    self.X[b[j], :] - self.x_max) / (self.func(self.X[b[j]]) - self.y_max + e)
            self.X = np.clip(self.X, self.lb, self.ub)  # clip variables that exceed the bounds
            self.Y[b[j]] = self.func(self.X[b[j]])

    def run(self, max_iter=None):
        self.max_iter = max_iter or self.max_iter
        for iter_num in range(self.max_iter):
            self.update_finder()  # update producer positions
            self.find_worst()  # find the maximum (worst) fitness value and the corresponding X
            self.update_follower()  # update follower positions
            self.update_pbest()
            self.update_gbest()
            self.detect()
            self.update_pbest()
            self.update_gbest()
            self.gbest_y_hist.append(self.gbest_y)
        # refresh the deprecated aliases before returning (they were only set in
        # __init__, so returning them unrefreshed would give the initial guess)
        self.best_x, self.best_y = self.gbest_x, self.gbest_y
        return self.best_x, self.best_y
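# Quick sanity check for the SSA class above (a hedged example, not part of
# the original script): minimize the 2-D sphere function f(x) = sum(x**2),
# whose optimum is at the origin.
#
#   ssa_demo = SSA(func=lambda x: float(np.sum(x ** 2)),
#                  n_dim=2, pop_size=20, max_iter=30, lb=-5, ub=5)
#   demo_x, demo_y = ssa_demo.run()
#   print(demo_x, demo_y)  # demo_x should end up near [0, 0]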


np.random.seed(666)
matplotlib.rcParams['agg.path.chunksize'] = 0
matplotlib.rcParams.update(matplotlib.rc_params())

src = 'D:\项目\PSO-LSTM\数据2\\'
src1 = 'D:\项目\PSO-LSTM\数据2\\'
os.chdir(r'D:\项目\PSO-LSTM\数据2')
filename = 'lstm4_pso_'

batch_size = 128
epochs = 2
steps = 10
scalerx = MinMaxScaler(feature_range=(0, 1))
scalery = MinMaxScaler(feature_range=(0, 1))


def process_data():
    # (usecols selects data columns by index; intervals are left-closed, right-open)
    dataset = pd.read_csv("huilv.csv")
    dataset['date'] = dataset['Datetime'].map(lambda date: date.split('/')[2])
    dataset['USD/CNY'] = scalerx.fit_transform(dataset['USD/CNY'].values.reshape(-1, 1))
    dataset['date'] = scalerx.fit_transform(dataset['date'].values.reshape(-1, 1))
    # scale y as well (note: 'USD/CNY' is scaled twice here and scalerx is
    # re-fitted on 'date', so use scalery to inverse-transform predictions)
    dataset['USD/CNY'] = scalery.fit_transform(dataset['USD/CNY'].values.reshape(-1, 1))
    X = dataset[['date', 'USD/CNY']]
    y = dataset['USD/CNY']
    # hard-coded split: the first 228 rows (roughly 80%) for training, the rest for testing
    X_train = X.iloc[:228, :]
    y_train = y.iloc[:228]
    X_test = X.iloc[228:, :]
    y_test = y.iloc[228:]
    return X_train, y_train, X_test, y_test


def create_dataset(X, y, seq_len):
    features = []
    targets = []  # labels

    for i in range(0, len(X) - seq_len, 1):  # the 1 is the stride: slide the window one step at a time
        data = X.iloc[i:i + seq_len]  # the window of sequence data (left-closed, right-open)
        label = y.iloc[i + seq_len]  # the label is the value right after the window
        # save to features and targets
        features.append(data)
        targets.append(label)
    trainX = np.array(features).astype('float64')
    return trainX, np.array(targets).reshape(-1, 1)
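# Shape check (hedged example): with the 228 training rows from process_data
# and steps = 10, create_dataset returns trainX with shape (218, 10, 2),
# i.e. 218 windows of 10 time steps over 2 features ('date' and 'USD/CNY'),
# and targets with shape (218, 1). This is the (samples, timesteps, features)
# layout the LSTM below expects.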


def build_model(neurons1, neurons2, dropout):
    X_train, y_train, X_test, y_test = process_data()
    X_train, y_train = create_dataset(X_train, y_train, steps)
    X_test, y_test = create_dataset(X_test, y_test, steps)
    nb_features = X_train.shape[2]
    input1 = X_train.shape[1]
    model1 = Sequential()
    model1.add(LSTM(
        input_shape=(input1, nb_features),
        units=neurons1,
        return_sequences=True))
    model1.add(Dropout(dropout))

    model1.add(LSTM(
        units=neurons2,
        return_sequences=False))
    model1.add(Dropout(dropout))

    model1.add(Dense(units=1))
    model1.add(Activation("linear"))
    model1.compile(loss='mse', optimizer='Adam', metrics=['mae'])
    return model1, X_train, y_train, X_test, y_test


def training(X):
    neurons1 = int(X[0])
    neurons2 = int(X[1])
    dropout = round(X[2], 6)
    batch_size = int(X[3])
    print(X)
    model, X_train, y_train, X_test, y_test = build_model(neurons1, neurons2, dropout)
    model.fit(
        X_train,
        y_train,
        batch_size=batch_size,
        epochs=1,
        validation_split=0.1,
        verbose=1,
        callbacks=[EarlyStopping(monitor='val_loss', patience=22, restore_best_weights=True)])

    pred = model.predict(X_test)
    # the source listing is truncated here; the fitness value SSA minimizes
    # must be the test MSE, so this completion is the obvious one
    temp_mse = mean_squared_error(y_test, pred)
    return temp_mse
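The source listing ends with the fitness function. As a minimal sketch, assuming the driver wires `training` into SSA over the four-dimensional search vector [neurons1, neurons2, dropout, batch_size] (the bounds, population size, and iteration count below are illustrative assumptions, not values from the source):

# Hedged driver sketch (assumed, not from the source).
if __name__ == '__main__':
    # search space: [neurons1, neurons2, dropout, batch_size]
    lb = [16, 16, 0.05, 16]
    ub = [128, 128, 0.5, 256]
    ssa = SSA(func=training, n_dim=4, pop_size=10, max_iter=5, lb=lb, ub=ub)
    best_x, best_y = ssa.run()
    print('SSA global best params:', best_x, 'best mse:', best_y)

    # step 4 of the outline: retrain with the SSA global best parameters
    neurons1, neurons2 = int(best_x[0]), int(best_x[1])
    dropout, best_batch = round(float(best_x[2]), 6), int(best_x[3])
    model, X_train, y_train, X_test, y_test = build_model(neurons1, neurons2, dropout)
    model.fit(X_train, y_train, batch_size=best_batch, epochs=epochs,
              validation_split=0.1, verbose=1,
              callbacks=[EarlyStopping(monitor='val_loss', patience=22,
                                       restore_best_weights=True)])

    # step 5 of the outline: plot predictions against the test targets
    pred = model.predict(X_test)
    plt.plot(y_test, label='true')
    plt.plot(pred, label='pred')
    plt.legend()
    plt.show()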