这篇文章上次修改于 373 天前,可能其部分内容已经发生变化,如有疑问可询问作者。

1 背景

signaling 服务器维护 client 的 online 状态,且两个客户端如果想建立连接必须先连接 signaling 且拿到 signaling 提供的 UUID。

signaling 部署在 K8s 上,监听 UDP 端口,service 类型为 ClusterIP,由 Nginx 代理 ,提供对集群外的服务。

2 现象

在 signaling 升级期间,如果 client1 连接 client2,会失败,即使 signaling 升级结束,client1 也无法再成功连接 client2。

3 解决

存在死锁的代码:

let &signaling_node = key_union.get(3).unwrap();
let node_map = signaling_node_map.read().await;
if !node_map.contains_key(signaling_node) {
    log::info!("Do not found target node in local cache.");
    match self
        .registry_client
        .read()
        .await
        .get_nodes("singaling", HashMap::new())
        .await
    {
        Some(nodes) => {
            let mut map_ref = signaling_node_map.write().await;
            // Fetch all node description from discovery service
            for node in nodes.nodes_item {
                map_ref.insert(node.nid.clone(), node);
            }
        }
        None => log::warn!("Do not fetch available nodes"),
    }
}
match node_map.get(signaling_node) {
    Some(node) => {
        return Some(node.clone());
    }
    None => {
        log::error!("signaling node is not found: {:?}", signaling_node);
        return None;
    }
}

先获取 signaling_node_map 的读锁,当 signaling_node_map 不存在 signaling 节点时,会查询并获取 signaling_node_map 的写锁进行更新。读锁没有释放的情况下,再获取写锁,导致死锁。

改为:

let &signaling_node = key_union.get(3).unwrap();
{
    let node_map = signaling_node_map.read().await;
    if node_map.contains_key(signaling_node) {
        log::info!("signaling node: {:?}", signaling_node);
        return Some(node_map.get(signaling_node).unwrap().clone());
    }
}

log::info!("Do not found target node in local cache.");
let mut node_map = signaling_node_map.write().await;
match self
    .registry_client
    .read()
    .await
    .get_nodes("singaling", HashMap::new())
    .await
{
    Some(nodes) => {
        let mut target_node: Option<NodeDescription> = None;
        // Fetch all node description from discovery service
        for node in nodes.nodes_item {
            log::info!("fetch node: {:?}", node.nid);
            if node.nid == signaling_node {
                target_node = Some(node.clone());
            }
            node_map.insert(node.nid.clone(), node);
        }
        return target_node;
    }
    None => {
        log::warn!("Do not fetch available nodes");
        return None;
    }
}

4 排查过程

刚开始以为是网络问题,分别在 client 和 signaling 上采用 tcpdump 查看网络包的收发情况:

sudo tcpdump -i any udp port 51116

发现连接成功的情况下,网络包的数量比较多,连接失败的情况下,网络包的数量比较少,说明至少网络是通的,只是不明白为什么网络包的数量会不一样。

为了进一步排除网络因素,将服务器版本回退到上一个版本,发现不存在 client1 连接 client2 失败的情况,证明网络没有问题,是服务端代码本身有问题,猜测是卡在了某个地方。

阅读服务端逻辑,感觉连接失败的情况下,之所以有网络包,是因为建立连接成功,即如下代码中的 listener.next() 是成功的,只是 signaling_hander.handle_signaling() 卡住了。通过打印日志也进一步确认了这点。

loop {
    tokio::select! {
        Some(new_conn) = self.listener.next() => {
            let redis_client = self.redis_client.clone();
            let registry_client = self.registry_client.clone();
            let mut signaling_hander = SignalingHandler::new(
                registry_client,
                redis_client,
                self.settings.signaling.clone(),
                self.node_id.clone()
            );
            let client_addr = new_conn.remote_address();
            log::info!("Found new connection, client addr: {:?}", &client_addr);
            tokio::spawn(async move {
                match signaling_hander.handle_signaling(new_conn, NODE_CLUSTER.clone(), PUPPET_CONN_MAP.clone()).await {
                    Err(e) => log::error!(
                        "client {:?} connect to signaling server error: {:?}", client_addr, e),
                    _ => {}
                }
            });
        }
        ...

继续阅读 signaling_hander.handle_signaling() 逻辑,并在可疑的地方都打上日志,发现有一个地方在获取读锁后,后面的日志再也能打印出来,发现后面又获取了写锁。至此,排查结束。修复后该问题没有再出现。