根据websocket协议,在数据包过大的情况下,会把包分块发送。服务端需要把接收到的分块包拼接起来 (相关链接)。

而actix-web-actors的ws模块默认不会自动合并分块的数据包,需要手动进行合并。

我的环境:

TOML
actix = "0.10"
actix-http = "2"
actix-web = { version = "3", features = ["default", "rustls"] }
actix-web-actors = "3"
tokio = { version = "0.2", features = ["full"] }
点击展开查看更多

所有被分块的包并不会被处理为 Message::Text 或 Message::Binary。

而是被处理为 Message::Continuation

RUST
pub enum Message {
    /// Text message
    Text(String),
    /// Binary message
    Binary(Bytes),
    /// Continuation
    Continuation(Item),
    /// Ping message
    Ping(Bytes),
    /// Pong message
    Pong(Bytes),
    /// Close message with optional reason
    Close(Option<CloseReason>),
    /// No-op. Useful for actix-net services
    Nop,
}
pub enum Item {
    FirstText(Bytes),
    FirstBinary(Bytes),
    Continue(Bytes),
    Last(Bytes),
}
点击展开查看更多

其实很简单,分块数据包会按照顺序发送。

先是Item::FirstText或Item::FirstBinary

接着中间的数据包为Item::Continue

分块数据包发送完成的最后一个包Item::Last

只需要在接收到Item::Last以外的包时,保存起来,接收到Item::Last后,把之前接收到的所有包连带Item::Last拼接起来即可。

我的实现:

RUST


pub struct Test {
    frame_buf: Vec<ws::Item>,
}

impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Test {
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        match msg {
            Ok(ws::Message::Text(data)) => {
                // 这里继续处理data数据
            }
            Ok(ws::Message::Binary(data)) => {
                // 这里继续处理data数据
            }

            // 接收到分块数据包
            Ok(ws::Message::Continuation(item)) => {
                if let ws::Item::Last(_) = item {
                    // Last包也先写入缓冲区
                    self.frame_buf.push(item);

                    // 开始从缓冲区中按顺序读取所有数据,并拼接起来
                    let mut data = vec![];
                    while !self.frame_buf.is_empty() {
                        data.extend(match self.frame_buf.remove(0) {
                            ws::Item::FirstText(data) => data,
                            ws::Item::FirstBinary(data) => data,
                            ws::Item::Continue(data) => data,
                            ws::Item::Last(data) => data,
                        });
                    }

                    // 这里继续处理data数据
                } else {
                    // 分块数据包不是Last,则把数据写入到缓冲区
                    self.frame_buf.push(item);
                }
            }
            _ => (),
        }
    }
}
点击展开查看更多

版权声明

开始搜索

输入关键词搜索文章内容

↑↓
ESC
⌘K 快捷键