这篇文章上次修改于 905 天前,可能其部分内容已经发生变化,如有疑问可询问作者。
1 QUIC
QUIC 是基于 UDP 的多路复用、安全传输协议。可以简单理解为在用户空间将 TCP 里的机制实现了一遍,比如拥塞控制、流量控制等。好处是升级比较方便,TCP 协议栈是内核中实现的,只能随内核升级,而 QUIC 可灵活升级。
QUIC 还有如下优点:
- 采用 Connection ID 标识一条连接,而非四元组(src_ip, src_port, dst_ip, dst_port),如果发生网络切换,连接仍然不会断开。
- 无队头阻塞问题。真正实现多个流传输数据,各个流之间互不影响,即连接内的一个流阻塞不会导致所有流阻塞。
改进的拥塞控制。
- 目前有两种拥塞控制算法可供选择。
- 严格单调递增的 Packet 序列,较容易区分是重传还是首次传输。
- 携带收到数据包和发送 ACK 之间的延迟信息,可以更准确地计算 RTT 时间。
- 尽可能实现快重传而不是超时重传。在达到超时之前,发送两个 TLPs 触发快重传。
- 客户端可以保存握手信息,如果没有过期,后续再重连只需要 0RTT。
- 安全,包含了 TLS。
2 使用
本文采用 rust 中的 quic 库 --- quinn。
概念:
- Client:发起 QUIC 连接的 Endpoint。
- Server:接收 QUIC 连接的 Endpoint。
- Endpoint:终端,可以是客户端,也可以是服务端。
- Stream:QUIC 连接内有序的单向或双向通道。
- Connection:是两个 QUIC Endpoint 之间的加密连接,包含多路复用 Stream。
对 quinn 库的封装:
#![cfg(feature = "rustls")]
use bytes::{BufMut, Bytes, BytesMut};
use futures_util::StreamExt;
use protobuf::Message;
use quinn::{ClientConfig, Endpoint, Incoming, IncomingUniStreams, NewConnection, ServerConfig};
use rustls;
use std::{
fs::File,
io::{Error, BufReader},
net::SocketAddr,
sync::Arc,
str
};
use sodiumoxide::crypto::secretbox::Key;
use tokio::{self, sync::mpsc};
use crate::{anyhow::anyhow, ResultType, time::Instant, internal_config::RTT_LEN};
pub use quinn;
const SERVER_NAME: &str = "xremote.autox.tech";
const CHANNEL_LATENCY: i64 = 1_000;
const DROP_MSG: bool = true;
pub(crate) struct Cert<'a> {
ca_file: &'a str,
client_cert_file: &'a str,
client_key_file: &'a str,
server_cert_file: &'a str,
server_key_file: &'a str,
}
#[cfg(not(debug_assertions))]
lazy_static::lazy_static! {
static ref CERT: Cert<'static> = Cert {
ca_file: "/etc/ssl/ca.cert",
client_cert_file: "/etc/ssl/client.cert",
client_key_file: "/etc/ssl/client.key",
server_cert_file: "/etc/ssl/server.fullchain",
server_key_file: "/etc/ssl/server.rsa",
};
}
const MAX_BUFFER_SIZE: usize = 128;
type Value = Vec<u8>;
type Sender = mpsc::Sender<(Instant, Value)>;
type Receiver = mpsc::Receiver<(Instant, Value)>;
pub struct Connection {
pub conn: quinn::Connection,
pub endpoint: Option<Endpoint>,
self_sender: Sender,
out_receiver: Receiver,
}
impl Connection {
pub async fn new_for_client(
server_addr: SocketAddr,
local_addr: SocketAddr,
ms_timeout: u64
) -> ResultType<Self> {
let client_cfg = client::config(CERT.ca_file, CERT.client_cert_file, CERT.client_key_file);
let mut endpoint = Endpoint::client(local_addr).expect("create client endpoint");
endpoint.set_default_client_config(client_cfg);
let connecting = super::timeout(
ms_timeout,
endpoint.connect(server_addr, SERVER_NAME).expect("connect to server error")
).await??;
let NewConnection {
connection,
uni_streams,
..
} = connecting;
let (self_sender, self_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
let (out_sender, out_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
tokio::spawn(process_stream(
connection.clone(),
uni_streams,
out_sender,
self_receiver,
connection.remote_address()
));
Ok(Connection {
conn: connection,
endpoint: Some(endpoint),
self_sender,
out_receiver,
})
}
pub async fn new_for_server(conn: quinn::Connecting) -> ResultType<Self> {
let quinn::NewConnection {
connection,
uni_streams,
..
} = conn.await?;
let (self_sender, self_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
let (out_sender, out_receiver) = mpsc::channel::<(Instant, Value)>(MAX_BUFFER_SIZE);
tokio::spawn(process_stream(
connection.clone(),
uni_streams,
out_sender,
self_receiver,
connection.remote_address()
));
Ok(Connection {
conn: connection,
endpoint: None,
self_sender,
out_receiver,
})
}
#[inline]
pub async fn next(&mut self) -> Option<Result<BytesMut, Error>> {
match self.out_receiver.recv().await {
None => None,
Some((_, req_bytes)) => {
let mut bytes = BytesMut::new();
bytes.put_slice(&req_bytes);
return Some(Ok(bytes));
}
}
}
#[inline]
pub async fn next_timeout(&mut self, ms: u64) -> Option<Result<BytesMut, Error>> {
if let Ok(res) =
tokio::time::timeout(std::time::Duration::from_millis(ms), self.next()).await {
res
} else {
None
}
}
#[inline]
pub async fn send(&mut self, msg: &impl Message) -> ResultType<()> {
self.send_raw(msg.write_to_bytes()?).await
}
#[inline]
pub async fn send_raw(&mut self, msg: Vec<u8>) -> ResultType<()> {
self.self_sender.send((Instant::now(), msg))
.await
.map_err(|e| anyhow!("failed to shutdown stream: {}", e))
}
pub async fn send_bytes(&mut self, bytes: Bytes) -> ResultType<()> {
self.send_raw(bytes.to_vec()).await?;
Ok(())
}
#[inline]
pub fn remote_address(&self) -> SocketAddr {
self.conn.remote_address()
}
#[inline]
pub fn local_address(&self) -> Option<SocketAddr> {
if let Some(endpoint) = &self.endpoint {
return Some(endpoint.local_addr().expect("get local address error"));
}
None
}
#[inline]
pub async fn shutdown(&self) -> std::io::Result<()> {
self.conn.close(0u32.into(), b"done");
// Give the peer a fair chance to receive the close packet
if let Some(endpoint) = &self.endpoint {
endpoint.wait_idle().await;
}
Ok(())
}
pub fn set_raw(&mut self) {
}
pub fn set_key(&mut self, _key: Key) {
}
}
async fn process_stream(
conn: quinn::Connection,
mut uni_streams: IncomingUniStreams,
out_sender: Sender,
mut self_receiver: Receiver,
addr: SocketAddr
) {
let a = async move {
loop {
match self_receiver.recv().await {
Some((instant, msg)) => {
let latency = instant.elapsed().as_millis() as i64;
if DROP_MSG && latency as i64 > CHANNEL_LATENCY && msg.len() != RTT_LEN {
log::debug!("The duration of the message in the quic sending queue is: {:?}", latency);
continue;
}
if let Ok(mut sender_stream) = conn.open_uni().await {
log::debug!("send {} bytes to stream", msg.len());
match sender_stream.write_all(&msg).await {
Err(e) => {
log::error!("send msg error: {:?}", e);
}
_ => {}
}
}
}
None => break,
}
}
log::info!("exit send loop");
Err::<(), ()>(())
};
let b = async move {
loop {
match uni_streams.next().await {
Some(result) => {
match result {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
log::info!("connection terminated by peer {:?}.", &addr);
break;
}
Err(err) => {
log::info!("read msg for peer {:?} with error: {:?}", &addr, err);
break;
}
Ok(recv_stream) => {
if let Ok(bytes) = recv_stream.read_to_end(usize::max_value()).await {
log::debug!("read {} bytes from stream", bytes.len());
match out_sender.send((Instant::now(), bytes)).await {
Err(_e) => {
log::error!("connection closed");
break;
}
_ => {}
}
}
}
}
}
None => break,
}
}
log::info!("exit recv loop");
Err::<(), ()>(())
};
let _ = tokio::join!(a, b);
log::info!("close stream: {}", addr);
}
pub mod server {
use super::*;
pub fn new_endpoint(bind_addr: SocketAddr) -> ResultType<(Endpoint, Incoming)> {
let server_config = config(CERT.server_cert_file, CERT.server_key_file).expect("config quic server error");
let (endpoint, incoming) = Endpoint::server(server_config, bind_addr)?;
Ok((endpoint, incoming))
}
fn config(certs: &str, key_file: &str) -> Result<ServerConfig, Box<dyn std::error::Error>> {
let roots = load_certs(certs);
let certs = roots.clone();
let mut client_auth_roots = rustls::RootCertStore::empty();
for root in roots {
client_auth_roots.add(&root).unwrap();
}
let client_auth = rustls::server::AllowAnyAuthenticatedClient::new(client_auth_roots);
let privkey = load_private_key(key_file);
let suites = rustls::ALL_CIPHER_SUITES.to_vec();
let versions = rustls::ALL_VERSIONS.to_vec();
let server_crypto = rustls::ServerConfig::builder()
.with_cipher_suites(&suites)
.with_safe_default_kx_groups()
.with_protocol_versions(&versions)
.expect("inconsistent cipher-suites/versions specified")
.with_client_cert_verifier(client_auth)
.with_single_cert_with_ocsp_and_sct(certs, privkey, vec![], vec![])
.expect("bad certificates/private key");
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(server_crypto));
let transport = Arc::get_mut(&mut server_config.transport).unwrap();
transport.max_concurrent_bidi_streams(0_u8.into());
transport.max_concurrent_uni_streams(500_u32.into());
log::info!("quic server config: {:?}", &server_config);
Ok(server_config)
}
}
pub mod client {
use super::*;
pub(crate) fn config(ca_file: &str, certs_file: &str, key_file: &str) -> ClientConfig {
let cert_file = File::open(&ca_file).expect(&format!("Cannot open CA file: {:?}", ca_file));
let mut reader = BufReader::new(cert_file);
let mut root_store = rustls::RootCertStore::empty();
root_store.add_parsable_certificates(&rustls_pemfile::certs(&mut reader).unwrap());
let suites = rustls::DEFAULT_CIPHER_SUITES.to_vec();
let versions = rustls::DEFAULT_VERSIONS.to_vec();
let certs = load_certs(certs_file);
let key = load_private_key(key_file);
let crypto = rustls::ClientConfig::builder()
.with_cipher_suites(&suites)
.with_safe_default_kx_groups()
.with_protocol_versions(&versions)
.expect("inconsistent cipher-suite/versions selected")
.with_root_certificates(root_store)
.with_single_cert(certs, key)
.expect("invalid client auth certs/key");
let mut client_config = ClientConfig::new(Arc::new(crypto));
let transport = Arc::get_mut(&mut client_config.transport).unwrap();
transport.max_concurrent_bidi_streams(0_u8.into());
transport.max_concurrent_uni_streams(500_u32.into());
log::info!("quic client config: {:?}", &client_config);
client_config
}
}
pub fn load_certs(filename: &str) -> Vec<rustls::Certificate> {
let certfile = File::open(filename).expect(&format!("cannot open certificate file: {:?}", filename));
let mut reader = BufReader::new(certfile);
rustls_pemfile::certs(&mut reader)
.unwrap()
.iter()
.map(|v| rustls::Certificate(v.clone()))
.collect()
}
pub fn load_private_key(filename: &str) -> rustls::PrivateKey {
let keyfile = File::open(filename).expect("cannot open private key file");
let mut reader = BufReader::new(keyfile);
loop {
match rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file") {
Some(rustls_pemfile::Item::RSAKey(key)) => return rustls::PrivateKey(key),
Some(rustls_pemfile::Item::PKCS8Key(key)) => return rustls::PrivateKey(key),
None => break,
_ => {}
}
}
panic!(
"no keys found in {:?} (encrypted keys not supported)",
filename
);
}
测试:
#[cfg(test)]
mod tests {
use super::*;
use std::{error::Error, net::SocketAddr};
use futures_util::StreamExt;
#[tokio::test]
async fn quic() -> Result<(), Box<dyn Error>> {
let addr = "127.0.0.1:5000".parse().unwrap();
tokio::spawn(run_server(addr));
run_client(addr).await;
Ok(())
}
async fn run_server(addr: SocketAddr) {
let (_endpoint, mut incoming) = server::new_endpoint(addr).unwrap();
while let Some(conn) = incoming.next().await {
tokio::spawn(handle_connection(conn));
}
}
async fn handle_connection(conn: quinn::Connecting) {
let mut conn = Connection::new_for_server(conn).await.unwrap();
println!("[server] client address: {:?}", conn.remote_address());
if let Some(recv_bytes) = conn.next().await {
println!("[server] [2] recive: {:?}", recv_bytes);
println!("[server] [3] send: hello client 1");
conn.send_raw(b"hello client 1".to_vec())
.await
.unwrap_or_else(move |e| println!("failed: {reason}", reason = e.to_string()));
}
println!("[server] [5] send: hello client 2");
conn.send_raw(b"hello client 2".to_vec()).await.unwrap();
if let Some(resp_bytes) = conn.next().await {
println!("[server] [8] receive: {:?}", resp_bytes);
}
}
async fn run_client(server_addr: SocketAddr) {
let local_addr = "127.0.0.1:8888".parse().unwrap();
let mut conn = Connection::new_for_client(server_addr, local_addr, 1000).await.unwrap();
println!("[client] [1] send: hello server 1");
let mut buf = BytesMut::with_capacity(64);
buf.put(&b"hello server 1"[..]);
conn.send_raw(b"hello server 1".to_vec()).await.unwrap();
let resp_bytes = conn.next().await.unwrap();
println!("[client] [4] receive: {:?}", resp_bytes);
if let Some(recv_bytes) = conn.next().await {
println!("[client] [6] recive: {:?}", recv_bytes);
println!("[client] [7] send: hello server 2");
conn.send_raw(b"hello server 2".to_vec())
.await
.unwrap_or_else(move |e| println!("failed: {reason}", reason = e.to_string()));
}
conn.shutdown().await.unwrap();
}
}
使用心得:
- 当传输视频时,如果调用 send_stream.finish().await,会一直阻塞,直到发送方收到数据,导致两帧之间至少会等待一个 RTT。所以在调用 send_stream.write_all() 后,不应调用 send_stream..finish().await。
- 如果想使用不可靠传输,可以使用 connection.send_datagram() 传输数据,使用 connection.datagrams.next() 接收数据。但是每次发送时,数据量都不能太大,一般不超过 1200 字节。
- 无法关闭 TLS,如果仅仅用于测试,可以临时生成自签名证书。
- 打开流,比如 connection.open_uni() 或 connection.open_bi() 并不耗时,仅仅是在 HashMap 中插入一个元素而已。在流上写数据也不耗时,仅仅是写在本地内存中,但是会受到 Stream 和 Connection 两个级别的流量控制。
3 配置
TransportConfig 中包含多项配置,例如:
- max_concurrent_bidi_streams:最多可并发的双向流数量。如果不允许对方打开双向流,应该设为 0。消耗的最大内存量为
max_concurrent_bidi_streams * stream_receive_window
- max_concurrent_uni_streams:最多可并发的单向流数量。
- max_idle_timeout:在连接超时之前可接受的最长空闲时间。None 表示无限长的时间。
- stream_receive_window:对方在没有收到确认的情况下,可传输的最多 bytes 数。至少应为连接延迟乘以最大吞吐量。应该小于 receive_window,以免该流独占接收缓冲区。
- receive_window:对方在连接阻塞之前,在所有流上可传输的最多 bytes 数。至少应为连接延迟乘以最大吞吐量。
- send_window:在没有收到对方确认的情况下,可传输的最大 bytes 数。提供了与对方通信的内存上限。如果希望健壮地处理有大量连接端点的情况,应将该值设得足够低,避免每个连接都使用整个窗口时耗尽内存。
- keep_alive_interval:发送 keep-alive 的时间间隔。默认为 None,表示关掉该功能。
计算示例:
const EXPECTED_RTT: u32 = 100; // ms
const MAX_STREAM_BANDWIDTH: u32 = 12500 * 1000; // bytes/s
const STREAM_RWND: u32 = MAX_STREAM_BANDWIDTH / 1000 * EXPECTED_RTT;
stream_receive_window = STREAM_RWND
send_window = 8 * STREAM_RWND
TransportConfig 中的默认值是吞吐量为 100Mbps,延迟 100ms 情况下的配置,适用于大多数网络,基本上不需要修改。
4 长连接
TransportConfig 中的配置项 max_idle_timeout 和 keep_alive_interval 与长连接有关。
本文配置如下:
transport.max_idle_timeout(Some(Duration::from_secs(3).try_into().unwrap()));
transport.keep_alive_interval(Some(Duration::from_secs(1).try_into().unwrap()));
客户端睡眠 5s 后再发送消息,观察不同场景下的效果。
客户端永不退出
- 客户端和服务都进行了配置:服务端没有出现 timeout。
- 只有服务端进行了配置:服务端没有出现 timeout。
- 只有客户端进行了配置:服务端没有出现 timeout。
客户端退出,没有 shutdown
- 客户端和服务都进行了配置:服务端出现 timeout。
客户端退出,且 shutdown
- 客户端和服务都进行了配置:服务端没有出现 timeout。
- 只有服务端进行了配置:服务端没有出现 timeout。
- 只有客户端进行了配置:服务端没有出现 timeout。
- 客户端和服务端都没有配置 keep alive interval:服务端出现 timeout
5 源码阅读
5.1 发送数据
SendStream.write_all()
- 功能:将消息写到 SendBuffer 中。
- 如果消息较大,会分多次写完。
每次写的时候都不超过如下值:
- connection 级别的流量控制:
max_data - data_sent
- stream 级别的控制,发送窗口:
send_window - unacked_data
- budget:
max_data - pending.offset()
- 以上值任意一个为 0,则阻塞。写成功后,这些计数器增加:data_sent,unacked_data,pending.offset()。
- connection 级别的流量控制:
SendStream.finish()
- 功能:当对方确认数据后,该函数结束。
- 实现:监听 channel,收到 None 或 error 后,结束。
5.2 接收数据
IncomingUniStreams.next()
- 功能:接收新的流。
实现:一直轮询 connection::streams::Streams.accept() 是否接收到了新的流,即流 ID 是否已增加。流 ID 增加的情况:
Endpoint::new()
EndpointDriver.poll(),处理收发等事件。
EndpointInner.drive_recv():在 udp socket 上轮询,接收新连接,或在旧连接上接收数据。
Endpoint.handle():处理接收到的 udp 数据。解密 header。
Endpoint.handle_first_packet():解密数据,添加连接,处理第一个数据包。
Connection.handle_first_packet():处理解密后的数据包。重置 keep_alive,idle_timeout 计时器。将该包的编号加入到 PendAcks 队列中,如果队列长度达到 64,则丢弃最老的编号。更新收到的包的编号。
Connection.process_decrypted_packet()
Connection.process_payload():处理从 payload 中获取的每个 frame,并处理。
StreamsState.received():处理新来的流。会计算所有流上收到的数据,数据排序,写到 Recv.assembler
Recv.ingest():检查数据长度,不能超过 2^64
- Recv.credit_consumed_by():流量控制,包括 stream 和 connection 两个级别上的流量控制
- Recv.assembler.insert():去除重复数据,插入优先队列 Recv.assembler.data中,当用户调用
RecvStream.read_to_end()
可获取数据。
- StreamsState.on_stream_frame() 将流 ID 增加。当用户调用
IncomingUniStreams.next()
可发现已接收到了新的流。
RecvStream.read_to_end()
- 功能:将从
Recv.assembler.data
中读取到的数据返回,Recv.assembler.data
是一个优先队列,可能会读取多次。为了提高性能,无序读取。 - 可能会要求重传。
- 功能:将从
6 效果
- 在移动的弱网环境下,相对于 TCP 传输更稳定,即使 IP 切换连接也不会断开。
- 相对于 TCP,端到端的延时更稳定。
参考
https://greenbytes.de/tech/webdav/draft-ietf-quic-transport-16.html
https://github.com/quinn-rs/quinn
没有评论