MessagemqttTask.lua 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. --- 模块功能:微消息队列_MQTT客户端处理框架
  2. -- @author openLuat
  3. -- @module mqtt.mqttTask
  4. -- @license MIT
  5. -- @copyright openLuat
  6. -- @release 2022.08.31
  7. module(...,package.seeall)
  8. require"misc"
  9. require"mqtt"
  10. require"MessagemqttOutMsg"
  11. require"MessagemqttInMsg"
  12. local ready = false
  13. ---------------注意ParentTopic Group_ID 的长度不易过长 ,阿里云限制最多64个字符---------------
  14. ParentTopic="XXX" --父级Topic 必须在微消息队列MQTT版控制台创建
  15. Group_ID="XXXXX" --共用组名 必须在微消息队列MQTT版控制台创建
  16. HOST="XXXXX"--微消息队列接入点
  17. INSTANCE_ID="XXXXXX" --实例ID 微消息队列MQTT版控制台可以看到
  18. AccessKey_ID="XXXXXXXX" ---阿里云访问凭证
  19. AccessKey_Secret="XXXXXXXXXXX"
  20. ---------------微消息队列参数设置---------------
  21. --- MQTT连接是否处于激活状态
  22. -- @return 激活状态返回true,非激活状态返回false
  23. -- @usage mqttTask.isReady()
  24. function isReady()
  25. return ready
  26. end
  27. function getSubscribeTopic()
  28. -- return PARENT_TOPIC.."/y506/"..nvm.get("deviceSn")
  29. return ParentTopic.."/ceshiwanglong" ----改为自己的topic
  30. --------p2p模式----------------------
  31. ---return ParentTopic.."/p2p/".."/Group_ID .. "@@@" ..misc.getSn()/" ----改为自己的topic
  32. ----发送P2P消息时,需将二级Topic设为“p2p”,将三级Topic设为目标接收者的Client ID。
  33. ----接收消息的客户端无需任何订阅处理,只需要完成客户端的初始化即可收到P2P消息。
  34. --------p2p模式----------------------
  35. end
  36. --启动MQTT客户端任务
  37. sys.taskInit(function()
  38. local retryConnectCnt = 0
  39. while true do
  40. if not socket.isReady() then
  41. retryConnectCnt = 0
  42. -- 等待网络环境准备就绪,超时时间是5分钟
  43. sys.waitUntil("IP_READY_IND", 300000)
  44. end
  45. if socket.isReady() then
  46. -- 创建一个MQTT客户端
  47. local clientID = Group_ID .. "@@@" ..misc.getSn()
  48. local userName = "Signature|" .. AccessKey_ID .. "|" .. INSTANCE_ID
  49. local password = string.fromHex(crypto.hmac_sha1(clientID, clientID:len(),AccessKey_Secret,AccessKey_Secret:len()))
  50. password = crypto.base64_encode(password, password:len())
  51. local mqttClient = mqtt.client(clientID,600,userName,password,nil)
  52. -- 阻塞执行MQTT CONNECT动作,直至成功
  53. -- 如果使用ssl连接,打开mqttClient:connect("lbsmqtt.airm2m.com",1884,"tcp_ssl",{caCert="ca.crt"}),根据自己的需求配置
  54. -- mqttClient:connect("lbsmqtt.airm2m.com",1884,"tcp_ssl",{caCert="ca.crt"})
  55. if mqttClient:connect(HOST, 1883, "tcp") then
  56. retryConnectCnt = 0
  57. ready = true
  58. -- 订阅主题
  59. if mqttClient:subscribe(getSubscribeTopic(),1) then
  60. MessagemqttOutMsg.init()
  61. -- 循环处理接收和发送的数据
  62. while true do
  63. if not MessagemqttInMsg.proc(mqttClient) then
  64. log.error("mqttTask.MessagemqttInMsg.proc error")
  65. break
  66. end
  67. if not MessagemqttOutMsg.proc(mqttClient) then
  68. log.error("mqttTask.MessagemqttOutMsg.proc error")
  69. break
  70. end
  71. end
  72. MessagemqttOutMsg.unInit()
  73. end
  74. ready = false
  75. else
  76. retryConnectCnt = retryConnectCnt + 1
  77. end
  78. -- 断开MQTT连接
  79. mqttClient:disconnect()
  80. if retryConnectCnt >= 5 then
  81. link.shut()
  82. retryConnectCnt = 0
  83. end
  84. sys.wait(5000)
  85. else
  86. -- 进入飞行模式,20秒之后,退出飞行模式
  87. net.switchFly(true)
  88. sys.wait(20000)
  89. net.switchFly(false)
  90. end
  91. end
  92. end)