mqtt_sender.lua 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. --[[
  2. @module mqtt_sender
  3. @summary mqtt client数据发送应用功能模块
  4. @version 1.0
  5. @date 2025.07.29
  6. @author 朱天华
  7. @usage
  8. 本文件为mqtt client 数据发送应用功能模块,核心业务逻辑为:
  9. 1、sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func)订阅"SEND_DATA_REQ"消息,将其他应用模块需要发送的数据存储到队列send_queue中;
  10. 2、mqtt sender task接收"CONNECT OK"、"PUBLISH_REQ"、"PUBLISH OK"三种类型的"MQTT_EVENT"消息,遍历队列send_queue,逐条发送数据到server;
  11. 3、mqtt sender task接收"DISCONNECTED"类型的"MQTT_EVENT"消息,丢弃掉队列send_queue中未发送的数据;
  12. 4、任何一条数据无论发送成功还是失败,只要这条数据有回调函数,都会通过回调函数通知数据发送方;
  13. 本文件的对外接口有1个:
  14. 1、sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func):订阅"SEND_DATA_REQ"消息;
  15. 其他应用模块如果需要发送数据,直接sys.publish这个消息即可,将需要发送的topic,payload和qos以及回调函数和回调参数一起publish出去;
  16. 本demo项目中uart_app.lua和timer_app.lua中publish了这个消息;
  17. ]]
  18. local mqtt_sender = {}
  19. --[[
  20. 数据发送队列,数据结构为:
  21. {
  22. [1] = {topic="topic1", payload="payload1", qos=0, cb={func=callback_function1, para=callback_para1}},
  23. [2] = {topic="topic2", payload="payload2", qos=1, cb={func=callback_function2, para=callback_para2}},
  24. [3] = {topic="topic3", payload="payload3", qos=2, cb={func=callback_function3, para=callback_para3}},
  25. }
  26. topic的内容为publish的主题,string类型,必须存在;
  27. payload的内容为publish的负载数据,string类型,必须存在;
  28. qos的内容为publish的质量等级,number类型,取值范围0,1,2,可选,如果用户没有指定,默认为0;
  29. cb.func的内容为数据发送结果的用户回调函数,可以不存在;
  30. cb.para的内容为数据发送结果的用户回调函数的回调参数,可以不存在;
  31. ]]
  32. local send_queue = {}
  33. -- mqtt client的任务名前缀
  34. mqtt_sender.TASK_NAME_PREFIX = "mqtt_"
  35. -- mqtt_client_sender的任务名
  36. mqtt_sender.TASK_NAME = mqtt_sender.TASK_NAME_PREFIX.."sender"
  37. -- "SEND_DATA_REQ"消息的处理函数
  38. local function send_data_req_proc_func(tag, topic, payload, qos, cb)
  39. -- 将原始数据增加前缀,然后插入到发送队列send_queue中
  40. table.insert(send_queue, {topic=topic, payload="send from "..tag..": "..payload, qos=qos or 0, cb=cb})
  41. -- 发送消息通知 mqtt sender task,有新数据等待发送
  42. sys.sendMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT", "PUBLISH_REQ")
  43. end
  44. -- 按照顺序发送send_queue中的数据
  45. -- 如果调用publish接口成功,则返回当前正在发送的数据项
  46. -- 如果调用publish接口失败,通知回调函数发送失败后,继续发送下一条数据
  47. local function publish_item(mqtt_client)
  48. local item
  49. -- 如果发送队列中有数据等待发送
  50. while #send_queue>0 do
  51. -- 取出来第一条数据赋值给item
  52. -- 同时从队列send_queue中删除这一条数据
  53. item = table.remove(send_queue, 1)
  54. -- publish数据
  55. -- result表示调用publish接口的同步结果,返回值有以下几种:
  56. -- 如果失败,返回nil
  57. -- 如果成功,number类型,qos为0时直接返回0;qos为1或者2时返回publish报文的message id
  58. result = mqtt_client:publish(item.topic, item.payload, item.qos)
  59. -- publish接口调用成功
  60. if result then
  61. return item
  62. -- publish接口调用失败
  63. else
  64. -- 如果当前发送的数据有用户回调函数,则执行用户回调函数
  65. if item.cb and item.cb.func then
  66. item.cb.func(false, item.cb.para)
  67. end
  68. end
  69. end
  70. end
  71. local function publish_item_cbfunc(item, result)
  72. if item then
  73. -- 如果当前发送的数据有用户回调函数,则执行用户回调函数
  74. if item.cb and item.cb.func then
  75. item.cb.func(result, item.cb.para)
  76. end
  77. end
  78. end
  79. -- mqtt client sender的任务处理函数
  80. local function mqtt_client_sender_task_func()
  81. local mqtt_client
  82. local send_item
  83. local result, msg
  84. while true do
  85. -- 等待"MQTT_EVENT"消息
  86. msg = sys.waitMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT")
  87. -- mqtt连接成功
  88. -- msg[3]表示mqtt client对象
  89. if msg[2] == "CONNECT_OK" then
  90. mqtt_client = msg[3]
  91. -- 发送send_queue中的数据
  92. send_item = publish_item(mqtt_client)
  93. -- mqtt publish数据请求
  94. elseif msg[2] == "PUBLISH_REQ" then
  95. -- 如果mqtt client对象存在,并且没有正在等待发送结果的发送数据项
  96. if mqtt_client and not send_item then
  97. -- 发送send_queue中的数据
  98. send_item = publish_item(mqtt_client)
  99. end
  100. -- mqtt publish数据成功
  101. elseif msg[2] == "PUBLISH_OK" then
  102. -- publish成功,执行回调函数通知发送方
  103. publish_item_cbfunc(send_item, true)
  104. -- publish成功,通知网络环境检测看门狗功能模块进行喂狗
  105. sys.publish("FEED_NETWORK_WATCHDOG")
  106. -- 发送send_queue中的数据
  107. send_item = publish_item(mqtt_client)
  108. -- mqtt断开连接
  109. elseif msg[2] == "DISCONNECTED" then
  110. -- 清空mqtt client对象
  111. mqtt_client = nil
  112. -- 如果存在正在等待发送结果的发送项,执行回调函数通知发送方失败
  113. publish_item_cbfunc(send_item, false)
  114. -- 如果发送队列中有数据等待发送
  115. while #send_queue>0 do
  116. -- 取出来第一条数据赋值给send_item
  117. -- 同时从队列send_queue中删除这一条数据
  118. send_item = table.remove(send_queue,1)
  119. -- 执行回调函数通知发送方失败
  120. publish_item_cbfunc(send_item, false)
  121. end
  122. -- 当前没有正在等待发送结果的发送项
  123. send_item = nil
  124. end
  125. end
  126. end
  127. -- 订阅"SEND_DATA_REQ"消息;
  128. -- 其他应用模块如果需要发送数据,直接sys.publish这个消息即可,将需要发送的数据以及回调函数和回调参数一起publish出去;
  129. -- 本demo项目中uart_app.lua和timer_app.lua中publish了这个消息;
  130. sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func)
  131. --创建并且启动一个task
  132. --运行这个task的处理函数mqtt_client_sender_task_func
  133. sys.taskInitEx(mqtt_client_sender_task_func, mqtt_sender.TASK_NAME)
  134. return mqtt_sender