初学rust,无法在不同的线程中并行读写websocket
Tag rust, 线程, websocket, 读写分离, on by view 316

最近发现一个问题,rust的线程安全机制导致无法实现socket读写分离到两个不同的线程。

先说一下程序的背景,程序是将本地终端pty(cli)拉起,并且将pty的输入输出通过channel对接,并将cli输出的数据经过channel写入到服务端socket,将从服务端socket收取到的数据经另一个channel写入到cli的输入。从而实现远程连接pty。

按照rust的写法,读线程中,在读socket之前需要先锁socket,然后读取,再释放锁;同样,在写线程中,也需要先锁socket,然后写入,再释放锁。这样一来代码应该如下:

连接与初始化代码如下

let (ws_stream, response) =
    connect(Url::parse("wss://ws.postman-echo.com/raw").unwrap()).expect("msg");

println!("Connected to the server");
println!("Response HTTP code: {}", response.status());
println!("Response contains the following headers:");
for (ref header, _value) in response.headers() {
    println!("* {}", header);
}

let socket = Arc::new(Mutex::new(ws_stream));

// init cli
self.pty_start();

let mut me = self.clone();
let skt = socket.clone();
thread::spawn(move || {
    me.watch_socket_read_in(skt);
});
println!("---> watch_socket_read_in");

self.watch_socket_write_out(socket.clone());
println!("---> watch_socket_write_out");

读取方法在一个新起的线程中watch_socket_read_in,如下

fn watch_socket_read_in(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    loop {
        let mut skt = socket.lock().unwrap();
        let msg = skt.read_message().unwrap();
        println!("Socket Received: {:?}", msg);
        drop(skt);
        self.tx_in
            .send(msg.clone())
            .expect("send msg into in channel failed");
        println!("send pipe in succ: {:?}", msg);
    }
}

可以看到,不停的从socket读取数据,读取前锁,读取后drop锁。

写入方法在初始化代码所在的主线程中watch_socket_write_out,如下

fn watch_socket_write_out(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("msg from cli -> {:?}", msg);
        let mut skt = socket.lock().unwrap();
        println!("Socket Send    : {:?}", msg);
        skt.write_message(msg).unwrap();
        drop(skt);
    }

    println!("out of socket write out block....")
}

可是,运行的结果却出乎我的意料,运行结果现象是这样的,先是只能够从socket读取到服务端的PING数据,而cli发出的数据经过channel读取出来之后,锁socket,准备发送,但是发现锁socket卡主死锁了,导致无法经socket发送,然后就卡了很久;但是过了一段时间,写socket获取的锁成功了,发了一大堆的数据,然后又轮到读socket卡主,稍后随机的时间后,读socket锁成功,又只能读到PING,如此反复。这种状态的读写,完全不能用,根本实现不了cli与服务端的实时通讯。

分析了一下,应该是socket网络读写是网络通讯,因此读写的锁定socket时长是不确定的且相对耗时算是比较长的,所以导致无法预料是读获取到锁还是写获取到锁,而且这种锁强行将读写串行化了,完全不符合并发读写的要求了。

几经查找,于是采用tokio-tungstenite这个crate替换了tungstenite,因为它可以将WebSocketStream通过split方法分隔为readerwriter,这样一来,读与写就分离开了,在不同的线程中无需对socket加锁。

let (ws_stream, response) =
    connect_async(Url::parse("wss://ws.postman-echo.com/raw").unwrap())
        .await
        .expect("msg");
let (ws_writer, ws_reader) = ws_stream.split();

这样一来,读socketws_reader,写socketws_writer

// socket read
let me = self.clone();
tokio::spawn(async move {
    let mut incoming = ws_reader.map(Result::unwrap);
    while let Some(msg) = incoming.next().await {
        if msg.is_text() {
            println!("Socket Received: {:?}", msg);
            me.tx_in.send(msg).expect("send msg into in channel failed");
        }
    }
});

// socket write
self.watch_socket_write_out(ws_writer).await;
async fn watch_socket_write_out(
    &mut self,
    mut writer: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
    let rx = self.rx_out.lock().expect("lock rx out failed");

    for msg in rx.iter() {
        println!("Socket Send    : {:?}", msg.to_text().unwrap());
        writer.send(msg).await.unwrap();
    }

    println!("out of socket write out block....")
}

可以看到,新线程中读socket,主线程中写socketws_readermap方法后,可以在死循环中阻塞调用next()不断的读取socket中的信息。写socket则从channel中读取到数据,按常规的方法send即可。

r56j2avc

接入tokio-tungstenite解决了这个问题,不过它是基于tokio的,tokio是一个协程库,有自己的运行时,用了tokio的程序起协程后,程序会自动启动若干个线程,类比goroutine,它也是有初始的资源消耗的,比如这个程序只需要4个线程,但是使用了tokio的程序,会有10个线程(如上图),内存占用会明显增多。


初学rust,HashMap的clone
Tag rust, clone, hashmap, on by view 249

在rust中有个常用个方法clone,按字面意思就是克隆。这个函数的作用是对对象进行深度拷贝,生成的新对象与原对象相互独立。

很多常用的类型或者容器类型都支持clone,例如rust中的HashMap也支持clone,我们用一段代码实验一下。

#[test]
fn test_hash_map_clone() {
    let xx: Arc<Mutex<HashMap<String, String>>> = Arc::new(Mutex::new(HashMap::new()));
    let mut mp = xx.lock().unwrap();
    mp.insert("hi".to_string(), "hello".to_string());
    println!("origin: {:?}", mp);
    let mut cp = mp.clone();
    cp.insert("k".to_string(), "v".to_string());
    println!("origin: {:?}", mp);
    println!("cp    : {:?}", cp);
}

输出

running 1 test
origin: {"hi": "hello"}
origin: {"hi": "hello"}
cp    : {"hi": "hello", "k": "v"}
test test_hash_map_clone ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 2 filtered out; finished in 0.00s

上面的测试代码运行结果表示,修改克隆后的对象cp,源对象mp不会发生变化。

那么我们自己定义的类型如何才能支持clone呢?使用#[derive(Clone)]这个指令修饰自定义类型,就会自动支持clone,但是要注意,如果自定义类型结构体里,如果有字段类型不支持clone,将无法通过#[derive(Clone)]指令快速支持clone

自定义类型clone测试如下

#[derive(Debug, Clone)]
struct User {
    name: String,
    age: i32,
}

#[test]
fn test_struct_clone() {
    let mut u1 = User {
        name: "rex".to_string(),
        age: 1,
    };
    println!("origin: {:?}", u1);
    let mut ucp = u1.clone();
    ucp.name = "agnes".to_string();
    ucp.age = 2;
    println!("origin: {:?}", u1);
    println!("cp    : {:?}", ucp);
    u1.age = 3;
    println!("origin: {:?}", u1);
    println!("cp    : {:?}", ucp);
}

运行结果

running 1 test
origin: User { name: "rex", age: 1 }
origin: User { name: "rex", age: 1 }
cp    : User { name: "agnes", age: 2 }
origin: User { name: "rex", age: 3 }
cp    : User { name: "agnes", age: 2 }
test test_struct_clone ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 2 filtered out; finished in 0.00s

同样可以看到,修改clone后的对象,源对象不变,修改源对象,clone后的对象也不变。