mqttOutMsg.lua 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. --- 模块功能:MQTT客户端数据发送处理
  2. -- @author openLuat
  3. -- @module mqtt.mqttOutMsg
  4. -- @license MIT
  5. -- @copyright openLuat
  6. -- @release 2018.03.28
  7. module(...,package.seeall)
  8. --数据发送的消息队列
  9. local msgQueue = {}
  10. local function insertMsg(topic,payload,qos,user)
  11. table.insert(msgQueue,{t=topic,p=payload,q=qos,user=user})
  12. sys.publish("APP_SOCKET_SEND_DATA")
  13. end
  14. local function pubQos0TestCb(result)
  15. log.info("mqttOutMsg.pubQos0TestCb",result)
  16. if result then sys.timerStart(pubQos0Test,10000) end
  17. end
  18. function pubQos0Test()
  19. insertMsg("/qos0topic","你好",0,{cb=pubQos0TestCb})
  20. end
  21. local function pubQos1TestCb(result)
  22. log.info("mqttOutMsg.pubQos1TestCb",result)
  23. if result then sys.timerStart(pubQos1Test,20000) end
  24. end
  25. function pubQos1Test()
  26. insertMsg("/中文qos1topic","中文qos1data",1,{cb=pubQos1TestCb})
  27. end
  28. --- 初始化“MQTT客户端数据发送”
  29. -- @return 无
  30. -- @usage mqttOutMsg.init()
  31. function init()
  32. -- pubQos0Test()
  33. -- pubQos1Test()
  34. end
  35. --- 去初始化“MQTT客户端数据发送”
  36. -- @return 无
  37. -- @usage mqttOutMsg.unInit()
  38. function unInit()
  39. sys.timerStop(pubQos0Test)
  40. sys.timerStop(pubQos1Test)
  41. while #msgQueue>0 do
  42. local outMsg = table.remove(msgQueue,1)
  43. if outMsg.user and outMsg.user.cb then outMsg.user.cb(false,outMsg.user.para) end
  44. end
  45. end
  46. --- MQTT客户端数据发送处理
  47. -- @param mqttClient,MQTT客户端对象
  48. -- @return 处理成功返回true,处理出错返回false
  49. -- @usage mqttOutMsg.proc(mqttClient)
  50. function proc(mqttClient)
  51. while #msgQueue>0 do
  52. local outMsg = table.remove(msgQueue,1)
  53. local result = mqttClient:publish(outMsg.t,outMsg.p,outMsg.q)
  54. if outMsg.user and outMsg.user.cb then outMsg.user.cb(result,outMsg.user.para) end
  55. if not result then return end
  56. end
  57. return true
  58. end
  59. ---------------------------------
  60. -- 订阅 LoRa 主机的传感器数据更新事件
  61. ---------------------------------
  62. sys.subscribe("SENSOR_UPDATE", function(data)
  63. -- log.info("收到从机数据", json.encode(data))
  64. -- 打包成 JSON
  65. -- local payload = {
  66. -- addr = addr,
  67. -- ts = ts,
  68. -- data = data
  69. -- }
  70. local json_data = json.encode(data)
  71. log.info("[MQTT] Upload", json_data)
  72. -- 放入 MQTT 消息队列
  73. insertMsg("/System_Fire_Warning/LoRa/Air724UG/Sensor/Data", json_data, 1, {
  74. cb = function(result)
  75. log.info("[MQTT] 上报结果", result and "成功" or "失败")
  76. end
  77. })
  78. end)