这篇文章上次修改于 768 天前,可能其部分内容已经发生变化,如有疑问可询问作者。

1 QUIC

QUIC 是基于 UDP 的多路复用、安全传输协议。可以简单理解为在用户空间将 TCP 里的机制实现了一遍,比如拥塞控制、流量控制等。好处是升级比较方便,TCP 协议栈是内核中实现的,只能随内核升级,而 QUIC 可灵活升级。

QUIC 还有如下优点:

  • 采用 Connection ID 标识一条连接,而非四元组(src_ip, src_port, dst_ip, dst_port),如果发生网络切换,连接仍然不会断开。
  • 无队头阻塞问题。真正实现多个流传输数据,各个流之间互不影响,即连接内的一个流阻塞不会导致所有流阻塞。
  • 改进的拥塞控制。

    • 目前有两种拥塞控制算法可供选择。
    • 严格单调递增的 Packet 序列,较容易区分是重传还是首次传输。
    • 携带收到数据包和发送 ACK 之间的延迟信息,可以更准确地计算 RTT 时间。
    • 尽可能实现快重传而不是超时重传。在达到超时之前,发送两个 TLPs 触发快重传。
  • 客户端可以保存握手信息,如果没有过期,后续再重连只需要 0RTT。
  • 安全,包含了 TLS。

2 使用

本文采用 rust 中的 quic 库 --- quinn。

概念:

  • Client:发起 QUIC 连接的 Endpoint。
  • Server:接收 QUIC 连接的 Endpoint。
  • Endpoint:终端,可以是客户端,也可以是服务端。
  • Stream:QUIC 连接内有序的单向或双向通道。
  • Connection:是两个 QUIC Endpoint 之间的加密连接,包含多路复用 Stream。

对 quinn 库的封装:

#![cfg(feature = "rustls")]

use bytes::{BufMut, Bytes, BytesMut};
use futures_util::StreamExt;
use protobuf::Message;
use quinn::{ClientConfig, Endpoint, Incoming, IncomingUniStreams, NewConnection, ServerConfig};
use rustls;
use std::{
    fs::File,
    io::{Error, BufReader},
    net::SocketAddr,
    sync::Arc,
    str
};
use sodiumoxide::crypto::secretbox::Key;
use tokio::{self, sync::mpsc};
use crate::{anyhow::anyhow, ResultType, time::Instant, internal_config::RTT_LEN};
pub use quinn;

const SERVER_NAME: &str = "xremote.autox.tech";
const CHANNEL_LATENCY: i64 = 1_000;
const DROP_MSG: bool = true;

pub(crate) struct Cert<'a> {
    ca_file: &'a str,
    client_cert_file: &'a str,
    client_key_file: &'a str,
    server_cert_file: &'a str,
    server_key_file: &'a str,
}

#[cfg(not(debug_assertions))]
lazy_static::lazy_static! {
    static ref CERT: Cert<'static> = Cert {
        ca_file: "/etc/ssl/ca.cert",
        client_cert_file: "/etc/ssl/client.cert",
        client_key_file: "/etc/ssl/client.key",
        server_cert_file: "/etc/ssl/server.fullchain",
        server_key_file: "/etc/ssl/server.rsa",
    };
}

const MAX_BUFFER_SIZE: usize = 128;
type Value = Vec<u8>;
type Sender = mpsc::Sender<(Instant, Value)>;
type Receiver = mpsc::Receiver<(Instant, Value)>;

pub struct Connection {
    pub conn: quinn::Connection,
    pub endpoint: Option<Endpoint>,
    self_sender: Sender,
    out_receiver: Receiver,
}

impl Connection {
    pub async fn new_for_client(
        server_addr:  SocketAddr,
        local_addr:  SocketAddr,
        ms_timeout: u64
    ) -> ResultType<Self> {
        let client_cfg = client::config(CERT.ca_file, CERT.client_cert_file, CERT.client_key_file);
        let mut endpoint = Endpoint::client(local_addr).expect("create client endpoint");
        endpoint.set_default_client_config(client_cfg);

        let connecting = super::timeout(
            ms_timeout,
            endpoint.connect(server_addr, SERVER_NAME).expect("connect to server error")
        ).await??;

        let NewConnection {
            connection,
            uni_streams,
            ..
        } = connecting;

        let (self_sender, self_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
        let (out_sender, out_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
        tokio::spawn(process_stream(
            connection.clone(),
            uni_streams,
            out_sender,
            self_receiver,
            connection.remote_address()
        ));

        Ok(Connection {
            conn: connection,
            endpoint: Some(endpoint),
            self_sender,
            out_receiver,
        })
    }

    pub async fn new_for_server(conn: quinn::Connecting) -> ResultType<Self> {
        let quinn::NewConnection {
            connection,
            uni_streams,
            ..
        } = conn.await?;

        let (self_sender, self_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
        let (out_sender, out_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
        tokio::spawn(process_stream(
            connection.clone(),
            uni_streams,
            out_sender,
            self_receiver,
            connection.remote_address()
        ));

        Ok(Connection {
            conn: connection,
            endpoint: None,
            self_sender,
            out_receiver,
        })
    }

    #[inline]
    pub async fn next(&mut self) -> Option<Result<BytesMut, Error>> {
        match self.out_receiver.recv().await {
            None => None,
            Some((_, req_bytes)) => {
                let mut bytes = BytesMut::new();
                bytes.put_slice(&req_bytes);
                return Some(Ok(bytes));
            }
        }
    }

    #[inline]
    pub async fn next_timeout(&mut self, ms: u64) -> Option<Result<BytesMut, Error>> {
        if let Ok(res) =
            tokio::time::timeout(std::time::Duration::from_millis(ms), self.next()).await {
            res
        } else {
            None
        }
    }

    #[inline]
    pub async fn send(&mut self, msg: &impl Message) -> ResultType<()> {
        self.send_raw(msg.write_to_bytes()?).await
    }

    #[inline]
    pub async fn send_raw(&mut self, msg: Vec<u8>) -> ResultType<()> {
        self.self_sender.send((Instant::now(), msg))
            .await
            .map_err(|e| anyhow!("failed to shutdown stream: {}", e))
    }

    pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> {
        self.send_raw(bytes.to_vec()).await?;
        Ok(())
    }

    #[inline]
    pub fn remote_address(&self) -> SocketAddr {
        self.conn.remote_address()
    }

    #[inline]
    pub fn local_address(&self) -> Option<SocketAddr> {
        if let Some(endpoint) = &self.endpoint {
            return Some(endpoint.local_addr().expect("get local address error"));
        }
        None
    }

    #[inline]
    pub async fn shutdown(&self) -> std::io::Result<()> {
        self.conn.close(0u32.into(), b"done");
        // Give the peer a fair chance to receive the close packet
        if let Some(endpoint) = &self.endpoint {
            endpoint.wait_idle().await;
        }
        Ok(())
    }

    pub fn set_raw(&mut self) {
    }

    pub fn set_key(&mut self, _key: Key) {
    }
}

async fn process_stream(
    conn: quinn::Connection,
    mut uni_streams: IncomingUniStreams,
    out_sender: Sender,
    mut self_receiver: Receiver,
    addr: SocketAddr
) {
    let a = async move {
        loop {
            match self_receiver.recv().await {
                Some((instant, msg)) => {
                    let latency = instant.elapsed().as_millis() as i64;
                    if DROP_MSG && latency as i64 > CHANNEL_LATENCY && msg.len() != RTT_LEN {
                        log::debug!("The duration of the message in the quic sending queue is: {:?}", latency);
                        continue;
                    }

                    if let Ok(mut sender_stream) = conn.open_uni().await {
                        log::debug!("send {} bytes to stream", msg.len());
                        match sender_stream.write_all(&msg).await {
                            Err(e) =>  {
                                log::error!("send msg error: {:?}", e);
                            }
                            _ => {}
                        }
                    }
                }
                None => break,
            }
        }
        log::info!("exit send loop");
        Err::<(), ()>(())
    };

    let b  = async move {
        loop {
            match uni_streams.next().await {
                Some(result) => {
                    match result {
                        Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
                            log::info!("connection terminated by peer {:?}.", &addr);
                            break;
                        }
                        Err(err) => {
                            log::info!("read msg for peer {:?} with error: {:?}", &addr, err);
                            break;
                        }
                        Ok(recv_stream) => {
                            if let Ok(bytes) = recv_stream.read_to_end(usize::max_value()).await {
                                log::debug!("read {} bytes from stream", bytes.len());
                                match out_sender.send((Instant::now(), bytes)).await {
                                    Err(_e) => {
                                        log::error!("connection closed");
                                        break;
                                    }
                                    _ => {}
                                }
                            }
                        }
                    }
                }
                None => break,
            }
        }
        log::info!("exit recv loop");
        Err::<(), ()>(())
    };

    let _ = tokio::join!(a, b);
    log::info!("close stream: {}", addr);
}

pub mod server {
    use super::*;

    pub fn new_endpoint(bind_addr: SocketAddr) -> ResultType<(Endpoint, Incoming)> {
        let server_config = config(CERT.server_cert_file, CERT.server_key_file).expect("config quic server error");
        let (endpoint, incoming) = Endpoint::server(server_config, bind_addr)?;
        Ok((endpoint, incoming))
    }

    fn config(certs: &str, key_file: &str) -> Result<ServerConfig, Box<dyn std::error::Error>> {
        let roots = load_certs(certs);
        let certs = roots.clone();
        let mut client_auth_roots = rustls::RootCertStore::empty();
        for root in roots {
            client_auth_roots.add(&root).unwrap();
        }
        let client_auth = rustls::server::AllowAnyAuthenticatedClient::new(client_auth_roots);

        let privkey = load_private_key(key_file);
        let suites = rustls::ALL_CIPHER_SUITES.to_vec();
        let versions = rustls::ALL_VERSIONS.to_vec();

        let server_crypto = rustls::ServerConfig::builder()
            .with_cipher_suites(&suites)
            .with_safe_default_kx_groups()
            .with_protocol_versions(&versions)
            .expect("inconsistent cipher-suites/versions specified")
            .with_client_cert_verifier(client_auth)
            .with_single_cert_with_ocsp_and_sct(certs, privkey, vec![], vec![])
            .expect("bad certificates/private key");

        let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(server_crypto));
        let transport = Arc::get_mut(&mut server_config.transport).unwrap();
        transport.max_concurrent_bidi_streams(0_u8.into());
        transport.max_concurrent_uni_streams(500_u32.into());
        log::info!("quic server config: {:?}", &server_config);
        Ok(server_config)
    }
}

pub mod client {
    use super::*;

    pub(crate) fn config(ca_file: &str, certs_file: &str, key_file: &str) -> ClientConfig {
        let cert_file = File::open(&ca_file).expect(&format!("Cannot open CA file: {:?}", ca_file));
        let mut reader = BufReader::new(cert_file);

        let mut root_store = rustls::RootCertStore::empty();
        root_store.add_parsable_certificates(&rustls_pemfile::certs(&mut reader).unwrap());

        let suites = rustls::DEFAULT_CIPHER_SUITES.to_vec();
        let versions = rustls::DEFAULT_VERSIONS.to_vec();

        let certs = load_certs(certs_file);
        let key = load_private_key(key_file);

        let crypto = rustls::ClientConfig::builder()
            .with_cipher_suites(&suites)
            .with_safe_default_kx_groups()
            .with_protocol_versions(&versions)
            .expect("inconsistent cipher-suite/versions selected")
            .with_root_certificates(root_store)
            .with_single_cert(certs, key)
            .expect("invalid client auth certs/key");

        let mut client_config = ClientConfig::new(Arc::new(crypto));
        let transport = Arc::get_mut(&mut client_config.transport).unwrap();
        transport.max_concurrent_bidi_streams(0_u8.into());
        transport.max_concurrent_uni_streams(500_u32.into());
        log::info!("quic client config: {:?}", &client_config);
        client_config
    }
}

pub fn load_certs(filename: &str) -> Vec<rustls::Certificate> {
    let certfile = File::open(filename).expect(&format!("cannot open certificate file: {:?}", filename));
    let mut reader = BufReader::new(certfile);
    rustls_pemfile::certs(&mut reader)
        .unwrap()
        .iter()
        .map(|v| rustls::Certificate(v.clone()))
        .collect()
}

pub fn load_private_key(filename: &str) -> rustls::PrivateKey {
    let keyfile = File::open(filename).expect("cannot open private key file");
    let mut reader = BufReader::new(keyfile);

    loop {
        match rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file") {
            Some(rustls_pemfile::Item::RSAKey(key)) => return rustls::PrivateKey(key),
            Some(rustls_pemfile::Item::PKCS8Key(key)) => return rustls::PrivateKey(key),
            None => break,
            _ => {}
        }
    }

    panic!(
        "no keys found in {:?} (encrypted keys not supported)",
        filename
    );
}

测试:

#[cfg(test)]
mod tests {
    use super::*;
    use std::{error::Error, net::SocketAddr};
    use futures_util::StreamExt;

    #[tokio::test]
    async fn quic() -> Result<(), Box<dyn Error>> {
        let addr = "127.0.0.1:5000".parse().unwrap();
        tokio::spawn(run_server(addr));
        run_client(addr).await;
        Ok(())
    }

    async fn run_server(addr: SocketAddr) {
        let (_endpoint, mut incoming) = server::new_endpoint(addr).unwrap();
        while let Some(conn) = incoming.next().await {
            tokio::spawn(handle_connection(conn));
        }
    }

    async fn handle_connection(conn: quinn::Connecting) {
        let mut conn = Connection::new_for_server(conn).await.unwrap();
        println!("[server] client address: {:?}", conn.remote_address());

        if let Some(recv_bytes) = conn.next().await {
            println!("[server] [2] recive: {:?}", recv_bytes);

            println!("[server] [3] send: hello client 1");
            conn.send_raw(b"hello client 1".to_vec())
                .await
                .unwrap_or_else(move |e| println!("failed: {reason}", reason = e.to_string()));
        }

        println!("[server] [5] send: hello client 2");
        conn.send_raw(b"hello client 2".to_vec()).await.unwrap();
        if let Some(resp_bytes) = conn.next().await {
            println!("[server] [8] receive: {:?}", resp_bytes);
        }
    }

    async fn run_client(server_addr: SocketAddr) {
        let local_addr = "127.0.0.1:8888".parse().unwrap();
        let mut conn = Connection::new_for_client(server_addr, local_addr, 1000).await.unwrap();

        println!("[client] [1] send: hello server 1");
        let mut buf = BytesMut::with_capacity(64);
        buf.put(&b"hello server 1"[..]);
        conn.send_raw(b"hello server 1".to_vec()).await.unwrap();
        let resp_bytes = conn.next().await.unwrap();
        println!("[client] [4] receive: {:?}", resp_bytes);

        if let Some(recv_bytes) = conn.next().await {
            println!("[client] [6] recive: {:?}", recv_bytes);

            println!("[client] [7] send: hello server 2");
            conn.send_raw(b"hello server 2".to_vec())
                .await
                .unwrap_or_else(move |e| println!("failed: {reason}", reason = e.to_string()));
        }

        conn.shutdown().await.unwrap();
    }
}

使用心得:

  • 当传输视频时,如果调用 send_stream.finish().await,会一直阻塞,直到发送方收到数据,导致两帧之间至少会等待一个 RTT。所以在调用 send_stream.write_all() 后,不应调用 send_stream..finish().await。
  • 如果想使用不可靠传输,可以使用 connection.send_datagram() 传输数据,使用 connection.datagrams.next() 接收数据。但是每次发送时,数据量都不能太大,一般不超过 1200 字节。
  • 无法关闭 TLS,如果仅仅用于测试,可以临时生成自签名证书。
  • 打开流,比如 connection.open_uni() 或 connection.open_bi() 并不耗时,仅仅是在 HashMap 中插入一个元素而已。在流上写数据也不耗时,仅仅是写在本地内存中,但是会受到 Stream 和 Connection 两个级别的流量控制。

3 配置

TransportConfig 中包含多项配置,例如:

  • max_concurrent_bidi_streams:最多可并发的双向流数量。如果不允许对方打开双向流,应该设为 0。消耗的最大内存量为max_concurrent_bidi_streams * stream_receive_window
  • max_concurrent_uni_streams:最多可并发的单向流数量。
  • max_idle_timeout:在连接超时之前可接受的最长空闲时间。None 表示无限长的时间。
  • stream_receive_window:对方在没有收到确认的情况下,可传输的最多 bytes 数。至少应为连接延迟乘以最大吞吐量。应该小于 receive_window,以免该流独占接收缓冲区。
  • receive_window:对方在连接阻塞之前,在所有流上可传输的最多 bytes 数。至少应为连接延迟乘以最大吞吐量。
  • send_window:在没有收到对方确认的情况下,可传输的最大 bytes 数。提供了与对方通信的内存上限。如果希望健壮地处理有大量连接端点的情况,应将该值设得足够低,避免每个连接都使用整个窗口时耗尽内存。
  • keep_alive_interval:发送 keep-alive 的时间间隔。默认为 None,表示关掉该功能。

计算示例:

const EXPECTED_RTT: u32 = 100; // ms
const MAX_STREAM_BANDWIDTH: u32 = 12500 * 1000; // bytes/s
const STREAM_RWND: u32 = MAX_STREAM_BANDWIDTH / 1000 * EXPECTED_RTT;

stream_receive_window = STREAM_RWND
send_window = 8 * STREAM_RWND

TransportConfig 中的默认值是吞吐量为 100Mbps,延迟 100ms 情况下的配置,适用于大多数网络,基本上不需要修改。

4 长连接

TransportConfig 中的配置项 max_idle_timeout 和 keep_alive_interval 与长连接有关。

本文配置如下:

transport.max_idle_timeout(Some(Duration::from_secs(3).try_into().unwrap()));
transport.keep_alive_interval(Some(Duration::from_secs(1).try_into().unwrap()));

客户端睡眠 5s 后再发送消息,观察不同场景下的效果。

  • 客户端永不退出

    • 客户端和服务都进行了配置:服务端没有出现 timeout。
    • 只有服务端进行了配置:服务端没有出现 timeout。
    • 只有客户端进行了配置:服务端没有出现 timeout。
  • 客户端退出,没有 shutdown

    • 客户端和服务都进行了配置:服务端出现 timeout。
  • 客户端退出,且 shutdown

    • 客户端和服务都进行了配置:服务端没有出现 timeout。
    • 只有服务端进行了配置:服务端没有出现 timeout。
    • 只有客户端进行了配置:服务端没有出现 timeout。
    • 客户端和服务端都没有配置 keep alive interval:服务端出现 timeout

5 源码阅读

5.1 发送数据

  1. SendStream.write_all()

    • 功能:将消息写到 SendBuffer 中。
    • 如果消息较大,会分多次写完。
    • 每次写的时候都不超过如下值:

      • connection 级别的流量控制:max_data - data_sent
      • stream 级别的控制,发送窗口:send_window - unacked_data
      • budget:max_data - pending.offset()
      • 以上值任意一个为 0,则阻塞。写成功后,这些计数器增加:data_sent,unacked_data,pending.offset()。
  2. SendStream.finish()

    • 功能:当对方确认数据后,该函数结束。
    • 实现:监听 channel,收到 None 或 error 后,结束。

5.2 接收数据

  1. IncomingUniStreams.next()

    • 功能:接收新的流。
    • 实现:一直轮询 connection::streams::Streams.accept() 是否接收到了新的流,即流 ID 是否已增加。流 ID 增加的情况:

      • Endpoint::new()

        • EndpointDriver.poll(),处理收发等事件。

          • EndpointInner.drive_recv():在 udp socket 上轮询,接收新连接,或在旧连接上接收数据。

            • Endpoint.handle():处理接收到的 udp 数据。解密 header。

              • Endpoint.handle_first_packet():解密数据,添加连接,处理第一个数据包。

                • Connection.handle_first_packet():处理解密后的数据包。重置 keep_alive,idle_timeout 计时器。将该包的编号加入到 PendAcks 队列中,如果队列长度达到 64,则丢弃最老的编号。更新收到的包的编号。

                  • Connection.process_decrypted_packet()

                    • Connection.process_payload():处理从 payload 中获取的每个 frame,并处理。

                      • StreamsState.received():处理新来的流。会计算所有流上收到的数据,数据排序,写到 Recv.assembler

                        • Recv.ingest():检查数据长度,不能超过 2^64

                          • Recv.credit_consumed_by():流量控制,包括 stream 和 connection 两个级别上的流量控制
                          • Recv.assembler.insert():去除重复数据,插入优先队列 Recv.assembler.data中,当用户调用 RecvStream.read_to_end() 可获取数据。
                        • StreamsState.on_stream_frame() 将流 ID 增加。当用户调用 IncomingUniStreams.next()可发现已接收到了新的流。
  2. RecvStream.read_to_end()

    • 功能:将从 Recv.assembler.data 中读取到的数据返回,Recv.assembler.data 是一个优先队列,可能会读取多次。为了提高性能,无序读取。
    • 可能会要求重传。

6 效果

  • 在移动的弱网环境下,相对于 TCP 传输更稳定,即使 IP 切换连接也不会断开。
  • 相对于 TCP,端到端的延时更稳定。

参考

https://greenbytes.de/tech/webdav/draft-ietf-quic-transport-16.html
https://github.com/quinn-rs/quinn