一、编写客户端代码
二、编写设备端控制代码
[codesyntax lang=”python”]
from machine import Pin, Timer
from neopixel import NeoPixel
from simple import MQTTClient
import time
import network
#定义RGB控制对象
#控制引脚为16,RGB灯串联5个
pin=16
rgb_num=5
rgb_led=NeoPixel(Pin(pin,Pin.OUT),rgb_num)
#定义RGB颜色
RED = (255, 0, 0)
ORANGE = (255, 165, 0)
YELLOW = (255, 150, 0)
GREEN = (0, 255, 0)
BLUE = (0, 0, 255)
INDIGO = (75, 0, 130)
VIOLET = (138, 43, 226)
BLACK = (0, 0, 0)
COLORS = (RED, ORANGE, YELLOW, GREEN, BLUE, INDIGO, VIOLET)
def RGB_led_off():
for i in range(rgb_num):
rgb_led[i]=(0, 0, 0)
rgb_led.write()
def RGB_led_on():
for color in COLORS:
for i in range(rgb_num):
rgb_led[i]=(color[0], color[1], color[2])
rgb_led.write()
time.sleep_ms(100)
time.sleep_ms(1000)
#以下MQTT相关
msg = “”
msg_count=0
led1 = Pin(15, Pin.OUT) # 定义LED控制对象
# 配置信息
CONFIG = {
“ssid”: “liujh”,
“password”: “”,
“mqtt_server”: “”,
“mqtt_port”: 1083,
“client_id”: “ESP32_Client”,
“topic”: “mytest”,
“mqtt_broker_username”: “ljh”,
“mqtt_broker_password”: “”
}
# WIFI连接函数
def wifi_connect(ssid, password, retries=5, delay=15): # Wi-Fi连接函数
wlan = network.WLAN(network.STA_IF) # STA模式
wlan.active(True) # 激活网络接口
start_time = time.time() # 记录开始连接的时间
if not wlan.isconnected():
print(“Connecting to network…”)
wlan.connect(ssid, password) # 连接到指定WIFI
while not wlan.isconnected():
led1.value(1) # LED闪烁表示正在尝试连接
time.sleep_ms(300)
led1.value(0)
time.sleep_ms(300)
# 超过15秒未连接成功,视为超时
if time.time() – start_time > delay:
print(“WIFI Connect Timeout!”)
led1.value(0) # 超时后关闭LED
return False
led1.value(1) # LED常亮
print(“Network connected!”)
print(“Network information:”, wlan.ifconfig())
return True
#设置 MQTT 回调函数,有信息时候执行
def mqtt_callback(topic,msg):
# 将字节类型的消息解码为字符串
message_str = msg.decode(‘utf-8’)
print(“topic: {}”.format(topic))
print(“msg: {}”.format(message_str))
print(f”Received message from topic {topic}: {message_str}”)
if message_str == “on”:
print(“要我开灯了哦!”)
RGB_led_on()
client.publish(CONFIG[‘topic’], “switch on! switch on! switch on! switch on! switch on! “) #这里一发中文就出错,原因不明
if message_str == “off”:
print(“要关灯啊?”)
RGB_led_off()
client.publish(CONFIG[‘topic’], “switch off! switch off! switch off! switch off! switch off! “)
if message_str == “bye”: # 现在msg是一个字符串,可以与字符串进行比较
print(f”再见消息来了啦 —> {topic}: {message_str}”)
client.publish(CONFIG[‘topic’], “no bye bye!”)
# 主程序
if __name__ == “__main__”:
if wifi_connect(CONFIG[‘ssid’], CONFIG[‘password’]):
try:
SERVER=””
PORT=1083
CLIENT_ID=”ESP32_Client” #客户端ID
TOPIC=”mytest” #TOPIC名称
client = MQTTClient(CLIENT_ID, SERVER, PORT, “”, “”) #建立客户端
#client = MQTTClient(CONFIG[‘client_id’], CONFIG[‘mqtt_server’], CONFIG[‘mqtt_port’],
# CONFIG[‘mqtt_broker_username’], CONFIG[‘mqtt_broker_password’], keepalive=60)
client.set_callback(mqtt_callback) #配置回调函数
client.connect()
client.subscribe(TOPIC) #订阅主题
client.publish(TOPIC, CONFIG[‘client_id’])
print(f”MQTT Connected 、Subscribed and first publish message -> {CONFIG[‘topic’]}:{CONFIG[‘client_id’]}”)
except Exception as e:
print(e)
while True:
msg_count += 1
msg = f”Message {msg_count} from ESP32″
if msg_count % 6 ==0 :
try:
client.publish(CONFIG[‘topic’], msg)
print(f”send message to topic {CONFIG[‘topic’]}:{msg}”)
except Exception as e:
print(f”MQTT connection failed: {e}”)
client.check_msg()
print(msg)
time.sleep(1) # 休眠10秒
[/codesyntax]