如何在Linux Shell中读取WebSocket响应

curl 请求的方式 不在有效 经常抛错

curl 'wss://ws.test.com' -H 'Upgrade: websocket' -H 'Connection: Upgrade' --verbose
* Protocol "wss" not supported or disabled in libcurl
* Closing connection -1
curl: (1) Protocol "wss" not supported or disabled in libcurl

更换使用 wscat

npm install -g wscat
/usr/bin/wscat -> /usr/lib/node_modules/wscat/bin/wscat
npm WARN notsup Unsupported engine for commander@9.5.0: wanted: {"node":"^12.20.0 || >=14"} (current: {"node":"8.17.0","npm":"6.13.4"})
npm WARN notsup Not compatible with your version of node/npm: commander@9.5.0
npm WARN notsup Unsupported engine for ws@8.16.0: wanted: {"node":">=10.0.0"} (current: {"node":"8.17.0","npm":"6.13.4"})
npm WARN notsup Not compatible with your version of node/npm: ws@8.16.0

+ wscat@5.2.0
added 9 packages from 7 contributors in 1.264s

指令测试

wscat -c 'wss://ws.test.com'
Connected (press CTRL+C to quit)

起码可以测试下 websocket 是否握手成功

WebSocket 是一种支持双向通信的网络协议。

双向通信:客户端(比如浏览器)可以向服务端发送消息,服务端也可以主动向客户端发送消息。
这样就实现了客户端和服务端的双向通信,那么上面所说的消息推送就比较容易实现了。

原先的 HTTP1.0/1.1 只能是客户端向服务端发送消息。

协议特点:

  • 建立在 TCP 协议之上。
  • WebSocket 协议是从 HTTP 协议升级而来。
  • 与 HTTP 协议良好兼容新。默认端口是 80 和 443,握手阶段采用 HTTP 协议。
  • 数据格式比较轻量,通信效率高,性能开销小。
  • 可以发送文本,也可以发送二进制数据。
  • 没有同源限制,客户端可以与任意服务端通信。
  • 协议标识符是 ws(如果加密,则为 wss),服务器网址就是 URL。
  • 可以支持扩展,定了扩展协议。
  • 保持连接状态,websocket 是一种有状态的协议,通信就可以省略部分状态信息。
  • 实时性更强,因为是双向通信协议,所以服务端可以随时向客户端发送数据。

WebSocket 数据交换

数据帧格式

WebSocket 协议中,客户端与服务端数据交换的最小信息单位叫做帧(frame),由 1 个或多个帧按照次序组成一条完整的消息(message)。

数据传输的格式是由 ABNF 来描述的。

WebSocket 数据帧的统一格式如下图:

0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+

https://www.rfc-editor.org/rfc/rfc6455.html#section-5.2 Base Framing Protocol)

上面图中名词解释:

名词说明大小
FIN如果是 1,表示这是消息(message)的最后一个分片(fragment);如果是 0,表示不是是消息(message)的最后一个分片(fragment)1 个比特
RSV1, RSV2, RSV3一般情况下全为 0。当客户端、服务端协商采用 WebSocket 扩展时,这三个标志位可以非 0,且值的含义由扩展进行定义。如果出现非零的值,且并没有采用 WebSocket 扩展,连接出错各占 1 个比特
opcode操作代码,Opcode 的值决定了应该如何解析后续的数据载荷(data payload)。如果操作代码是不认识的,那么接收端应该断开连接(fail the connection)4 个比特
mask表示是否要对数据载荷进行掩码操作。从客户端向服务端发送数据时,需要对数据进行掩码操作;从服务端向客户端发送数据时,不需要对数据进行掩码操作。
如果服务端接收到的数据没有进行过掩码操作,服务端需要断开连接。
如果 Mask 是 1,那么在 Masking-key 中会定义一个掩码键(masking key),并用这个掩码键来对数据载荷进行反掩码。所有客户端发送到服务端的数据帧,Mask 都是 1。
1 个比特
Payload length数据载荷的长度,单位是字节。假设数 Payload length === x,如果:
x 为 0~126:数据的长度为 x 字节。
x 为 126:后续 2 个字节代表一个 16 位的无符号整数,该无符号整数的值为数据的长度。
x 为 127:后续 8 个字节代表一个 64 位的无符号整数(最高位为 0),该无符号整数的值为数据的长度。
此外,如果 payload length 占用了多个字节的话,payload length 的二进制表达采用网络序(big endian,重要的位在前)。
为 7 位,或 7+16 位,或 1+64 位。
Masking-key所有从客户端传送到服务端的数据帧,数据载荷都进行了掩码操作,Mask 为 1,且携带了 4 字节的 Masking-key。如果 Mask 为 0,则没有 Masking-key。
备注:载荷数据的长度,不包括 mask key 的长度。
0 或 4 字节(32 位
Payload data载荷数据:包括了扩展数据、应用数据。其中,扩展数据 x 字节,应用数据 y 字节。The "Payload data" is defined as "Extension data" concatenated with "Application data".
扩展数据:如果没有协商使用扩展的话,扩展数据数据为 0 字节。所有的扩展都必须声明扩展数据的长度,或者可以如何计算出扩展数据的长度。此外,扩展如何使用必须在握手阶段就协商好。如果扩展数据存在,那么载荷数据长度必须将扩展数据的长度包含在内。
应用数据:任意的应用数据,在扩展数据之后(如果存在扩展数据),占据了数据帧剩余的位置。载荷数据长度 减去 扩展数据长度,就得到应用数据的长度。
(x+y) 字节

表中 opcode 操作码:

  • %x0:表示一个延续帧(continuation frame)。当 Opcode 为 0 时,表示本次数据传输采用了数据分片,当前收到的数据帧为其中一个数据分片。
  • %x1:表示这是一个文本帧(frame),text frame
  • %x2:表示这是一个二进制帧(frame),binary frame
  • %x3-7:保留的操作代码,用于后续定义的非控制帧。
  • %x8:表示连接断开。connection close
  • %x9:表示这是一个 ping 操作。a ping
  • %xA:表示这是一个 pong 操作。a pong
  • %xB-F:保留的操作代码,用于后续定义的控制帧。

数据帧另外一种表达方式

 ws-frame                = frame-fin           ; 1 bit in length
                              frame-rsv1          ; 1 bit in length
                              frame-rsv2          ; 1 bit in length
                              frame-rsv3          ; 1 bit in length
                              frame-opcode        ; 4 bits in length
                              frame-masked        ; 1 bit in length
                              frame-payload-length   ; either 7, 7+16,
                                                     ; or 7+64 bits in
                                                     ; length
                              [ frame-masking-key ]  ; 32 bits in length
                              frame-payload-data     ; n*8 bits in
                                                     ; length, where
                                                     ; n >= 0

    frame-fin               = %x0 ; more frames of this message follow
                            / %x1 ; final frame of this message
                                  ; 1 bit in length

    frame-rsv1              = %x0 / %x1
                              ; 1 bit in length, MUST be 0 unless
                              ; negotiated otherwise

    frame-rsv2              = %x0 / %x1
                              ; 1 bit in length, MUST be 0 unless
                              ; negotiated otherwise

    frame-rsv3              = %x0 / %x1
                              ; 1 bit in length, MUST be 0 unless
                              ; negotiated otherwise

    frame-opcode            = frame-opcode-non-control /
                              frame-opcode-control /
                              frame-opcode-cont

    frame-opcode-cont       = %x0 ; frame continuation

    frame-opcode-non-control= %x1 ; text frame
                            / %x2 ; binary frame
                            / %x3-7
                            ; 4 bits in length,
                            ; reserved for further non-control frames

    frame-opcode-control    = %x8 ; connection close
                            / %x9 ; ping
                            / %xA ; pong
                            / %xB-F ; reserved for further control
                                    ; frames
                                    ; 4 bits in length
                                    
    frame-masked            = %x0
                            ; frame is not masked, no frame-masking-key
                            / %x1
                            ; frame is masked, frame-masking-key present
                            ; 1 bit in length

    frame-payload-length    = ( %x00-7D )
                            / ( %x7E frame-payload-length-16 )
                            / ( %x7F frame-payload-length-63 )
                            ; 7, 7+16, or 7+64 bits in length,
                            ; respectively

    frame-payload-length-16 = %x0000-FFFF ; 16 bits in length

    frame-payload-length-63 = %x0000000000000000-7FFFFFFFFFFFFFFF
                            ; 64 bits in length

    frame-masking-key       = 4( %x00-FF )
                              ; present only if frame-masked is 1
                              ; 32 bits in length

    frame-payload-data      = (frame-masked-extension-data
                               frame-masked-application-data)
                            ; when frame-masked is 1
                              / (frame-unmasked-extension-data
                                frame-unmasked-application-data)
                            ; when frame-masked is 0

    frame-masked-extension-data     = *( %x00-FF )
                            ; reserved for future extensibility
                            ; n*8 bits in length, where n >= 0

    frame-masked-application-data   = *( %x00-FF )
                            ; n*8 bits in length, where n >= 0

    frame-unmasked-extension-data   = *( %x00-FF )
                            ; reserved for future extensibility
                            ; n*8 bits in length, where n >= 0

    frame-unmasked-application-data = *( %x00-FF )
                            ; n*8 bits in length, where n >= 0

客户端到服务端的掩码算法

https://www.rfc-editor.org/rfc/rfc6455.html#section-5.3 Client-to-Server Masking

掩码键(Masking-key)是由客户端挑选出来的 32 位的随机数。掩码操作不会影响数据载荷的长度。掩码、反掩码操作都采用如下算法:

举例说明:

Octet i of the transformed data ("transformed-octet-i") is the XOR of
octet i of the original data ("original-octet-i") with octet at index
i modulo 4 of the masking key ("masking-key-octet-j"):

     j                   = i MOD 4
     transformed-octet-i = original-octet-i XOR masking-key-octet-j
original-octet-i:为原始数据的第 i 字节。
transformed-octet-i:为转换后的数据的第 i 字节。
j:为i mod 4的结果。
masking-key-octet-j:为 mask key 第 j 字节。

算法描述为: original-octet-imasking-key-octet-j 异或后,得到 transformed-octet-i

j  = i MOD 4
transformed-octet-i = original-octet-i XOR masking-key-octet-j

数据分片

分片的目的:

  • 有了消息分片,发送一个消息的时候,就可以发送未知大小的信息。如果消息不能被分片,那么就不得不缓冲整个消息,以便计算长度。而有了分片就可以选择合适大小缓冲区来缓冲分片。
  • 第二个目的是可以使用多路复用。

WebSocket 的每条消息(message)可能被切分为多个数据帧。

当 WebSocket 的接收方接收到一个数据帧时,会根据 FIN 值来判断是否收到消息的最后一个数据帧。

从上图可以看出,FIN = 1 时,表示为消息的最后一个数据帧;FIN = 0 时,则不是消息的最后一个数据帧,接收方还要继续监听接收剩余数据帧。

opcode 表示数据传输的类型,0x01 表示文本类型的数据;0x02 表示二进制类型的数据;0x00 比较特殊,表示延续帧(continuation frame),意思就是完整数据对应的数据帧还没有接收完。

更多分片内容请看这里:https://www.rfc-editor.org/rfc/rfc6455.html#section-5.4

消息分片example:

Client: FIN=1, opcode=0x1, msg="hello"
Server: (process complete message immediately) Hi.
Client: FIN=0, opcode=0x1, msg="and a"
Server: (listening, new message containing text started)
Client: FIN=0, opcode=0x0, msg="happy new"
Server: (listening, payload concatenated to previous message)
Client: FIN=1, opcode=0x0, msg="year!"
Server: (process complete message) Happy new year to you too!
(具体例子见:https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API/Writing_WebSocket_servers)

五:怎么保持连接

在第二小结中我们介绍了 websocket 的特点,其中有一个是保持连接状态。

websocket 是建立在 tcp 之上,那也就是客户端与服务端的 tcp 通道要保持连接不断开。

怎么保持呢?可以用心跳来实现。

其实 websocket 协议早就想到了,它的帧数据格式中有一个字段 opcode,定义了 2 种类型操作, pingpongopcode 分别是 0x90xA

说明:对于长时间没有数据往来的连接,如果依旧长时间保持连接的状态,那么就会浪费连接资源。

[完]

六、参考

问题描述 :

观察客户端 每2分钟 websocket 断开连接, 提示 1006 CLOSE_ABNORMAL.

排查问题 :

服务端观察日志能发现的确与客户端同一时间断开连接. 但服务端onClose事件并不是服务端主动要求断开的.
客户端也有6秒 定时发ping包, 双方心跳检测都不是超时问题.
知道运维同学 帮忙查了下, 使用cloudflare CDN的新域名观察后并没有这个问题.
那就破案了,现有的域名CND用的都是Google Cloud , 运维同学咨询了谷歌售后服务. 回答的都绝了

聊天内容如下 :

> $1 : 关于负载均衡的问题,,用户 -> 负载均衡 -> nginx,https的服务没有问题,websocket的服务2分钟后就断了

> $1 : timeout设置为999999999

> $2 : 这是有哪些地方需要单独设置吗
> $1 : 对
> $2 : 那会影响https的服务吗
> $1 : 对
> $2 : 把websocket单独令出来

上面对话简单来说就是 我们webscoket服务用的谷歌云的负载均衡,又一个超时配置. 120秒没有请求过来负载机切换到其他服务器了. 就算后面是单机他也会断开. 调整这个问题很简单,单独配置websocket的负载. 调大超时秒数.

谷歌云的负载均衡超时仅依赖于短链接请求. 并不适用于长连接, 只能通过加超长的timeout 解决问题

重要信息错误状态码

WebSocket断开时,会触发CloseEvent, CloseEvent会在连接关闭时发送给使用 WebSockets 的客户端. 它在 WebSocket 对象的 onclose 事件监听器中使用。CloseEvent的code字段表示了WebSocket断开的原因。可以从该字段中分析断开的原因。

CloseEvent有三个字段需要注意, 通过分析这三个字段,一般就可以找到断开原因

CloseEvent.code: code是错误码,是整数类型
CloseEvent.reason: reason是断开原因,是字符串
CloseEvent.wasClean: wasClean表示是否正常断开,是布尔值。一般异常断开时,该值为false


一般来说1006的错误码出现的情况比较常见,该错误码一般出现在断网时。

状态码名称描述
0–999 保留段, 未使用.
1000CLOSE_NORMAL正常关闭; 无论为何目的而创建, 该链接都已成功完成任务.
1001CLOSE_GOING_AWAY终端离开, 可能因为服务端错误, 也可能因为浏览器正从打开连接的页面跳转离开.
1002CLOSE_PROTOCOL_ERROR由于协议错误而中断连接.
1003CLOSE_UNSUPPORTED由于接收到不允许的数据类型而断开连接 (如仅接收文本数据的终端接收到了二进制数据).
1004 保留. 其意义可能会在未来定义.
1005CLOSE_NO_STATUS保留. 表示没有收到预期的状态码.
1006CLOSE_ABNORMAL保留. 用于期望收到状态码时连接非正常关闭 (也就是说, 没有发送关闭帧).
1007Unsupported Data由于收到了格式不符的数据而断开连接 (如文本消息中包含了非 UTF-8 数据).
1008Policy Violation由于收到不符合约定的数据而断开连接. 这是一个通用状态码, 用于不适合使用 1003 和 1009 状态码的场景.
1009CLOSE_TOO_LARGE由于收到过大的数据帧而断开连接.
1010Missing Extension客户端期望服务器商定一个或多个拓展, 但服务器没有处理, 因此客户端断开连接.
1011Internal Error客户端由于遇到没有预料的情况阻止其完成请求, 因此服务端断开连接.
1012Service Restart服务器由于重启而断开连接.
1013Try Again Later服务器由于临时原因断开连接, 如服务器过载因此断开一部分客户端连接.
1014 由 WebSocket标准保留以便未来使用.
1015TLS Handshake保留. 表示连接由于无法完成 TLS 握手而关闭 (例如无法验证服务器证书).
1016–1999 由 WebSocket标准保留以便未来使用.
2000–2999 由 WebSocket拓展保留使用.
3000–3999 可以由库或框架使用.? 不应由应用使用. 可以在 IANA 注册, 先到先得.
4000–4999 可以由应用使用.

WebSocket是什么

in 协议

WebSocket:实时双向通信的协议

WebSocket是一种在Web应用程序中实现实时双向通信的协议。它提供了一种持久化的连接,允许客户端和服务器之间进行全双工通信,从而实现实时数据交换。本文将介绍WebSocket的基本原理、优势以及在Web开发中的应用。

1. WebSocket的基本原理

WebSocket协议建立在传输层协议(如TCP)之上,通过在客户端和服务器之间建立一条持久化的连接,实现双向通信。与传统的HTTP请求-响应模型不同,WebSocket允许服务器主动向客户端发送数据,而不需要客户端发起请求。

在建立WebSocket连接时,客户端和服务器通过进行一次特殊的握手过程来协商连接的建立。一旦连接建立成功,双方就可以通过发送消息来进行实时的双向通信。这种持久化的连接减少了通信的延迟,并且可以更高效地传输实时数据。

2. WebSocket的优势

相比传统的HTTP请求-响应模型,WebSocket具有以下优势:

  • 实时性和效率:WebSocket提供了实时的双向通信,服务器可以主动向客户端推送数据,避免了频繁的轮询或长轮询的开销,提供更高效的数据传输和实时性。
  • 更少的通信开销:由于WebSocket建立了持久化的连接,避免了每次通信都需要建立和关闭连接的开销,减少了通信的开销和网络流量。
  • 更低的延迟:WebSocket通过减少通信的延迟,实现了更低的响应时间,使得实时应用程序(如聊天应用、股票行情等)能够更快地响应和更新数据。

3. WebSocket的应用

WebSocket在Web开发中有广泛的应用,特别是在需要实时数据交换的场景。以下是一些常见的应用领域:

  • 实时聊天和通信:通过WebSocket可以实现实时聊天和通信应用,允许用户即时发送和接收消息,提供更好的用户体验。
  • 实时数据更新:WebSocket可以用于实时更新数据,例如股票行情、实时地图、协作编辑等应用,使得用户可以及时获得最新的数据。
  • 实时游戏:WebSocket可以用于实时多人游戏,实现实时的游戏状态同步和交互。

参考文献

以下是一些相关的参考文献,供进一步学习和了解:

  1. I. Fette, A. Melnikov, "The WebSocket Protocol," RFC 6455, 2011. [Online]. Available: https://datatracker.ietf.org/doc/html/rfc6455.
  2. M. Sambasivan, "WebSockets vs. HTTP/2: A Performance Comparison," 2019. [Online]. Available: https://www.section.io/engineering-education/websockets-vs-http2-performance-comparison/.

结论

WebSocket是一种在Web应用程序中实现实时双向通信的协议。它通过建立持久化的连接,允许服务器主动向客户端推送数据,提供了实时性、效率和更低的延迟。WebSocket在实时聊天、实时数据更新以及实时游戏等应用中有着广泛的应用。

看到有同学在 Webscocket pong 包 php 踩坑实例 文章中留言 要我给出Demo
满足这位同学的要求:

下面Demo使用的 Saber 人性化的协程HTTP客户端封装库
运行环境 swoole4.2+ php7.1+ 需要使用协程 go();

demo 并不全 但坑都是踩过的 getSendMessage 创建订阅数据json格式这个就不提供了, 很简单没坑, 但最好控制好订阅交易对数量.

币安的webscocket 主要坑在于 ping包 非常正规的使用opcode是9, 并且收包中的data是空包, 回pongopcode传10.

/**
 * 币安现货ws 订阅
 * @param array $symbolList 订阅交易对列表
 * @param $subType 订阅类型
 * @return bool
 */
public function binanceSubscribe(array $symbolList, $subType = self::SUB_SPOT_DEPTH_TYPE)
{
    // 订阅地址
    $this->stream = 'wss://stream.binance.com:443/stream?streams=';

    $startTime = time();

    try {
        // 判断连接是否为空
        if (!empty($this->websocket)) {
            // 关闭连接
            @$this->websocket->close();
        }

        if (empty($this->websocket)) {

            $this->websocket = SaberGM::websocket($this->stream);

            // 创建订阅数据 json 数据
            $sendMessage = $this->getSendMessage($symbolList, $subType);

            // 推送订阅内容
            $this->websocket->push($sendMessage);
        }

        // 容错
        if ($this->websocket == null) {
            return false;
        }

        $errCount = 0;
        do {
            // 诶币安比较乖 莫名其妙就断包了 , 还是10分钟一重连比较好
            if (($startTime + 600) < time()) {

                Logger::wss(['超时10分钟' => $startTime, 'send' => $sendMessage]);
                return false;
            }

            // 超时 1秒抛出空包
            $data = $this->websocket->recv(1);  // timeout

            // 收包数据结构对象类型 想要存储仅能通过 serialize
            Logger::recv(['s' => serialize($data)]);

            $errCount++;

            // 空包容错 5次 (空包或超时过多最好重新连接)
            if ($errCount > 5) return false;

            // 币安对于opcode 还是有要求的
            if (empty($data->opcode)) {

                Logger::wss(['超时' => serialize($data)]);

                continue;
            } else if ($data->opcode == 9) {

                // pong 币安的ping包 opcode=9 且是data是空 我们需要 回 pong 且opcode=10
                Logger::pong(['s' => serialize($data)]);
                $this->websocket->push('pong', 10, true);

                continue;
            }

            // 空包
            if (empty($data->data)) continue;

            $responseData = $data->data;
            $finish = $data->finish;

            if (!$finish || !$responseData) continue;

            Logger::wsDepthJson(['opcode' => $data->opcode, 'fd' => $data->fd, 's' => serialize($data)]);

            //同步处理
            $responseData = json_decode($responseData, true);
            // todo  ... 代码逻辑 根据订阅类型分别存储于不同redis 有序集合Key中 , 获取时永远提取最新1秒数据 取不到在通过 api 拿, 币安api延迟太狠了.

            //  计数器归零
            $errCount = 0;

        } while (true);

    } catch (\Swlib\Http\Exception\ConnectException $e) {

        echo 'ConnectException ', $e->getMessage(),PHP_EOL;
        return false;
    } catch (\Throwable $e) {

        echo 'Throwable ', $e->getMessage(),PHP_EOL;

        return false;
    }

    return true;
}

OKEX 的webscocket订阅重要在于 解包 使用 gzinflate 函数实现解包

/**
 * okex现货ws 订阅
 * @param array $symbolList
 * @param $subType
 * @return bool
 */
public function okexSubscribe(array $symbolList, $subType)
{
    $this->stream = 'wss://real.okex.com:8443/ws/v3';

    try {

        if (!empty($this->websocket))
        {
            // 关闭连接
            @$this->websocket->close();
        }

        if (empty($this->websocket))
        {
            // Saber\WebSocket
            $this->websocket = SaberGM::websocket($this->stream);
            $sendMessage = self::getSendMessage($symbolList, $subType);

            $this->websocket->push($sendMessage);
        }

        if ($this->websocket == null)
        {
            Logger::errmsg(['platform' => "ok", 'stream' => $this->stream]);
            return false;
        }

        $errCount = 0;
        do {
            $data = $this->websocket->recv(1);// timeout
            $errCount++;

            if(empty($data->data)) return false;
            $responseData = gzinflate($data->data);

            if ($errCount > 5) return false;
            if (empty($responseData)) return false;

            $finish = $data->finish;
            if (!$finish || !$responseData) return false;

            //同步处理
            $responseData = json_decode($responseData, true);
           
           // todo ... code

            //  计数器归零
            $errCount = 0;

        } while (true);

    } catch (\Swlib\Http\Exception\ConnectException $e) {

        $this->websocket = null;
    } catch (\Exception $e) {

        $this->websocket = null;
    }

    return true;
}

火币坑位也是在于解包 gzip解压, 感谢开发同学的钻研, 为我大PHP在币圈量化提供轮子.

/**
 * 火币现货ws 订阅
 * @param array $symbolList
 * @param string $subType 订阅类型
 * @return bool
 */
public function huobiSubscribe(array $symbolList, $subType)
{
    $this->stream = 'wss://api.huobi.pro:443/ws';

    try {

        if (!empty($this->websocket)) {
            // 关闭连接
            @$this->websocket->close();
        }

        if (empty($this->websocket)) {
            // Saber\WebSocket
            $this->websocket = SaberGM::websocket($this->stream);

            foreach ($symbolList as $sym) {
                $huoBiSymbol = strtolower(implode('', explode('_', $sym)));
                $sendMessage = $this->getSendMessage($huoBiSymbol, $subType, $sym);
                $this->websocket->push($sendMessage);
            }
        }

        if ($this->websocket == null) {
            Logger::errmsg(['platform' => "huobi", 'stream' => $this->stream]);
            return false;
        }

        $errCount = 0;
        do {

            $data = $this->websocket->recv(1);// timeout'

            $errCount++;

            //容错5次
            if ($errCount > 5) return false;

            if (empty($data->finish)) continue;

            $responseData = json_decode($this->gzdecode($data), true);

            //接受到ping 恢复pong  {"ping": 1492420473027}   {"pong": 1492420473027}
            if (isset($responseData['ping'])) {
                $this->websocket->push($this->getPong($responseData['ping']));
            }

            if (!$responseData) return false;

            // todo ...code

            // 计数器归零
            $errCount = 0;

        } while (true);

    } catch (\Swlib\Http\Exception\ConnectException $e) {

        $this->websocket = null;
    } catch (\Throwable $e) {

        $this->websocket = null;
    }

    return true;
}

/**
 * gzip解压
 * @param $data
 * @return false|string
 */
public function gzdecode($data)
{
    $flags = ord(substr($data, 3, 1));
    $headerlen = 10;
    if ($flags & 4) {
        $extralen = unpack('v', substr($data, 10, 2));
        $extralen = $extralen[1];
        $headerlen += 2 + $extralen;
    }
    
    if ($flags & 8) // Filename
        $headerlen = strpos($data, chr(0), $headerlen) + 1;
    if ($flags & 16) // Comment
        $headerlen = strpos($data, chr(0), $headerlen) + 1;
    if ($flags & 2) // CRC at end of file
        $headerlen += 2;
    $unpacked = @gzinflate(substr($data, $headerlen));
    if ($unpacked === FALSE) $unpacked = $data;
    
    return $unpacked;
}

/**
 * 发送pong信息
 * @param $pingInt
 * @return false|string
 */
public static function getPong($pingInt)
{
    return json_encode(["pong" => $pingInt], JSON_UNESCAPED_UNICODE);
}

好了 这篇文章 demo 就写着点, 部分内容没全写进去, 如果你不是自己踩坑就想直接使用,我建议你还是不要用我的demo了.
伸手党的同学也不要发评论要demo. 本文仅帮助广大PHP同行 不重复入坑.

karp

创建我自己的巨人