| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426 |
- --- 模块功能:websocket客户端
- -- @module websocket
- -- @author OpenLuat
- -- @license MIT
- -- @copyright OpenLuat.com
- -- @release 2021.04.08
- require "utils"
- require "socket"
- module(..., package.seeall)
- local magic = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
- local ws = {}
- ws.__index = ws
- local function websocket(url, cert)
- return setmetatable({
- io = nil,
- url = url,
- key = "",
- wss = "",
- cert = cert,
- host = "",
- port = "",
- input = "",
- callbacks = {},
- send_data = {},
- send_text = nil,
- sendsize = 1460,
- open_callback = false,
- connected = false,
- terminated = false,
- readyState = "CONNECTING"
- }, ws)
- end
- --- 创建 websocket 对象
- -- @string url websocket服务器的连接地址,格式为ws(或wss)://xxx开头
- -- @table[opt=nil] cert ssl连接需要的证书配置,cert格式如下:
- -- {
- -- caCert = "ca.crt", --CA证书文件(Base64编码 X.509格式),如果存在此参数,则表示客户端会对服务器的证书进行校验;不存在则不校验
- -- clientCert = "client.crt", --客户端证书文件(Base64编码 X.509格式),服务器对客户端的证书进行校验时会用到此参数
- -- clientKey = "client.key", --客户端私钥文件(Base64编码 X.509格式)
- -- clientPassword = "123456", --客户端证书文件密码[可选]
- -- insist = 1, --证书中的域名校验失败时,是否坚持连接,默认为1,坚持连接,0为不连接
- -- }
- -- @return table 返回1个websocket对象
- -- @usage local ws = websocket.new("ws://121.40.165.18:8800")
- function new(url, cert)
- return websocket(url, cert)
- end
- --- ws:on 注册函数
- -- @string event 事件,可选值"open","message","close","error","pong"
- -- @function callback 回调方法,message|error|pong形参是该方法需要的数据。
- -- @usage mt:on("message",function(message) local print(message)end)
- function ws:on(event, callback)
- self.callbacks[event] = callback
- end
- --- websocket 与 websocket 服务器建立连接
- -- @number timeout 与websocket服务器建立连接最长超时
- -- @return bool,true,表示连接成功,false or nil 表示连接失败
- -- @usage while not ws:connect(20000) do sys.wait(2000) end
- function ws:connect(timeout)
- self.wss, self.host, self.port, self.path = self.url:match("(%a+)://([%w%.%-]+):?(%d*)(.*)")
- self.wss, self.host = self.wss:lower(), self.host:lower()
- self.port = self.port ~= "" and self.port or (self.wss == "wss" and 443 or 80)
- if self.wss == "wss" then
- self.io = socket.tcp(true, self.cert)
- else
- self.io = socket.tcp()
- end
- if not self.io then
- log.error("websocket:connect:", "没有可用的TCP通道!")
- return false
- end
- log.info("websocket url:", self.url)
- if not self.io:connect(self.host, self.port, timeout) then
- log.error("websocket:connect", "服务器连接失败!")
- return false
- end
- self.key = crypto.base64_encode(math.random(100000000000000, 999999999999999) .. 0, 16)
- local req = "GET " .. self.path .. " HTTP/1.1\r\nHost: " .. self.host .. ":" .. self.port .. "\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n" .. "Origin: http://" .. self.host ..
- "\r\nSec-WebSocket-Version: 13\r\n" .. "Sec-WebSocket-Key: " .. self.key .. "\r\n\r\n"
- if self.io:send(req, tonumber(timeout) or 20000) then
- local r, s = self.io:recv(tonumber(timeout) or 5000)
- if not r then
- self.io:close()
- log.error("websocket:connect", "与 websocket server 握手超时!")
- return false
- end
- local _, idx, code = s:find("%s(%d+)%s.-\r\n")
- if code == "101" then
- local header, accept = {}, self.key .. magic
- accept = crypto.sha1(accept, #accept):fromHex()
- accept = crypto.base64_encode(accept, #accept)
- for k, v in string.gmatch(s:sub(idx + 1, -1), "(.-):%s*(.-)\r\n") do
- header[k:lower()] = v
- end
- if header["sec-websocket-accept"] and header["sec-websocket-accept"] == accept then
- log.info("websocket:connect", "与 websocket server 握手成功!")
- self.connected, self.readyState = true, "OPEN"
- if self.callbacks.open then
- self.open_callback = true
- end
- return true
- end
- end
- end
- log.error("websocket:connect", "与 websocket server 握手失败!")
- return false
- end
- -- 掩码加密
- -- mask: 4位长度掩码字符串
- -- data: 待加密的字符串
- -- return: 掩码加密后的字符串
- local function wsmask(mask, data)
- local i = 0
- return data:gsub(".", function(c)
- i = i + 1
- return string.char(bit.bxor(data:byte(i), mask:byte((i - 1) % 4 + 1)))
- end)
- end
- --- websocket发送帧方法
- -- @bool fin true表示结束帧,false表示延续帧
- -- @number opcode 0x0--0xF,其他值非法,代码意义参考websocket手册
- -- @string data 用户要发送的数据
- -- @usage self:sendFrame(true, 0x1, "www.openluat.com")
- function ws:sendFrame(fin, opcode, data)
- if not self.connected then
- return
- end
- local finbit, maskbit, len = fin and 0x80 or 0, 0x80, #data
- local frame = pack.pack("b", bit.bor(finbit, opcode))
- if len < 126 then
- frame = frame .. pack.pack("b", bit.bor(len, maskbit))
- elseif len < 0xFFFF then
- frame = frame .. pack.pack(">bH", bit.bor(126, maskbit), len)
- else
- -- frame = frame .. pack.pack(">BL", bit.bor(127, maskbit), len)
- log.error("ws:sendFrame", "数据长度超过最大值!")
- end
- local mask = pack.pack(">I", os.time())
- frame = frame .. mask .. wsmask(mask, data)
- for i = 1, #frame, self.sendsize do
- if not self.io:send(frame:sub(i, i + self.sendsize - 1)) then
- break
- end
- end
- end
- -- websocket 发送用户数据方法
- -- @string data: 用户要发送的字符串数据
- -- @bool text: true 数据为文本字符串,nil或false数据为二进制数据。
- -- @usage self:send("www.openluat.com")
- -- @usage self:send("www.openluat.com",true)
- -- @usage self:send(string.fromHex("www.openluat.com"))
- local function send(ws, data, text)
- if text then
- log.info("websocket cleint send:", data:sub(1, 100))
- ws:sendFrame(true, 0x1, data)
- else
- ws:sendFrame(true, 0x2, data)
- end
- if ws.callbacks.sent then
- ws.callbacks.sent()
- end
- end
- function ws:send(data, text)
- table.insert(self.send_data, data)
- self.send_text = text
- sys.publish("WEBSOCKET_SEND_DATA", "send")
- end
- -- websocket发送ping包
- -- @string data: 用户要发送的文本数据
- -- @usage self:ping("hello")
- local function ping(ws, data)
- ws:sendFrame(true, 0x9, data)
- end
- function ws:ping(data)
- table.insert(self.send_data, data)
- sys.publish("WEBSOCKET_SEND_DATA", "ping")
- end
- -- websocket发送文本数据方法
- -- @string data: 用户要发送的文本数据
- -- @usage self:pone("hello")
- local function pong(ws, data)
- ws:sendFrame(true, 0xA, data)
- end
- function ws:pong(data)
- self:sendFrame(true, 0xA, data)
- end
- -- 处理 websocket 发过来的数据并解析帧数据
- -- @return string : 返回解析后的单帧用户数据
- function ws:recvFrame()
- local close_ctrl = "EXIT_TASK" .. self.io.id
- local r, s, p = self.io:recv(60000, "WEBSOCKET_SEND_DATA")
- if not r then
- if s == "timeout" then
- return false, nil, "WEBSOCKET_OK"
- elseif s == "WEBSOCKET_SEND_DATA" then
- if p == "send" then
- local send_data = table.concat(self.send_data)
- local send_text = self.send_text
- self.send_data = {}
- self.send_text = nil
- send(self, send_data, send_text)
- elseif p == "ping" then
- local send_data = table.concat(self.send_data)
- self.send_data = {}
- ping(self, send_data)
- elseif p == "pong" then
- local send_data = table.concat(self.send_data)
- self.send_data = {}
- pong(self, send_data)
- elseif p == close_ctrl then
- return false, nil, close_ctrl
- end
- return false, nil, "WEBSOCKET_OK"
- else
- return false, nil, "Read byte error!"
- end
- end
- if #self.input ~= 0 then
- s = self.input .. s
- end
- local _, firstByte, secondByte = pack.unpack(s:sub(1, 2), "bb")
- local fin = bit.band(firstByte, 0x80) ~= 0
- local rsv = bit.band(firstByte, 0x70) ~= 0
- local opcode = bit.band(firstByte, 0x0f)
- local isControl = bit.band(opcode, 0x08) ~= 0
- -- 检查RSV1,RSV2,RSV3 是否为0,客户端不支持扩展
- if rsv then
- return false, nil, "服务器正在使用未定义的扩展!"
- end
- -- 检查数据是否存在掩码加密
- local maskbit = bit.band(secondByte, 0x80) ~= 0
- local length = bit.band(secondByte, 0x7f)
- if isControl and (length >= 126 or not fin) then
- return false, nil, "控制帧异常!"
- end
- if maskbit then
- return false, nil, "数据帧被掩码处理过!"
- end
- -- 获取载荷长度
- if length == 126 then
- -- if not r then return false, nil, "读取帧载荷长度失败!" end
- _, length = pack.unpack(s:sub(3, 4), ">H")
- elseif length == 127 then
- return false, nil, "数据帧长度超过支持范围!"
- end
- -- 获取有效载荷数据
- if length > 0 then
- log.info("teste", #s, length)
- if length > 126 then
- -- r, s = self.io:recv()
- if #s < length + 4 then
- self.input = s
- return true, false, ""
- end
- s = s:sub(5, 5 + length - 1)
- else
- s = s:sub(3, 3 + length - 1)
- end
- log.info("s的长度", #s, length)
- -- if not r then return false, nil, "读取帧有效载荷数据失败!" end
- end
- -- 处理切片帧
- if not fin then -- 切片未完成
- return true, false, s
- else -- 未分片帧
- if opcode < 0x3 then -- 数据帧
- self.input = ""
- return true, true, s
- elseif opcode == 0x8 then -- close
- local code, reason
- if #s >= 2 then
- _, code = pack.unpack(s:sub(1, 2), ">H")
- end
- if #s > 2 then
- reason = s:sub(3)
- end
- self.terminated = true
- -- self:close(code, reason)
- self.input = ""
- return false, nil, reason
- elseif opcode == 0x9 then -- Ping
- self:pong(s)
- elseif opcode == 0xA then -- Pong
- if self.callbacks.pong then
- self.callbacks.pong(s)
- end
- end
- self.input = ""
- return true, true, nil
- end
- end
- --- 处理 websocket 发过来的数据并拼包
- -- @return result, boolean: 返回数据的状态 true 为正常, false 为失败
- -- @return data, string: result为true时为数据,false时为报错信息
- -- @usage local result, data = ws:recv()
- function ws:recv()
- local data = ""
- while true do
- local success, final, message = self:recvFrame()
- -- 数据帧解析错误
- if not success then
- return success, message
- end
- -- 数据帧分片处理
- if message then
- data = data .. message
- else
- data = "" -- 数据帧包含控制帧处理
- end
- -- 数据帧处理完成
- if final and message then
- break
- end
- end
- if self.callbacks.message then
- self.callbacks.message(data)
- end
- return true, data
- end
- --- 关闭 websocket 与服务器的链接
- -- @number code 1000或1002等,请参考websocket标准
- -- @string reason 关闭原因
- -- @return nil
- -- @usage ws:close()
- -- @usage ws:close(1002,"协议错误")
- function ws:close(code, reason)
- -- 1000 "normal closure" status code
- self.readyState = "CLOSING"
- if self.terminated then
- log.error("ws:close server code:", code, reason)
- elseif self.io.connected then
- if code == nil and reason ~= nil then
- code = 1000
- end
- local data = ""
- if code ~= nil then
- data = pack.pack(">H", code)
- end
- if reason ~= nil then
- data = data .. reason
- end
- self.terminated = true
- self:sendFrame(true, 0x8, data)
- end
- self.io:close()
- self.readyState, self.connected = "CLOSED", false
- if self.callbacks.close then
- self.callbacks.close(code or 1001)
- end
- self.input = ""
- end
- --- 主动退出一个指定的websocket任务
- -- @传入一个websocket对象
- -- @return nil
- -- @usage wesocket.exit(ws)
- function exit(ws)
- sys.publish("WEBSOCKET_SEND_DATA", "EXIT_TASK" .. ws.io.id)
- end
- --- 获取websocket当前状态
- -- @return string,状态值("CONNECTING","OPEN","CLOSING","CLOSED")
- -- @usage ws:state()
- function ws:state()
- return self.readyState
- end
- --- 获取websocket与服务器连接状态
- -- @return boolean: true 连接成功,其他值连接失败
- -- @usage ws:online()
- function ws:online()
- return self.connected
- end
- --- websocket 需要在任务中启动,带自动重连,支持心跳协议
- -- @number[opt=nil] keepAlive websocket心跳包,建议180秒
- -- @function[opt=nil] proc 处理服务器下发消息的函数
- -- @number[opt=1000] reconnTime 断开链接后的重连时间
- -- @return nil
- -- @usage sys.taskInit(ws.start,ws,180)
- -- @usage sys.taskInit(ws.start,ws,180,function(msg)u1:send(msg) end)
- function ws:start(keepAlive, proc, reconnTime)
- reconnTime = tonumber(reconnTime) and reconnTime * 1000 or 1000
- if tonumber(keepAlive) then
- keepAlivetimer = sys.timerLoopStart(self.ping, keepAlive * 1000, self, "heart")
- end
- while true do
- while not socket.isReady() do
- sys.wait(1000)
- end
- if self:connect() then
- if self.open_callback == true then
- self.callbacks.open()
- self.open_callback = false
- end
- local close_ctrl = "EXIT_TASK" .. self.io.id
- repeat
- local r, message = self:recv()
- if r then
- if type(proc) == "function" then
- proc(message)
- end
- elseif message == close_ctrl then
- self:close()
- sys.timerStop(keepAlivetimer)
- if self.io.id ~= nil then
- self = nil
- end
- return true
- elseif not r and message ~= "WEBSOCKET_OK" then
- log.error('ws recv error', message)
- end
- until not r and message ~= "WEBSOCKET_OK"
- end
- self:close()
- log.info("websocket:Start", "与 websocket Server 的连接已断开!")
- sys.wait(reconnTime)
- end
- end
|