micropython 配网上报温湿度
0 票
boot.py
"""Wi-Fi provisioning portal and SHT2x temperature/humidity MQTT reporter.

While the station interface is not connected, the board opens an access
point and serves a small HTTP page on port 80 where an SSID and password can
be submitted.  Once the station interface is connected, the sensor reading is
published to an MQTT broker every five seconds.
"""
import gc
import socket

import network
import ure
import utime
from machine import I2C, Pin, Timer
from sht2x import SHT2x
from simple import MQTTClient

# --- Hardware set-up (runs at import time) ---------------------------------
sensor = SHT2x(I2C(scl=Pin(5), sda=Pin(4)))
led1 = Pin(2, Pin.OUT)
led1.value(1)
LED = Pin(15, Pin.OUT)
LED.value(1)
utime.sleep(10)

# --- Configuration ----------------------------------------------------------
SERVER = "mq.tongxinmao.com"
PORT = 18830
CLIENT_ID = "umqtt_sht20"
c, server_socket = None, None
NETWORK_PROFILES = 'wifi.dat'

# Internal limits for the tiny HTTP server.
_HEADER_END = b"\r\n\r\n"
_MAX_REQUEST = 4096  # upper bound on bytes buffered per request

tm = Timer(10)
tm1 = Timer(11)
wlan_ap = network.WLAN(network.AP_IF)
wlan_sta = network.WLAN(network.STA_IF)


def LED_up(t):
    """Timer callback: drive the LED pin low."""
    LED.value(0)


def LED_down(t):
    """Timer callback: drive the LED pin high."""
    LED.value(1)


# Both timers fire every 5 s (0.2 Hz); the second one is started 10 ms after
# the first, so the pin goes low and is driven high again 10 ms later.
tm.init(freq=0.2, mode=Timer.PERIODIC, callback=LED_up)
utime.sleep_ms(10)
tm1.init(freq=0.2, mode=Timer.PERIODIC, callback=LED_down)


def getsht20th():
    """Read the sensor and return ``[temperature, humidity]`` as strings.

    Both values are formatted with two decimals; the humidity string carries
    a trailing ``%`` sign.
    """
    temperature = sensor.getTemperature()
    humidity = sensor.getHumidity()
    return ['%.2f' % temperature, '%.2f%%' % humidity]


def pubmsg(server=SERVER, port=PORT):
    """Publish one sensor reading to the MQTT broker at *server*:*port*.

    The client is stored in the module-level ``c``.  The connection is closed
    even when publishing fails, so a failed publish does not leak the socket.
    """
    global c
    th = getsht20th()
    message = "T:" + th[0] + ",H:" + th[1]
    c = MQTTClient(CLIENT_ID, server, port)
    c.connect()
    try:
        c.publish(b"/public/TEST/sht20th", message)
        print(message)
    finally:
        c.disconnect()


def send_header(conn, status_code=200, content_length=None):
    """Send the HTTP status line and headers on *conn*.

    ``Content-Length`` is only emitted when *content_length* is given.
    """
    conn.sendall("HTTP/1.0 {} OK\r\n".format(status_code))
    conn.sendall("Content-Type: text/html\r\n")
    if content_length is not None:
        conn.sendall("Content-Length: {}\r\n".format(content_length))
    conn.sendall("\r\n")


def send_response(conn, payload, status_code=200):
    """Send headers followed by *payload*, then close *conn*."""
    content_length = len(payload)
    send_header(conn, status_code, content_length)
    if content_length > 0:
        conn.sendall(payload)
    conn.close()


def _html_escape(value):
    """Return *value* as text with HTML-special characters escaped."""
    text = str(value)
    for raw, escaped in (("&", "&amp;"), ("<", "&lt;"),
                         (">", "&gt;"), ('"', "&quot;")):
        text = text.replace(raw, escaped)
    return text


def config_page():
    """Return the provisioning page (UTF-8 bytes) with the current reading."""
    th = getsht20th()
    # Built as text and encoded afterwards so that Content-Length counts the
    # bytes actually sent and the non-ASCII labels are valid UTF-8.
    page = """<!DOCTYPE HTML><html><head><title>SHT20_T&H AP</title>
<meta charset="UTF-8"><meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="icon" href="data:;base64,="><meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="https://use.fontawesome.com/releases/v5.7.2/css/all.css"
integrity="sha384-fnmOCqbTlWIlj8LyTjo7mOUStjsKC4pOpQbqyi7RrhN7udi9RwhKkMHpvLbHG9Sr" crossorigin="anonymous">
<style>
html { font-family: Arial; display: inline-block; margin: 0px auto; text-align: center; }
h2 { font-size: 3.0rem; }
p { font-size: 3.0rem; }
.units { font-size: 1.2rem; }
.ds-labels{ font-size: 1.5rem; vertical-align:middle; padding-bottom: 15px; }
#Time,#p{ font-size: 1.0rem; text-align: center; }
</style></head><body><h1>Wifi 配网</h1><form action="configure" method="post">
<div><label>SSID</label><input type="text" name="ssid"></div><div><label>PASSWORD</label>
<input type="password" name="password"></div><input type="submit" value="连接"></form>
<h2>SHT20 Senor</h2><p><i class="fas fa-thermometer-half" style="color:#059e8a;"></i>
<span id="temperature"> %s </span><sup class="units">°C</sup></p>
<p><i class="fa fa-tint" style="color:#059e8a;"></i><span id="humidity"> %s </span><sup class="units"></sup></p>
</body></html>""" % (th[0], th[1])
    return page.encode("utf-8")


def wifi_conf_page(ssid, passwd):
    """Return a page (UTF-8 bytes) echoing the submitted credentials.

    The values come from the HTTP client, so they are HTML-escaped first.
    """
    page = """<html>
<head>
<title>Wifi Conf Info</title>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
<h1>Post data:</h1>
<p>SSID: %s</p>
<p>PASSWD: %s</p>
<a href="/">Return Configure Page</a>
</body>
</html>""" % (_html_escape(ssid), _html_escape(passwd))
    return page.encode("utf-8")


def connect_sucess(new_ip):
    """Return the "connected" page (UTF-8 bytes) linking to *new_ip*."""
    page = """<html>
<head>
<title>Connect Sucess!</title>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
<p>Wifi Connect Sucess</p>
<p>IP Address: %s</p>
<a href="http://%s">Home</a>
<a href="/disconnect">Disconnect</a>
</body>
</html>""" % (new_ip, new_ip)
    return page.encode("utf-8")


def _to_text(data):
    """Return *data* as ``str``, decoding bytes-like input as UTF-8."""
    if isinstance(data, (bytes, bytearray)):
        return bytes(data).decode("utf-8")
    return data


def _url_decode(text):
    """Decode an ``application/x-www-form-urlencoded`` value.

    ``+`` becomes a space and every valid ``%XX`` escape becomes the byte it
    encodes; malformed escapes are kept literally.
    """
    text = text.replace("+", " ")
    raw = bytearray()
    pos, end = 0, len(text)
    while pos < end:
        char = text[pos]
        if char == "%" and pos + 2 < end + 0 or (char == "%" and pos + 2 == end - 0 - 0 and False):
            pass
        if char == "%" and pos + 3 <= end:
            try:
                raw.append(int(text[pos + 1:pos + 3], 16))
                pos += 3
                continue
            except ValueError:
                pass  # not a hex escape: fall through and keep the '%'
        raw.extend(char.encode("utf-8"))
        pos += 1
    return bytes(raw).decode("utf-8")


def get_wifi_conf(request):
    """Extract the submitted credentials from an HTTP *request*.

    Returns ``(ssid, password)`` with URL-encoding removed, or ``False`` when
    the request carries no ``ssid=...&password=...`` pair, cannot be decoded,
    or the SSID is empty.
    """
    try:
        match = ure.search("ssid=([^&]*)&password=(.*)", _to_text(request))
        if match is None:
            return False
        ssid = _url_decode(match.group(1))
        password = _url_decode(match.group(2).rstrip("\r\n"))
    except ValueError:  # includes UnicodeError from a bad byte sequence
        return False
    if len(ssid) == 0:
        return False
    return (ssid, password)


def handle_wifi_configure(ssid, password):
    """Connect the station interface; return its IP address or ``False``."""
    if do_connect(ssid, password):
        return wlan_sta.ifconfig()[0]
    print('connect fail')
    return False


def check_wlan_connected():
    """Return ``True`` when the station interface reports a connection."""
    return bool(wlan_sta.isconnected())


def do_connect(ssid, password):
    """Activate the station interface and connect to *ssid*.

    Polls the connection state up to 100 times, 100 ms apart.  Returns a
    truthy value when connected (including when the interface was already
    connected before the call) and a falsy value otherwise.
    """
    wlan_sta.active(True)
    if wlan_sta.isconnected():
        # Already online: report success so the caller does not treat an
        # existing connection as a failed attempt.
        return True
    print('Connect to %s' % ssid)
    wlan_sta.connect(ssid, password)
    connected = False
    for _ in range(100):
        connected = wlan_sta.isconnected()
        if connected:
            break
        utime.sleep_ms(100)
        print('.', end='')
    if connected:
        print('\nConnected : ', wlan_sta.ifconfig())
    else:
        print('\nFailed. Not Connected to: ' + ssid)
    return connected


def read_profiles():
    """Load ``{ssid: password}`` from the ``NETWORK_PROFILES`` file.

    Each line has the form ``ssid;password``.  Blank lines and lines without
    a separator are skipped; only the first ``;`` separates the fields, so a
    password may itself contain ``;``.  Raises ``OSError`` when the file
    cannot be opened.
    """
    with open(NETWORK_PROFILES) as f:
        lines = f.readlines()
    profiles = {}
    for line in lines:
        entry = line.strip("\n")
        if ";" not in entry:
            continue
        ssid, password = entry.split(";", 1)
        profiles[ssid] = password
    return profiles


def write_profiles(profiles):
    """Write *profiles* (``{ssid: password}``) to ``NETWORK_PROFILES``."""
    lines = ["%s;%s\n" % (ssid, password)
             for ssid, password in profiles.items()]
    with open(NETWORK_PROFILES, "w") as f:
        f.write(''.join(lines))


def stop():
    """Close the listening socket, if one is open."""
    global server_socket
    if server_socket:
        server_socket.close()
        server_socket = None


def _open_server_socket():
    """Return a TCP socket listening on port 80 of every interface."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(('0.0.0.0', 80))
    listener.listen(5)
    return listener


def _read_request(conn):
    """Read one HTTP request from *conn* and return it as bytes.

    Reads until the end of the headers and then, when a ``Content-Length``
    header is present, until the declared body has arrived (the submitted
    form fields live in the body).  Stops early when the peer closes the
    connection, on a socket error/timeout, or once ``_MAX_REQUEST`` bytes
    have been buffered; whatever was received so far is returned.
    """
    request = b""
    try:
        while _HEADER_END not in request and len(request) < _MAX_REQUEST:
            chunk = conn.recv(1024)
            if not chunk:  # peer closed: nothing more will arrive
                return request
            request += chunk
        head_len = request.find(_HEADER_END)
        if head_len < 0:
            return request
        head_len += len(_HEADER_END)
        head = request[:head_len].lower()
        field = b"content-length:"
        start = head.find(field)
        if start < 0:
            return request
        stop_at = head.find(b"\r\n", start)
        try:
            expected = int(head[start + len(field):stop_at].decode("utf-8").strip())
        except ValueError:
            return request
        while (len(request) - head_len < expected
               and len(request) < _MAX_REQUEST):
            chunk = conn.recv(1024)
            if not chunk:
                break
            request += chunk
    except OSError:
        pass  # timeout or reset: work with what has been received
    return request


def _request_path(request):
    """Return the requested path without surrounding slashes, or ``None``.

    ``None`` means the request line could not be parsed.
    """
    try:
        text = _to_text(request)
    except ValueError:
        return None
    match = ure.search("(?:GET|POST) /(.*?)(?:\\?.*?)? HTTP", text)
    if match is None:
        return None
    return match.group(1).rstrip("/")


def _handle_configure(conn, request):
    """Try the submitted credentials and answer the client accordingly."""
    credentials = get_wifi_conf(request)
    new_ip = False
    if credentials:
        new_ip = handle_wifi_configure(credentials[0], credentials[1])
    if new_ip:
        send_response(conn, connect_sucess(new_ip))
        print('connect sucess')
    else:
        # Missing/invalid form data or failed connection: show the form
        # again instead of leaving the client without a response.
        send_response(conn, config_page())


def startAP():
    """Run the provisioning access point until the station is connected.

    Opens an unencrypted access point named ``SHT20_T&H`` and serves:
    ``/`` (the form), ``/configure`` (apply credentials) and ``/disconnect``.
    The access point and the listening socket are shut down on exit.
    """
    global server_socket
    stop()
    wlan_ap.active(True)
    wlan_ap.config(essid='SHT20_T&H', authmode=0)
    server_socket = _open_server_socket()
    # NOTE: accept() blocks, so the loop condition is only re-evaluated
    # after a client has connected.
    while not wlan_sta.isconnected():
        conn, addr = server_socket.accept()
        print('Connection: %s ' % str(addr))
        try:
            conn.settimeout(3)
            request = _read_request(conn)
            url = _request_path(request)
            if url is None:
                continue
            print("URL is {}".format(url))
            if url == "":
                send_response(conn, config_page())
            elif url == "configure":
                _handle_configure(conn, request)
            elif url == "disconnect":
                wlan_sta.disconnect()
        except OSError as err:
            # A broken client connection must not take the portal down.
            print('request failed: %s' % err)
        finally:
            conn.close()
    wlan_ap.active(False)
    stop()
    print('ap exit')


def home():
    """Serve the status page on port 80 while the station is connected.

    ``/`` shows the "connected" page with the station IP address and
    ``/disconnect`` drops the Wi-Fi connection.  The station interface is
    deactivated when the loop ends.
    """
    global server_socket
    stop()
    wlan_sta.active(True)
    ip_addr = wlan_sta.ifconfig()[0]
    print('wifi connected')
    server_socket = _open_server_socket()
    while check_wlan_connected():
        conn, addr = server_socket.accept()
        try:
            conn.settimeout(3)
            request = _read_request(conn)
            url = _request_path(request)
            if url is None:
                continue
            if url == "":
                send_response(conn, connect_sucess(ip_addr))
            elif url == "disconnect":
                wlan_sta.disconnect()
        except OSError as err:
            print('request failed: %s' % err)
        finally:
            conn.close()
    wlan_sta.active(False)
    print('sta exit')


if __name__ == "__main__":
    while True:
        if not check_wlan_connected():
            startAP()
            continue
        try:
            pubmsg(server=SERVER, port=PORT)
        except Exception as exc:
            # Best effort: report the failure and try again next cycle.
            print('publish failed: %r' % exc)
        utime.sleep(5)