锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

ESP8266 读取 DS18B20 温度传感器,并通过 MQTT 发送到平台

时间:2023-05-29 09:07:00 cnt温度传感器

##先联网

# Import order is kept as in the original because `from configWifi import *`
# may rebind names imported before it.
import socket
import time
import re
from machine import Pin, I2C
import network
import ujson
from machine import Timer
import os
import ssd1306
import socket
from configWifi import *
import urequests
from simple import MQTTClient
import onewire
import ds18x20
from ds18x20 import DS18X20

# Page body that process_request() sends to the client after the
# "HTTP/1.1 200 OK" status line.
# NOTE(review): the string holds only a few bare words and no markup or form,
# while process_request() searches the request for a JSON object with
# "username" and "password" keys. Presumably the original HTML/JS was lost
# when this file was extracted -- confirm against the original source.
html="""

login

Login



登录

"""

def process_request(request):
    """Serve one HTTP request and extract Wi-Fi credentials from a POST.

    Reads up to 1024 bytes from the connection. If the request is a POST
    whose text contains a JSON object of the form
    ``{"username": ..., "password": ...}``, that object is parsed and
    returned. Otherwise the login page is sent back.

    Every successfully handled request is answered with ``HTTP/1.1 200 OK``
    so the client is never left without a response; the ``html`` page is
    only appended when no credentials were found.

    Args:
        request: a connected socket-like object providing ``recv`` and
            ``sendall``. The caller keeps ownership and closes it.

    Returns:
        The parsed credentials dict, or ``None`` when the request carried
        no credentials or any error occurred (the error is printed).
    """
    try:
        # NOTE(review): a single recv() is assumed to hold the whole request,
        # body included -- confirm this is true for the clients in use.
        data = request.recv(1024)
        data_str = data.decode()
        print(data_str)

        job = None
        if data_str.split()[0] == 'POST':
            print("POST request")
            # Raw string so that \S reaches the regex engine untouched.
            mat = re.search(r'{"username":.+?,"password".+\S}', data_str)
            print(mat)
            if mat is not None:
                print(mat.group(0))
                # Parse the user name and password out of the JSON body.
                job = ujson.loads(mat.group(0))
                print('username', job['username'])
                print('password', job['password'])
        else:
            print("not POST request")

        # Always acknowledge the request before handing the result back.
        request.sendall(bytes("HTTP/1.1 200 OK\r\n\r\n", "utf-8"))
        if job is None:
            request.sendall(bytes(html, "utf-8"))
        return job

    except Exception as e:
        print(e)
        return None



 

    

 

def webConfigWifi():
    """Run the access-point based Wi-Fi provisioning flow.

    Brings up an open access point named ``ESP-AP`` at 192.168.0.1, listens
    on port 80 and hands each incoming connection to ``process_request``.
    When a request yields credentials, ``connectWiFi`` is tried with them;
    on success they are saved to ``config.txt`` (keys ``name`` and
    ``password``), the access point is switched off and the function
    returns.

    Side effects:
        Rebinds the module-level ``wlan`` to the result of each
        ``connectWiFi`` attempt (``None`` after a failed attempt).

    Returns:
        True once credentials were verified and stored; False if an
        exception interrupted the flow (the exception is printed).
    """
    global wlan

    # Start AP mode.
    ap = network.WLAN(network.AP_IF)
    ap.active(True)
    time.sleep(1)
    ap.ifconfig(('192.168.0.1', '255.255.255.0', '192.168.0.2', '8.8.8.8'))
    # Open (unauthenticated) hotspot.
    ap.config(essid='ESP-AP', authmode=0)
    time.sleep(1)

    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.bind(('192.168.0.1', 80))
        sock.listen(5)
        while True:
            connection, address = sock.accept()
            print('client connected from', address)
            try:
                js_ip_psw = process_request(connection)
            finally:
                # Release the client socket even if request handling raises.
                connection.close()

            if js_ip_psw is None:
                continue

            wlan = connectWiFi(js_ip_psw['username'], js_ip_psw['password'])
            if wlan is None:
                print("password error")
                continue

            print("config success")
            # Serialise with ujson so quotes in the SSID/password are escaped
            # instead of corrupting the stored file.
            with open('config.txt', 'w') as f:
                f.write(ujson.dumps({
                    'name': js_ip_psw['username'],
                    'password': js_ip_psw['password'],
                }))
            ap.active(False)
            return True

    except Exception as e:
        print(e)
        return False
    finally:
        # The listening socket must not outlive this function on any path,
        # otherwise port 80 stays bound and a later run cannot bind it.
        if sock is not None:
            sock.close()


 

'''

    @author st

    @date 20200812

'''      

 

# Load the JSON configuration from config.txt.
def loadConfigJSON():
    """Read the first line of ``config.txt`` and parse it as a JSON object.

    The line is parsed as strict JSON first. If that fails, the legacy
    single-quoted form (e.g. ``{'name':'x','password':'y'}``) is accepted by
    converting single quotes to double quotes and parsing again.

    Returns:
        The parsed configuration, or ``None`` if the file cannot be opened
        or its first line cannot be parsed.
    """
    try:
        with open('config.txt', 'r') as f:
            raw = f.readline()
    except OSError:
        # Missing/unreadable file: nothing was opened, so nothing to close.
        return None

    print(raw)
    try:
        return ujson.loads(raw)
    except ValueError:
        pass

    try:
        return ujson.loads(raw.replace("'", '"'))
    except ValueError:
        return None

# Connect to Wi-Fi; returns None on failure. cnt is the wait budget.
def connectWiFi(ssid, pswd, cnt=3):
    """Join the given network in station mode.

    Args:
        ssid: network name passed to ``WLAN.connect``.
        pswd: network password passed to ``WLAN.connect``.
        cnt: number of one-second waits tolerated before giving up; the
            attempt is abandoned once the counter drops below zero.

    Returns:
        The station ``WLAN`` object when connected (including when it was
        already connected on entry), otherwise ``None``.
    """
    station = network.WLAN(network.STA_IF)
    station.active(True)
    print(station.ifconfig())

    if station.isconnected():
        print(station.ifconfig())
        return station

    station.connect(ssid, pswd)
    remaining = cnt
    while not station.isconnected():
        time.sleep(1)
        remaining -= 1
        if remaining < 0:
            return None

    print(station.ifconfig())
    return station

# Print a warning/status message.
def showWarning(lcd, msg):
    """Show *msg* in the display band that starts at row 32.

    Clears a 128x32 rectangle at (0, 32) with colour 0, draws the text at
    the rectangle's top-left corner with colour 1 and refreshes the display.
    """
    band_top = 32
    lcd.fill_rect(0, band_top, 128, 32, 0)
    lcd.text(msg, 0, band_top, 1)
    lcd.show()

def showInfo(lcd, msg):
    """Show *msg* in the display band that starts at row 0.

    Clears a 128x32 rectangle at (0, 0) with colour 0, draws the text at
    the rectangle's top-left corner with colour 1 and refreshes the display.
    """
    band_top = 0
    lcd.fill_rect(0, band_top, 128, 32, 0)
    lcd.text(msg, 0, band_top, 1)
    lcd.show()

def clearlcd(lcd):
    """Blank the whole display: fill the buffer with colour 0 and refresh."""
    blank = 0
    lcd.fill(blank)
    lcd.show()

 

def pub(server="47.112.113.136",port = 1883,topic = 'temperature',msg = 'hello mqtt',username ='test',password = 'testpassword' ):
    """Publish a single MQTT message over a short-lived connection.

    Creates an ``MQTTClient`` with client id ``"umqtt_client"``, connects,
    publishes ``msg`` on ``topic`` and disconnects.

    Args:
        server: broker host.
        port: broker port.
        topic: topic to publish on.
        msg: payload to publish.
        username: broker user name.
        password: broker password.

    Returns:
        True if connect, publish and disconnect all completed; False if any
        of them raised (the exception is printed).
    """
    try:
        c = MQTTClient("umqtt_client", server, port, username, password)
        c.connect()
        try:
            c.publish(topic, msg)
        finally:
            # Release the broker connection even when publish fails.
            c.disconnect()
        return True
    except Exception as e:
        # Exception (not a bare except) so Ctrl-C still interrupts the script.
        print(e)
        return False

 

def timerHandle(timer):
    """Timer callback: run Wi-Fi provisioning and report the outcome.

    Shows 'start config' on the module-level ``lcd``, runs
    ``webConfigWifi()``, then shows 'config finish' or 'config fail'
    depending on its result and pauses for three seconds.

    Args:
        timer: the timer object supplied by the timer callback; unused.
    """
    global lcd
    showWarning(lcd, 'start config')
    outcome = 'config finish' if webConfigWifi() == True else 'config fail'
    showWarning(lcd, outcome)
    time.sleep(3)

# Interrupt handler: schedules the timer callback.
def callback(P):
    """Arm the module-level timer ``tim`` to call ``timerHandle`` once.

    Args:
        P: argument supplied by the interrupt source; unused.
    """
    delay_ms = 1000
    tim.init(period=delay_ms, mode=Timer.ONE_SHOT, callback=timerHandle)

'''lcd.fill(0)

  lcd.text('server addr',20, 20,1)

  lcd.text('192.168.0.1',20, 40,1)

  lcd.show()

  def fill_rect(self, x, y, w, h, col):

 

  '''



 

if __name__ == "__main__":
    # ---- Device initialisation -------------------------------------------
    # SSD1306 display on I2C (ESP8266: SCL on pin 5, SDA on pin 4).
    i2c = I2C(scl=Pin(5), sda=Pin(4), freq=100000)
    lcd = ssd1306.SSD1306_I2C(128, 64, i2c)

    # Create the timer before arming the button interrupt: callback() uses
    # `tim`, so it must exist by the time the first interrupt can fire.
    tim = Timer(-1)

    # Configuration button on pin 0 with pull-up; a falling edge triggers
    # callback(). `Pin` is used directly because only `Pin`/`I2C`/`Timer`
    # are imported from `machine`, not the module itself.
    pin = Pin(0, Pin.IN, Pin.PULL_UP)
    pin.irq(trigger=Pin.IRQ_FALLING, handler=callback)

    # Module-level device parameters.
    wifi_ssid = ''
    wifi_password = ''
    wlan = None

    # Temperature sensor: the one-wire bus is on pin 2.
    dat = Pin(2)
    ds = ds18x20.DS18X20(onewire.OneWire(dat))

    # ---- Load stored configuration ----------------------------------------
    config_JSON = loadConfigJSON()
    if config_JSON is None:
        showWarning(lcd, 'load cfg err')
    else:
        showWarning(lcd, 'load cfg succ')
    time.sleep(2)

    # Extract the parameters. TypeError covers config_JSON being None.
    try:
        wifi_ssid = config_JSON['name']
    except (KeyError, TypeError):
        showWarning(lcd, 'ssid no exist')
        time.sleep(2)
    try:
        wifi_password = config_JSON['password']
    except (KeyError, TypeError):
        showWarning(lcd, 'psd no exist')
        time.sleep(2)

    # ---- Connect to Wi-Fi ---------------------------------------------------
    wlan = connectWiFi(wifi_ssid, wifi_password)
    if wlan is None:
        showWarning(lcd, 'wifi fail')
    else:
        showWarning(lcd, 'wifi succ')
    time.sleep(2)

    # ---- Sample and publish forever ----------------------------------------
    while True:
        try:
            # Scan for devices on the bus and read the first one found.
            roms = ds.scan()
            ds.convert_temp()
            # The conversion must be given time to finish before reading,
            # otherwise the value read is not the fresh measurement.
            time.sleep_ms(750)
            value = ds.read_temp(roms[0])
            showInfo(lcd, 'temp:%s' % (str(value)))

            # Fetch the current time from the remote service.
            URL = "http://quan.suning.com/getSysTime.do"
            resp = urequests.get(URL)
            try:
                res = resp.text
            finally:
                # Free the underlying socket; it is not released otherwise.
                resp.close()
            j = ujson.loads(res)
            tm = j['sysTime2']
            print(tm)

            if pub(msg='{"temperature":%s,"time":"%s"}' % (str(value), tm)) == True:
                showWarning(lcd, 'send succ')
            else:
                showWarning(lcd, 'send fail')
        except Exception as e:
            # Any failure in the cycle (sensor, HTTP, JSON) lands here; the
            # exception is printed so the actual cause is visible on the
            # console. Exception (not bare except) keeps Ctrl-C working.
            print(e)
            showWarning(lcd, 'sensor error')
        time.sleep(5)
        clearlcd(lcd)

    

    

锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章