tokio/rust如何强制断开tcp监听

前阵子照着wendal大佬搞的LuatOS 网络测试工具,自己用rust实现了一套后端接口,做了个山寨版本的工具。第一次用tokio,遇到不少问题所以记录一下。

需求

工具需要在用户ws命令请求后,开启一个tcp监听服务器,并且在用户ws断开后关闭这个tcp监听。tcp监听代码类似下面这样

let listener = TcpListener::bind(
    (IpAddr::from_str("0.0.0.0").unwrap(),port)
).await.unwrap();

loop {//获取监听消息
    let (socket, _) = match listener.accept().await {
        Ok(l) => l,
        Err(_) => continue,
    };
    let (mut socket_read,mut socket_write) = socket.into_split();
    //........
    //socket_write这里还有几行代码是丢出去给其他部分用的
    //.....

    //新建个线程给这个客户端收发使用
    tokio::spawn(async move {
        info!("new client connected");
        let mut buf = vec![0; 2048];
        loop {
            match socket_read.read(&mut buf).await {
                Ok(0) => {
                    info!("receive 0 length");
                    break
                },
                Ok(n) => {
                    info!("recv tcp msg");
                    // if socket.write_all(&buf[..n]).await.is_err() {
                    //     break
                    // }
                }
                Err(e) => {
                    error!("read error! {:?}",e);
                    break
                }
            }
        }
        info!("client disconnected by remote");
    });
}

分析

根据上面的代码可以很清晰地看出来,因为每个客户端都单独新建了一个任务。并且里外都使用了.accept().read()这种等待的接口,导致只有事件上来之后才有机会运行其他代码,没法随时在这些代码里面退出

查找资料

最早看rust教程时看的是《Rust 程序设计语言 简体中文版》,在教程最后一章节《优雅停机与清理》里实现了tcp服务器的停机。但是再查看了它使用的方法之后,发现并不适合。。(因为用户又不会给你发停机命令)

stackoverflow上搜索了不少类似问题,发现强制退出基本就只有关闭tcp监听线程这种操作,但是因为rust的安全特性,没法直接强制关闭某个线程,暂时不考虑这个方向

在翻阅了tokio的接口文档之后,发现select!可以实现这个需求

解决方案

futures::select 宏同时跑多个 future,允许用户在任意 future 完成时响应

就是说,使用select!,可以同时跑多个任务,并且在某一个任务退出后,所有任务都会被强制停止。

tokio::spawn(async move {
    select! {
        _ = async {
            //一堆代码
        } => {}
        _ = async {
            //另一堆代码
        } => {}
    }
})

那这就舒服了,直接把上面的代码稍微改改就行了:

let (kill_tx, mut kill_rx) = tokio::sync::watch::channel(false);
//....省略一堆代码
loop {
    //....省略一堆代码
    //新建个线程给这个客户端收发使用
    tokio::spawn(async move {
        select! {
            _ = async {
                //原本在tokio::spawn写的那一堆东西
                //.......
                //......
            } => {}
            _ = async {
                loop {
                    if kill_rx.changed().await.is_ok() {
                        info!("kill this tcp server!");
                        return//该任务退出,别的也会停
                    }
                }
            } => {}
        }
    });
}

只要在别的地方,调用一下kill_tx.send(true).unwrap_or(()),就可以强制关闭这个tcp服务器了

发表评论

您的电子邮箱地址不会被公开。