大家好,我是ST!
上次給大家分享了如何使用ESP32實現藍牙通信,今天跟大家聊聊如何使用ESP32連接騰訊云實現遠程控制。本次實驗用到MQTT協(xié)議,同樣,我用miropython編寫程序實現,最終可以通過騰訊連連微信小程序添加設備來發(fā)布主題消息給騰訊云,ESP32負責訂閱騰訊云主題消息,當收到某訂閱消息時,來控制ESP32設備上LED燈的亮滅。
第一步、創(chuàng)建騰訊云產品和設備
瀏覽器搜索【騰訊云】,進入騰訊云官網,微信注冊登錄
點擊【產品】搜索【物聯(lián)網開發(fā)平臺】,點擊【管理控制臺】


在實例管理中點擊【公共實例】

點擊【新建項目】

填寫自己的項目名稱,點擊【保存】

點擊剛新建的項目

點擊【新建產品】

輸入自己定義的【產品名稱】,【產品類別】選智慧城市->公共事業(yè)->路燈照明,【通信設備】填wi-fi,其他的默認,點擊【確定】

點擊自己剛新建的產品名稱

下拉到最后點擊【下一步】

點擊【基于模組開發(fā)】,產品平臺選擇【樂鑫】,類型選擇【wifi】,點擊【樂鑫ESP-WROOM-】點擊【確定】

下拉到最后,點擊【下一步】

點擊【產品展示配置】右側的【配置】

【產品展示名稱】填寫自己自定義的名稱,【廠家名稱】填安信可,【產品信號】填esp32,點擊【保存】,上拉點擊【<-】返回

點擊【快捷入口配置】右側的【配置】,默認不修改,點擊【保存】,上拉點擊【<-】返回

點擊【面板配置】右側的【配置】,默認不修改,點擊【保存】,點擊【<-】返回
點擊【配網引導】右側的【配置】,【芯片方案選擇】選擇樂鑫,【首選配網方式】選擇Smart Config,【次配網方式】選擇Soft Ap,點擊;【保存】,點擊【<-】返回

點擊【掃一掃產品介紹】右側的【配置】,默認不修改,點擊【保存】,點擊【<-】返回
點擊【智能聯(lián)動配置】右側的【配置】,默認不修改,點擊保存,點擊【<-】返回
點擊【下一步】

點擊【新建設備】,自定義【設備名稱】,點擊【保存】

點擊【二維碼】,用手機微信小程序【騰訊連連】掃碼添加設備

此時設備已經創(chuàng)建完成,但是設備還未激活,需要后面的的操作連接才可激活
第二步:生成Username和Password
點擊【設備調試】,點擊【設備名稱】

復制保存設備信息三件套:設備名稱、產品ID、設備密鑰
后面生成MOTT協(xié)議中的Username和Password需要用到

打開password生成工具,在文件夾里有給出,點擊【sign.html】

輸入剛剛復制的設備信息三件套,Hmac簽名算法選擇【HMAC-SHA1】,點擊【Generate】,即可生成Usename和Password,復制保存,后面esp32連接騰訊云和MQTTfx模擬的客戶端連接騰訊云需要用到

第三步:MQTTfx模擬客戶端連接騰訊云步驟與調試(非必須步驟)
打開MQTT.fx軟件,我的壓縮包里面有,點擊下圖的齒輪圖形,進入設置

【Profile name】:自定義的名字
【Profile Type】:選擇【MQTT Broker】
【Broker Address】:騰訊云三件套中的產品ID+.iotcloud.tencentdevices.com
如我的騰訊云ID為:D89S2VVAFT,
那么Broker Address為:D89S2VVAFT.iotcloud.tencentdevices.com
Broker Port:1883
【Client ID】:騰訊云產品ID+騰訊云設備名稱+|securemode=3,signmethod=hmacsha1|
例如我的騰訊云產品ID為:D89S2VVAFT,騰訊云產品名稱為:esp_led
則Client ID為:D89S2VVAFTesp_led|securemode=3,signmethod=hmacsha1|
填寫這些數據完畢后,點擊【User Credentlals】

User Name和Password填寫第二步生成的,每個人的都不一樣
【User Name】:D89S2VVAFTesp_led;12010126;CU7SQ;1660090317
【Password】:cd6c31a3d4cfdba2759deab02fb831a0f672e008;hmacsha1
點擊【OK】保存

點擊【Connect】連接騰訊云

MQTT.fx模擬的客戶端連接騰訊云成功后可以看到右邊的原點變綠,并且有個打開的所,
這時候我們就可以通過手機端小程序騰訊連連來給騰訊云發(fā)送消息,騰訊云再將此消息發(fā)送給MQTT.fx客戶端,但是前提是MQTT.fx客戶端需要訂閱騰訊云,下面是訂閱的步驟

點擊【Subscribe】,Subscribe左邊的空格填寫格式為:
$thing/down/property/“騰訊云產品ID”/“騰訊云產品名稱”
例如:$thing/down/property/D89S2VVAFT/esp_led
填寫完成后,點擊【Subscribe】,現在手機端騰訊練練發(fā)送消息,MQTT.fx可以收到消息了

可以看到MQTT.fx收到了消息,MQTT.fx只是模擬一個客戶端接收消息,我們需要將我們的esp32模塊替代MQTT.fx,這樣我們就可以通過手機發(fā)送消息給騰訊云,騰訊云轉發(fā)消息給esp32模塊了,通過單片機對收到的數據處理,就可以通過手機端控制開發(fā)板的LED燈亮滅。

第四步、編寫程序代碼
首先需要用到MQTT的驅動代碼,我在網上找到了一個用miropython寫的驅動代碼,大家可以直接復制使用,命名為umqttsimple.py即可,代碼如下:
import usocket as socket
import ustruct as struct
from ubinascii import hexlify
class MQTTException(Exception):
pass
class MQTTClient:
def __init__(
self,
client_id,
server,
port=0,
user=None,
password=None,
keepalive=0,
ssl=False,
ssl_params={},
):
if port == 0:
port = 8883 if ssl else 1883
self.client_id = client_id
self.sock = None
self.server = server
self.port = port
self.ssl = ssl
self.ssl_params = ssl_params
self.pid = 0
self.cb = None
self.user = user
self.pswd = password
self.keepalive = keepalive
self.lw_topic = None
self.lw_msg = None
self.lw_qos = 0
self.lw_retain = False
def _send_str(self, s):
self.sock.write(struct.pack("!H", len(s)))
self.sock.write(s)
def _recv_len(self):
n = 0
sh = 0
while 1:
b = self.sock.read(1)[0]
n |= (b & 0x7F) << sh
if not b & 0x80:
return n
sh += 7
def set_callback(self, f):
self.cb = f
def set_last_will(self, topic, msg, retain=False, qos=0):
assert 0 <= qos <= 2
assert topic
self.lw_topic = topic
self.lw_msg = msg
self.lw_qos = qos
self.lw_retain = retain
def connect(self, clean_session=True):
self.sock = socket.socket()
addr = socket.getaddrinfo(self.server, self.port)[0][-1]
self.sock.connect(addr)
if self.ssl:
import ussl
self.sock = ussl.wrap_socket(self.sock, **self.ssl_params)
premsg = bytearray(b"x10")
msg = bytearray(b"x04MQTTx04x02")
sz = 10 + 2 + len(self.client_id)
msg[6] = clean_session << 1
if self.user is not None:
sz += 2 + len(self.user) + 2 + len(self.pswd)
msg[6] |= 0xC0
if self.keepalive:
assert self.keepalive < 65536
msg[7] |= self.keepalive >> 8
msg[8] |= self.keepalive & 0x00FF
if self.lw_topic:
sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
msg[6] |= self.lw_retain << 5
i = 1
while sz > 0x7F:
premsg[i] = (sz & 0x7F) | 0x80
sz >>= 7
i += 1
premsg[i] = sz
self.sock.write(premsg, i + 2)
self.sock.write(msg)
# print(hex(len(msg)), hexlify(msg, ":"))
self._send_str(self.client_id)
if self.lw_topic:
self._send_str(self.lw_topic)
self._send_str(self.lw_msg)
if self.user is not None:
self._send_str(self.user)
self._send_str(self.pswd)
resp = self.sock.read(4)
assert resp[0] == 0x20 and resp[1] == 0x02
if resp[3] != 0:
raise MQTTException(resp[3])
return resp[2] & 1
def disconnect(self):
self.sock.write(b"xe0")
self.sock.close()
def ping(self):
self.sock.write(b"xc0")
def publish(self, topic, msg, retain=False, qos=0):
pkt = bytearray(b"x30")
pkt[0] |= qos << 1 | retain
sz = 2 + len(topic) + len(msg)
if qos > 0:
sz += 2
assert sz < 2097152
i = 1
while sz > 0x7F:
pkt[i] = (sz & 0x7F) | 0x80
sz >>= 7
i += 1
pkt[i] = sz
# print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt, i + 1)
self._send_str(topic)
if qos > 0:
self.pid += 1
pid = self.pid
struct.pack_into("!H", pkt, 0, pid)
self.sock.write(pkt, 2)
self.sock.write(msg)
if qos == 1:
while 1:
op = self.wait_msg()
if op == 0x40:
sz = self.sock.read(1)
assert sz == b"x02"
rcv_pid = self.sock.read(2)
rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
if pid == rcv_pid:
return
elif qos == 2:
assert 0
def subscribe(self, topic, qos=0):
assert self.cb is not None, "Subscribe callback is not set"
pkt = bytearray(b"x82")
self.pid += 1
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
# print(hex(len(pkt)), hexlify(pkt, ":"))
self.sock.write(pkt)
self._send_str(topic)
self.sock.write(qos.to_bytes(1, "little"))
while 1:
op = self.wait_msg()
if op == 0x90:
resp = self.sock.read(4)
# print(resp)
assert resp[1] == pkt[2] and resp[2] == pkt[3]
if resp[3] == 0x80:
raise MQTTException(resp[3])
return
# Wait for a single incoming MQTT message and process it.
# Subscribed messages are delivered to a callback previously
# set by .set_callback() method. Other (internal) MQTT
# messages processed internally.
def wait_msg(self):
res = self.sock.read(1)
self.sock.setblocking(True)
if res is None:
return None
if res == b"":
raise OSError(-1)
if res == b"xd0": # PINGRESP
sz = self.sock.read(1)[0]
assert sz == 0
return None
op = res[0]
if op & 0xF0 != 0x30:
return op
sz = self._recv_len()
topic_len = self.sock.read(2)
topic_len = (topic_len[0] << 8) | topic_len[1]
topic = self.sock.read(topic_len)
sz -= topic_len + 2
if op & 6:
pid = self.sock.read(2)
pid = pid[0] << 8 | pid[1]
sz -= 2
msg = self.sock.read(sz)
self.cb(topic, msg)
if op & 6 == 2:
pkt = bytearray(b"x40x02")
struct.pack_into("!H", pkt, 2, pid)
self.sock.write(pkt)
elif op & 6 == 4:
assert 0
# Checks whether a pending message from server is available.
# If not, returns immediately with None. Otherwise, does
# the same processing as wait_msg.
def check_msg(self):
self.sock.setblocking(False)
return self.wait_msg()
主程序代碼如下:
import time import network from umqttsimple import MQTTClient from machine import Pin,Timer def do_connect(): wlan = network.WLAN(network.STA_IF) wlan.active(True) if not wlan.isconnected(): print('connecting to network...') wlan.connect('11', '1234567a') i = 1 while not wlan.isconnected(): print("正在鏈接...{}".format(i)) i += 1 time.sleep(1) print('network config:', wlan.ifconfig()) def sub_cb(topic, msg): # 回調函數,收到服務器消息后會調用這個函數 print(topic, msg) top=str(topic,'UTF-8') strdata=str(msg,'UTF-8') count=strdata.find("power_switch") print(strdata[count+len("power_switch")+2]) if strdata[count+len("power_switch")+2]=='0' and top=='$thing/down/property/D89S2VVAFT/esp_led': led.value(1) if strdata[count+len("power_switch")+2]=='1' and top=='$thing/down/property/D89S2VVAFT/esp_led': led.value(0) #客戶端ID client_id="D89S2VVAFTesp_led|securemode=3,signmethod=hmacsha1|" #服務器域名 addr="D89S2VVAFT.iotcloud.tencentdevices.com" #端口號 port=1883 #用戶名 username="D89S2VVAFTesp_led;12010126;KTXHT;1669512546" #密碼 password="0569df86e0c75494960cc922703c9ddd47c3fee048ed03d6b1a22d89a8b8a305;hmacsha256" led=Pin(22,Pin.OUT) led.value(1) # 1. 聯(lián)網 do_connect() # 2. 創(chuàng)建mqt c = MQTTClient(client_id=client_id,server=addr,port=port,user=username,password=password,keepalive=60) # 建立一個MQTT客戶端 c.set_callback(sub_cb) # 設置回調函數 c.connect() # 建立連接 c.subscribe(b"$thing/down/property/D89S2VVAFT/esp_led") # 監(jiān)控ledctl這個通道,接收控制命令 timer1 = Timer(0) timer1.init(period=1000*60, mode=Timer.PERIODIC, callback=lambda t: c.ping()) while True: c.check_msg()

點擊編譯,編譯成功如下

查看騰訊云建立的設備,可以看到,此時設備已經在線

點擊二維碼,打開騰訊連連小程序,掃碼添加設備

現在就可以通過騰訊連連來控制ESP32設備啦!點擊點燈開關,ESP32設備打印出收到的消息,如下:

審核編輯:湯梓紅
-
遠程控制
+關注
關注
4文章
726瀏覽量
37027 -
騰訊云
+關注
關注
0文章
224瀏覽量
17446 -
MQTT
+關注
關注
5文章
733瀏覽量
25066 -
ESP32
+關注
關注
26文章
1194瀏覽量
21670
原文標題:使用ESP32連接騰訊云實現遠程控制
文章出處:【微信號:嵌入式開發(fā)愛好者,微信公眾號:嵌入式開發(fā)愛好者】歡迎添加關注!文章轉載請注明出處。
發(fā)布評論請先 登錄
使用CW32單片機通過ESP8266連接OneNET物聯(lián)網平臺
ESP32S3連接阿里云物聯(lián)網平臺LinkSDK報錯怎么解決?
STM32F103C8T6連接阿里云物聯(lián)網平臺
為什么ESP32對接騰訊云ASR平臺會出現偶發(fā)性的簽名錯誤問題呢
有沒有例程通過esp32連接AWS IOT平臺?如何連接到AWSiot平臺?
請問能提供一份release版本的ESP32騰訊云AT固件嗎?
想通過ESP8266連接阿里云,固件哪里有?
關于ESP8266和機智云實現遠程控制車庫卷簾門解析
吃白菜一樣用micropython玩esp32(四)—— siri遠程控制點燈
使用ESP32、Python和javascript的遠程控制繼電器
ESP32 Wi-Fi 控制 LED 燈的原理
使用ESP32連接騰訊云實現遠程控制
評論