锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

321235

时间:2022-11-12 18:00:01 335三极管468三极管426三极管

转载:my_name_txt
原文链接:https://blog.csdn.net/qq_39575818/article/details/90514825

基于噪声信号检测的电子设备识别

1. 基本思路

众所周知,当电子设备工作时,由于热电流电阻三极管会向外发出噪声信号。通过手接触,信号通过人体,然后到设备天线,设备检测这些信号,可以通过一些机器学习方法识别什么电子设备,相当于每个电子设备都有一个指纹。
该技术最大的特点是,它不需要被识别的物体发出信号,只需要识别传统设备,并具有只有电器的一些特性。它可能是物联网发展过程中的一个可行的解决方案。我希望你能继续开发这项技术。
本项目检测的信号频率范围为0-100M Hz,以后的研究可以考虑在其他频率上进行检测,也希望大家能给予理论上的支持,为什么低中频噪声电磁信号有利于识别。

2. 硬件改造

参考论文:
https://www.gierad.com/assets/emsense/emsense.pdf
硬件的主要设备是一种 RTL2832U SDR receiver, SDR是指 self defined radio,它可以理解为芯片上的集成usb口把信号传到电脑或者嵌入式设备的收音机。
我们主要用的是SDR模数转换模块和读取信号的芯片。短接放大混频冲击等模块。详见参考文献。

噪声信号
人体
天线
低通滤波
数模转换
pc读取

下面是具体细节

2.1 天线改造

天线和手腕接触,本项目中改装了收音机原来的天线,由于绕磁天线本身就是一个高通滤波器,所以把它剪短了很多,去头去尾,留取中间一小段。
天线和手接触的部分,采取四层结构:
金属层1—— 绝缘层——金属层2——绝缘层
金属层1 连接原来天线的信号线,金属层2连接原来的天线的地线。

2.2 芯片改造


將进入数模转化前的两个电容去除,使得芯片的其他部分短路。使用
跳线的方式,焊接上一个巴伦转换器WB1010。
巴伦转换器的实质是一个低通滤波器, datasheet: https://www.mouser.cn/datasheet/2/597/wbt-463414.pdf
可见同一边的两个管脚是等价的,C1和C2可以互换,信号口和地可以互换。
巴伦转化器标记+V的管脚连接天线的信号线,标记GND的管脚连接天线的地线。

3. 软件支持

根据以下网站获得软件支持: https://github.com/roger-/pyrtlsdr

  • 下载以上网站中的代码,运行 setup.py
  • 下载动态库 https://github.com/librtlsdr/librtlsdr/releases
  • 按照 https://github.com/roger-/pyrtlsdr 的trouble shooting,编辑/home/USER_NAME/.bashrc,添加上动态库的路径

有了硬件和软件之后,剩下的做什么就可以自由发挥了!

4. class txtSDR

我写了一个类,用于一些SDR的基本行为。

    def getOneSample(self):
        lockSDR.acquire()
        samples = self.sdr.read_samples(NUM_SAMPLES_PER_SCAN)
        psd_scan, f = psd(samples, NFFT=NFFT)
        aa=np.zeros((3))
        psd_scan[2047:2050]=aa
        self.psd_scan = psd_scan
        lockSDR.release()

   
     
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

读取一次数据,由于实际情况中存在较大的直流分量,且变化比较大,对预测造成巨大干扰。所以,通过附零的方式,手动去除直流分量。

  1. 下采样,使用scipy 包的 signal对象。
    def DownSample(self, S, i):
        return signal.resample(S, 512)

   
     
        
  • 1
  • 2
  1. 读取十个数据,取平均作为背景噪声。
    def getBackGroundNois(self):
        printMy("Getting background noise profile!")
        self.back_ground = 0
        self.getOneSample()
        psd_scan = self.getPsd_scan()
        for x in range(10):
            self.getOneSample()
            psd_scan = self.getPsd_scan()
            self.back_ground += psd_scan
        self.back_ground /= 10
        printMy("Got background noise profile!")

   
     
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  1. 有了每次的读取数据后,手动提取一些特征,组合成一个向量
    def getFeatures(self):
        psd_scan = self.psd_scan - self.back_ground
        std = np.std(psd_scan, ddof = 1)
        S1=psd_scan - (self.back_ground + 2.5 * std)
        maxS = np.maximum(0, S1)
        S = self.DownSample(S1, 512)
        diff_1 = S[1:] - S[0:-1]
        diff_2 = diff_1[1:] - diff_1[0:-1]
        rms = np.sqrt(np.mean(S**2))
        S_list = S.tolist()
        max_index = S_list.index(max(S_list))
        #printMy("max_index", max_index)
        min_index = S_list.index(min(S_list))
        COM = np.dot(S, np.linspace(0,1,512)) / np.sum(S)# center of mass
        std = np.std(S)
        kurtosis = stats.kurtosis(S)
        Crest = maxS / np.sum(S)
        self.features = np.hstack((Crest,kurtosis,min_index,max_index,COM,std,diff_1,diff_2,rms)).reshape(1,-1).flatten()
        #self.features = np.hstack((Crest,kurtosis,min_index,max_index,COM,std,rms)).reshape(1,-1).flatten()
        #printMy(Crest,kurtosis,min_index,max_index,COM,std,diff_1,diff_2,rms)

   
     
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  1. 采集新的一个数据,读取连续20个数据,把特征和tag 保存在本地’data/'文件夹中。tag 是指目前的第几个物体。
    def addNewItem(self):
        self.tag=self.tag+1
        np.save("tag",self.tag)
    time.sleep(0.5)

    for x in range(1,20):
        # printMy(str(x)+"...")
        self.getOneSample()
        self.getFeatures()
        path='data/'+str(time.time())
        dataToSave=np.hstack(([self.tag],self.features))
        np.save(path,dataToSave)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  1. 读取所有的数据,把第一位作为y,第一位以后作为x,训练模型,并保存到本地。
    def train(self):
    if self.tag < 2:
        return
    printMy('save model...')
    path='data/'
    fileList = os.listdir(path)
    y = []
    x = []

    for f in fileList:
        f = path + f
        dataN = np.load(f)
        #printMy("dataN[0]:"+str(dataN[0]))
        y.append(dataN[0].tolist())
        x.append(dataN[1:].tolist())         
    from sklearn import svm
    from sklearn.externals import joblib as jl
    clf = svm.SVC()
    clf.fit(np.array(x),np.array(y))
    Y = clf.predict(np.array(x))
    printMy("done!\naccurency on training sets:"+str(np.sum(Y==y)/len(y)))
    jl.dump(clf,'clf.model')        
    pass
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  1. 进行预测,使用已有模型。
    def Predict(self, features):
    if os.path.exists('clf.model'):
        from sklearn.externals import joblib as jl
        model=jl.load('clf.model')
        answer=model.predict(features)
    else:
        answer=-1
    return answer
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  1. 删除一则数据和清空数据。
    def clear(self):
        path='data/'
        fileList = os.listdir(path)
        for f in fileList:
            f = path + f
            os.remove(f) 
    if os.path.exists('clf.model'):     
        os.remove('clf.model')
    np.save('tag',0)
    self.tag = 0

def deleteOneItem(self, a):
    path='data/'
    fileList = os.listdir(path)
    flag = 0
    for f in fileList:
        f = path + f
        dataN = np.load(f)
        if dataN[0] == a:
            os.remove(f) 
            flag = 1
        
        elif dataN[0] > a:
            #print(dataN[0])
            dataN[0] -= 1
            np.save(f,dataN)

    if flag == 1:
        print("tag:"+str(self.tag))
        self.tag -= 1
        np.save("tag",self.tag)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

5. class Ui_MainWindow

GUI 窗口,主要是pyqt 界面按钮的设计以及按钮事件对SDR的方法的调用。
最关键的是

  • pyqt 的gui是必须要在主线程中运行的,不能通过其他线程直接更改主界面的内容,而是要通过一些特殊的方法。更新指示界面:https://www.cnblogs.com/anloveslife/p/7541898.html
  • 读取新的设备、训练、预测都要使用一段时间,要使用新的线程去做这些事情,不然的话,主线程会去处理这些事情,主界面的图像和指示都不会更新,时间长的话会出现假死机的情况——界面变灰色。多线程教程:https://blog.csdn.net/zhanh1218/article/details/32131385
  • matplotlib 的图像需要先放在一个“画板“ 上,再放到gui界面的一个模块中: http://whuhan2013.github.io/blog/2017/03/28/pyqt-matplotlib-learn
  • 运用一些pyqt控件的函数以及css美化

6、检测效果

各个电子产品的噪声波形
演示视频:https://pan.baidu.com/s/133g-HFulpZLWsblDXQmKqw 密码:vc3x
由于身高差,帮我那点脑的刀某同学无法入镜,对此,我表示十分遗憾以及深切慰问。

7. 代码总汇

myGui.py:


from PyQt5 import QtCore,QtGui,QtWidgets
import sys
import qtawesome
from PyQt5.QtWidgets import (QApplication, QWidget,QPushButton,
    QLineEdit,QHBoxLayout, QVBoxLayout,QColorDialog,QInputDialog,QFileDialog)
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QCursor,QColor
import matplotlib
import matplotlib.animation as animation
from matplotlib.mlab import psd
import pylab as pyl
import numpy as np
import sys
from rtlsdr import RtlSdr
from scipy import signal
from scipy import stats
import os
import time
import argparse
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
from PyQt5 import QtWidgets,QtCore
from PyQt5.QtWidgets import QMainWindow,QApplication
import sys
from collections import Counter
import threading
showflag = True
lockSDR = threading.Lock()
lockPrint = threading.Lock()

NFFT = 10244
NUM_SAMPLES_PER_SCAN = NFFT16
NUM_BUFFERED_SWEEPS = 100
NUM_SCANS_PER_SWEEP = 1
FREQ_INC_COARSE = 1e6
FREQ_INC_FINE = 0.1e6
GAIN_INC = 5

printBuffer = ‘’

def printMy(OneString):
#print(OneString)
global printBuffer
temp = getPrintBuffer()
temp = printBuffer + ‘\n’ + str(OneString)
lockPrint.acquire()
printBuffer = temp
lockPrint.release()
#print(printBuffer)

def getPrintBuffer():
lockPrint.acquire()
temp = printBuffer
lockPrint.release()
return temp

class txtSdr(object):
def init(self, sdr=None, fig=None):
self.sdr = sdr if sdr else RtlSdr()
self.numOfSample = 0
self.back_ground = np.zeros([4096,])
self.mod = 0
if os.path.exists(“tag.npy”):
self.tag = np.load(“tag.npy”)
else:
self.tag = 0

def DownSample(self, S, i):
    return signal.resample(S, 512)

def getFeatures(self):
    psd_scan = self.psd_scan - self.back_ground
    std = np.std(psd_scan, ddof = 1)
    S1=psd_scan - (self.back_ground + 2.5 * std)
    maxS = np.maximum(0, S1)

    S = self.DownSample(S1, 512)
    diff_1 = S[1:] - S[0:-1]
    diff_2 = diff_1[1:] - diff_1[0:-1]
    rms = np.sqrt(np.mean(S**2))
    S_list = S.tolist()
    max_index = S_list.index(max(S_list))
    #printMy("max_index", max_index)
    min_index = S_list.index(min(S_list))
    COM = np.dot(S, np.linspace(0,1,512)) / np.sum(S)# center of mass
    std = np.std(S)
    kurtosis = stats.kurtosis(S)
    Crest = maxS / np.sum(S)
    self.features = np.hstack((Crest,kurtosis,min_index,max_index,COM,std,diff_1,diff_2,rms)).reshape(1,-1).flatten()
    #self.features = np.hstack((Crest,kurtosis,min_index,max_index,COM,std,rms)).reshape(1,-1).flatten()
    #printMy(Crest,kurtosis,min_index,max_index,COM,std,diff_1,diff_2,rms)

def Predict(self, features):
    
    if os.path.exists('clf.model'):
        from sklearn.externals import joblib as jl
        model=jl.load('clf.model')
        answer=model.predict(features)
    else:
        answer=-1
    return answer

def train(self):

    if self.tag < 2:
        return
    printMy 

相关文章