websocket.lua 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. --- 模块功能:websocket客户端
  2. -- @module websocket
  3. -- @author OpenLuat
  4. -- @license MIT
  5. -- @copyright OpenLuat.com
  6. -- @release 2021.04.08
  7. require "utils"
  8. require "socket"
  9. module(..., package.seeall)
  10. local magic = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
  11. local ws = {}
  12. ws.__index = ws
  13. local function websocket(url, cert)
  14. return setmetatable({
  15. io = nil,
  16. url = url,
  17. key = "",
  18. wss = "",
  19. cert = cert,
  20. host = "",
  21. port = "",
  22. input = "",
  23. callbacks = {},
  24. send_data = {},
  25. send_text = nil,
  26. sendsize = 1460,
  27. open_callback = false,
  28. connected = false,
  29. terminated = false,
  30. readyState = "CONNECTING"
  31. }, ws)
  32. end
  33. --- 创建 websocket 对象
  34. -- @string url websocket服务器的连接地址,格式为ws(或wss)://xxx开头
  35. -- @table[opt=nil] cert ssl连接需要的证书配置,cert格式如下:
  36. -- {
  37. -- caCert = "ca.crt", --CA证书文件(Base64编码 X.509格式),如果存在此参数,则表示客户端会对服务器的证书进行校验;不存在则不校验
  38. -- clientCert = "client.crt", --客户端证书文件(Base64编码 X.509格式),服务器对客户端的证书进行校验时会用到此参数
  39. -- clientKey = "client.key", --客户端私钥文件(Base64编码 X.509格式)
  40. -- clientPassword = "123456", --客户端证书文件密码[可选]
  41. -- insist = 1, --证书中的域名校验失败时,是否坚持连接,默认为1,坚持连接,0为不连接
  42. -- }
  43. -- @return table 返回1个websocket对象
  44. -- @usage local ws = websocket.new("ws://121.40.165.18:8800")
  45. function new(url, cert)
  46. return websocket(url, cert)
  47. end
  48. --- ws:on 注册函数
  49. -- @string event 事件,可选值"open","message","close","error","pong"
  50. -- @function callback 回调方法,message|error|pong形参是该方法需要的数据。
  51. -- @usage mt:on("message",function(message) local print(message)end)
  52. function ws:on(event, callback)
  53. self.callbacks[event] = callback
  54. end
  55. --- websocket 与 websocket 服务器建立连接
  56. -- @number timeout 与websocket服务器建立连接最长超时
  57. -- @return bool,true,表示连接成功,false or nil 表示连接失败
  58. -- @usage while not ws:connect(20000) do sys.wait(2000) end
  59. function ws:connect(timeout)
  60. self.wss, self.host, self.port, self.path = self.url:match("(%a+)://([%w%.%-]+):?(%d*)(.*)")
  61. self.wss, self.host = self.wss:lower(), self.host:lower()
  62. self.port = self.port ~= "" and self.port or (self.wss == "wss" and 443 or 80)
  63. if self.wss == "wss" then
  64. self.io = socket.tcp(true, self.cert)
  65. else
  66. self.io = socket.tcp()
  67. end
  68. if not self.io then
  69. log.error("websocket:connect:", "没有可用的TCP通道!")
  70. return false
  71. end
  72. log.info("websocket url:", self.url)
  73. if not self.io:connect(self.host, self.port, timeout) then
  74. log.error("websocket:connect", "服务器连接失败!")
  75. return false
  76. end
  77. self.key = crypto.base64_encode(math.random(100000000000000, 999999999999999) .. 0, 16)
  78. local req = "GET " .. self.path .. " HTTP/1.1\r\nHost: " .. self.host .. ":" .. self.port .. "\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n" .. "Origin: http://" .. self.host ..
  79. "\r\nSec-WebSocket-Version: 13\r\n" .. "Sec-WebSocket-Key: " .. self.key .. "\r\n\r\n"
  80. if self.io:send(req, tonumber(timeout) or 20000) then
  81. local r, s = self.io:recv(tonumber(timeout) or 5000)
  82. if not r then
  83. self.io:close()
  84. log.error("websocket:connect", "与 websocket server 握手超时!")
  85. return false
  86. end
  87. local _, idx, code = s:find("%s(%d+)%s.-\r\n")
  88. if code == "101" then
  89. local header, accept = {}, self.key .. magic
  90. accept = crypto.sha1(accept, #accept):fromHex()
  91. accept = crypto.base64_encode(accept, #accept)
  92. for k, v in string.gmatch(s:sub(idx + 1, -1), "(.-):%s*(.-)\r\n") do
  93. header[k:lower()] = v
  94. end
  95. if header["sec-websocket-accept"] and header["sec-websocket-accept"] == accept then
  96. log.info("websocket:connect", "与 websocket server 握手成功!")
  97. self.connected, self.readyState = true, "OPEN"
  98. if self.callbacks.open then
  99. self.open_callback = true
  100. end
  101. return true
  102. end
  103. end
  104. end
  105. log.error("websocket:connect", "与 websocket server 握手失败!")
  106. return false
  107. end
  108. -- 掩码加密
  109. -- mask: 4位长度掩码字符串
  110. -- data: 待加密的字符串
  111. -- return: 掩码加密后的字符串
  112. local function wsmask(mask, data)
  113. local i = 0
  114. return data:gsub(".", function(c)
  115. i = i + 1
  116. return string.char(bit.bxor(data:byte(i), mask:byte((i - 1) % 4 + 1)))
  117. end)
  118. end
  119. --- websocket发送帧方法
  120. -- @bool fin true表示结束帧,false表示延续帧
  121. -- @number opcode 0x0--0xF,其他值非法,代码意义参考websocket手册
  122. -- @string data 用户要发送的数据
  123. -- @usage self:sendFrame(true, 0x1, "www.openluat.com")
  124. function ws:sendFrame(fin, opcode, data)
  125. if not self.connected then
  126. return
  127. end
  128. local finbit, maskbit, len = fin and 0x80 or 0, 0x80, #data
  129. local frame = pack.pack("b", bit.bor(finbit, opcode))
  130. if len < 126 then
  131. frame = frame .. pack.pack("b", bit.bor(len, maskbit))
  132. elseif len < 0xFFFF then
  133. frame = frame .. pack.pack(">bH", bit.bor(126, maskbit), len)
  134. else
  135. -- frame = frame .. pack.pack(">BL", bit.bor(127, maskbit), len)
  136. log.error("ws:sendFrame", "数据长度超过最大值!")
  137. end
  138. local mask = pack.pack(">I", os.time())
  139. frame = frame .. mask .. wsmask(mask, data)
  140. for i = 1, #frame, self.sendsize do
  141. if not self.io:send(frame:sub(i, i + self.sendsize - 1)) then
  142. break
  143. end
  144. end
  145. end
  146. -- websocket 发送用户数据方法
  147. -- @string data: 用户要发送的字符串数据
  148. -- @bool text: true 数据为文本字符串,nil或false数据为二进制数据。
  149. -- @usage self:send("www.openluat.com")
  150. -- @usage self:send("www.openluat.com",true)
  151. -- @usage self:send(string.fromHex("www.openluat.com"))
  152. local function send(ws, data, text)
  153. if text then
  154. log.info("websocket cleint send:", data:sub(1, 100))
  155. ws:sendFrame(true, 0x1, data)
  156. else
  157. ws:sendFrame(true, 0x2, data)
  158. end
  159. if ws.callbacks.sent then
  160. ws.callbacks.sent()
  161. end
  162. end
  163. function ws:send(data, text)
  164. table.insert(self.send_data, data)
  165. self.send_text = text
  166. sys.publish("WEBSOCKET_SEND_DATA", "send")
  167. end
  168. -- websocket发送ping包
  169. -- @string data: 用户要发送的文本数据
  170. -- @usage self:ping("hello")
  171. local function ping(ws, data)
  172. ws:sendFrame(true, 0x9, data)
  173. end
  174. function ws:ping(data)
  175. table.insert(self.send_data, data)
  176. sys.publish("WEBSOCKET_SEND_DATA", "ping")
  177. end
  178. -- websocket发送文本数据方法
  179. -- @string data: 用户要发送的文本数据
  180. -- @usage self:pone("hello")
  181. local function pong(ws, data)
  182. ws:sendFrame(true, 0xA, data)
  183. end
  184. function ws:pong(data)
  185. self:sendFrame(true, 0xA, data)
  186. end
  187. -- 处理 websocket 发过来的数据并解析帧数据
  188. -- @return string : 返回解析后的单帧用户数据
  189. function ws:recvFrame()
  190. local close_ctrl = "EXIT_TASK" .. self.io.id
  191. local r, s, p = self.io:recv(60000, "WEBSOCKET_SEND_DATA")
  192. if not r then
  193. if s == "timeout" then
  194. return false, nil, "WEBSOCKET_OK"
  195. elseif s == "WEBSOCKET_SEND_DATA" then
  196. if p == "send" then
  197. local send_data = table.concat(self.send_data)
  198. local send_text = self.send_text
  199. self.send_data = {}
  200. self.send_text = nil
  201. send(self, send_data, send_text)
  202. elseif p == "ping" then
  203. local send_data = table.concat(self.send_data)
  204. self.send_data = {}
  205. ping(self, send_data)
  206. elseif p == "pong" then
  207. local send_data = table.concat(self.send_data)
  208. self.send_data = {}
  209. pong(self, send_data)
  210. elseif p == close_ctrl then
  211. return false, nil, close_ctrl
  212. end
  213. return false, nil, "WEBSOCKET_OK"
  214. else
  215. return false, nil, "Read byte error!"
  216. end
  217. end
  218. if #self.input ~= 0 then
  219. s = self.input .. s
  220. end
  221. local _, firstByte, secondByte = pack.unpack(s:sub(1, 2), "bb")
  222. local fin = bit.band(firstByte, 0x80) ~= 0
  223. local rsv = bit.band(firstByte, 0x70) ~= 0
  224. local opcode = bit.band(firstByte, 0x0f)
  225. local isControl = bit.band(opcode, 0x08) ~= 0
  226. -- 检查RSV1,RSV2,RSV3 是否为0,客户端不支持扩展
  227. if rsv then
  228. return false, nil, "服务器正在使用未定义的扩展!"
  229. end
  230. -- 检查数据是否存在掩码加密
  231. local maskbit = bit.band(secondByte, 0x80) ~= 0
  232. local length = bit.band(secondByte, 0x7f)
  233. if isControl and (length >= 126 or not fin) then
  234. return false, nil, "控制帧异常!"
  235. end
  236. if maskbit then
  237. return false, nil, "数据帧被掩码处理过!"
  238. end
  239. -- 获取载荷长度
  240. if length == 126 then
  241. -- if not r then return false, nil, "读取帧载荷长度失败!" end
  242. _, length = pack.unpack(s:sub(3, 4), ">H")
  243. elseif length == 127 then
  244. return false, nil, "数据帧长度超过支持范围!"
  245. end
  246. -- 获取有效载荷数据
  247. if length > 0 then
  248. log.info("teste", #s, length)
  249. if length > 126 then
  250. -- r, s = self.io:recv()
  251. if #s < length + 4 then
  252. self.input = s
  253. return true, false, ""
  254. end
  255. s = s:sub(5, 5 + length - 1)
  256. else
  257. s = s:sub(3, 3 + length - 1)
  258. end
  259. log.info("s的长度", #s, length)
  260. -- if not r then return false, nil, "读取帧有效载荷数据失败!" end
  261. end
  262. -- 处理切片帧
  263. if not fin then -- 切片未完成
  264. return true, false, s
  265. else -- 未分片帧
  266. if opcode < 0x3 then -- 数据帧
  267. self.input = ""
  268. return true, true, s
  269. elseif opcode == 0x8 then -- close
  270. local code, reason
  271. if #s >= 2 then
  272. _, code = pack.unpack(s:sub(1, 2), ">H")
  273. end
  274. if #s > 2 then
  275. reason = s:sub(3)
  276. end
  277. self.terminated = true
  278. -- self:close(code, reason)
  279. self.input = ""
  280. return false, nil, reason
  281. elseif opcode == 0x9 then -- Ping
  282. self:pong(s)
  283. elseif opcode == 0xA then -- Pong
  284. if self.callbacks.pong then
  285. self.callbacks.pong(s)
  286. end
  287. end
  288. self.input = ""
  289. return true, true, nil
  290. end
  291. end
  292. --- 处理 websocket 发过来的数据并拼包
  293. -- @return result, boolean: 返回数据的状态 true 为正常, false 为失败
  294. -- @return data, string: result为true时为数据,false时为报错信息
  295. -- @usage local result, data = ws:recv()
  296. function ws:recv()
  297. local data = ""
  298. while true do
  299. local success, final, message = self:recvFrame()
  300. -- 数据帧解析错误
  301. if not success then
  302. return success, message
  303. end
  304. -- 数据帧分片处理
  305. if message then
  306. data = data .. message
  307. else
  308. data = "" -- 数据帧包含控制帧处理
  309. end
  310. -- 数据帧处理完成
  311. if final and message then
  312. break
  313. end
  314. end
  315. if self.callbacks.message then
  316. self.callbacks.message(data)
  317. end
  318. return true, data
  319. end
  320. --- 关闭 websocket 与服务器的链接
  321. -- @number code 1000或1002等,请参考websocket标准
  322. -- @string reason 关闭原因
  323. -- @return nil
  324. -- @usage ws:close()
  325. -- @usage ws:close(1002,"协议错误")
  326. function ws:close(code, reason)
  327. -- 1000 "normal closure" status code
  328. self.readyState = "CLOSING"
  329. if self.terminated then
  330. log.error("ws:close server code:", code, reason)
  331. elseif self.io.connected then
  332. if code == nil and reason ~= nil then
  333. code = 1000
  334. end
  335. local data = ""
  336. if code ~= nil then
  337. data = pack.pack(">H", code)
  338. end
  339. if reason ~= nil then
  340. data = data .. reason
  341. end
  342. self.terminated = true
  343. self:sendFrame(true, 0x8, data)
  344. end
  345. self.io:close()
  346. self.readyState, self.connected = "CLOSED", false
  347. if self.callbacks.close then
  348. self.callbacks.close(code or 1001)
  349. end
  350. self.input = ""
  351. end
  352. --- 主动退出一个指定的websocket任务
  353. -- @传入一个websocket对象
  354. -- @return nil
  355. -- @usage wesocket.exit(ws)
  356. function exit(ws)
  357. sys.publish("WEBSOCKET_SEND_DATA", "EXIT_TASK" .. ws.io.id)
  358. end
  359. --- 获取websocket当前状态
  360. -- @return string,状态值("CONNECTING","OPEN","CLOSING","CLOSED")
  361. -- @usage ws:state()
  362. function ws:state()
  363. return self.readyState
  364. end
  365. --- 获取websocket与服务器连接状态
  366. -- @return boolean: true 连接成功,其他值连接失败
  367. -- @usage ws:online()
  368. function ws:online()
  369. return self.connected
  370. end
  371. --- websocket 需要在任务中启动,带自动重连,支持心跳协议
  372. -- @number[opt=nil] keepAlive websocket心跳包,建议180秒
  373. -- @function[opt=nil] proc 处理服务器下发消息的函数
  374. -- @number[opt=1000] reconnTime 断开链接后的重连时间
  375. -- @return nil
  376. -- @usage sys.taskInit(ws.start,ws,180)
  377. -- @usage sys.taskInit(ws.start,ws,180,function(msg)u1:send(msg) end)
  378. function ws:start(keepAlive, proc, reconnTime)
  379. reconnTime = tonumber(reconnTime) and reconnTime * 1000 or 1000
  380. if tonumber(keepAlive) then
  381. keepAlivetimer = sys.timerLoopStart(self.ping, keepAlive * 1000, self, "heart")
  382. end
  383. while true do
  384. while not socket.isReady() do
  385. sys.wait(1000)
  386. end
  387. if self:connect() then
  388. if self.open_callback == true then
  389. self.callbacks.open()
  390. self.open_callback = false
  391. end
  392. local close_ctrl = "EXIT_TASK" .. self.io.id
  393. repeat
  394. local r, message = self:recv()
  395. if r then
  396. if type(proc) == "function" then
  397. proc(message)
  398. end
  399. elseif message == close_ctrl then
  400. self:close()
  401. sys.timerStop(keepAlivetimer)
  402. if self.io.id ~= nil then
  403. self = nil
  404. end
  405. return true
  406. elseif not r and message ~= "WEBSOCKET_OK" then
  407. log.error('ws recv error', message)
  408. end
  409. until not r and message ~= "WEBSOCKET_OK"
  410. end
  411. self:close()
  412. log.info("websocket:Start", "与 websocket Server 的连接已断开!")
  413. sys.wait(reconnTime)
  414. end
  415. end