最近发现一个问题,rust的线程安全机制导致无法实现socket读写分离到两个不同的线程。
先说一下程序的背景,程序是将本地终端pty(cli)拉起,并且将pty的输入输出通过channel对接,并将cli输出的数据经过channel写入到服务端socket,将从服务端socket收取到的数据经另一个channel写入到cli的输入。从而实现远程连接pty。
按照rust的写法,读线程中,在读socket之前需要先锁socket,然后读取,再释放锁;同样,在写线程中,也需要先锁socket,然后写入,再释放锁。这样一来代码应该如下:
连接与初始化代码如下
let (ws_stream, response) =
connect(Url::parse("wss://ws.postman-echo.com/raw").unwrap()).expect("msg");
println!("Connected to the server");
println!("Response HTTP code: {}", response.status());
println!("Response contains the following headers:");
for (ref header, _value) in response.headers() {
println!("* {}", header);
}
let socket = Arc::new(Mutex::new(ws_stream));
// init cli
self.pty_start();
let mut me = self.clone();
let skt = socket.clone();
thread::spawn(move || {
me.watch_socket_read_in(skt);
});
println!("---> watch_socket_read_in");
self.watch_socket_write_out(socket.clone());
println!("---> watch_socket_write_out");
读取方法在一个新起的线程中watch_socket_read_in
,如下
fn watch_socket_read_in(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
loop {
let mut skt = socket.lock().unwrap();
let msg = skt.read_message().unwrap();
println!("Socket Received: {:?}", msg);
drop(skt);
self.tx_in
.send(msg.clone())
.expect("send msg into in channel failed");
println!("send pipe in succ: {:?}", msg);
}
}
可以看到,不停的从socket读取数据,读取前锁,读取后drop锁。
写入方法在初始化代码所在的主线程中watch_socket_write_out
,如下
fn watch_socket_write_out(&mut self, socket: Arc<Mutex<WebSocket<MaybeTlsStream<TcpStream>>>>) {
let rx = self.rx_out.lock().expect("lock rx out failed");
for msg in rx.iter() {
println!("msg from cli -> {:?}", msg);
let mut skt = socket.lock().unwrap();
println!("Socket Send : {:?}", msg);
skt.write_message(msg).unwrap();
drop(skt);
}
println!("out of socket write out block....")
}
可是,运行的结果却出乎我的意料,运行结果现象是这样的,先是只能够从socket读取到服务端的PING数据,而cli发出的数据经过channel读取出来之后,锁socket,准备发送,但是发现锁socket卡主死锁了,导致无法经socket发送,然后就卡了很久;但是过了一段时间,写socket获取的锁成功了,发了一大堆的数据,然后又轮到读socket卡主,稍后随机的时间后,读socket锁成功,又只能读到PING,如此反复。这种状态的读写,完全不能用,根本实现不了cli与服务端的实时通讯。
分析了一下,应该是socket网络读写是网络通讯,因此读写的锁定socket时长是不确定的且相对耗时算是比较长的,所以导致无法预料是读获取到锁还是写获取到锁,而且这种锁强行将读写串行化了,完全不符合并发读写的要求了。
几经查找,于是采用tokio-tungstenite
这个crate替换了tungstenite
,因为它可以将WebSocketStream
通过split
方法分隔为reader
和writer
,这样一来,读与写就分离开了,在不同的线程中无需对socket
加锁。
let (ws_stream, response) =
connect_async(Url::parse("wss://ws.postman-echo.com/raw").unwrap())
.await
.expect("msg");
let (ws_writer, ws_reader) = ws_stream.split();
这样一来,读socket
用ws_reader
,写socket
用ws_writer
。
// socket read
let me = self.clone();
tokio::spawn(async move {
let mut incoming = ws_reader.map(Result::unwrap);
while let Some(msg) = incoming.next().await {
if msg.is_text() {
println!("Socket Received: {:?}", msg);
me.tx_in.send(msg).expect("send msg into in channel failed");
}
}
});
// socket write
self.watch_socket_write_out(ws_writer).await;
async fn watch_socket_write_out(
&mut self,
mut writer: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
) {
let rx = self.rx_out.lock().expect("lock rx out failed");
for msg in rx.iter() {
println!("Socket Send : {:?}", msg.to_text().unwrap());
writer.send(msg).await.unwrap();
}
println!("out of socket write out block....")
}
可以看到,新线程中读socket
,主线程中写socket
;ws_reader
经map
方法后,可以在死循环中阻塞调用next()
不断的读取socket
中的信息。写socket
则从channel
中读取到数据,按常规的方法send
即可。
接入tokio-tungstenite
解决了这个问题,不过它是基于tokio
的,tokio
是一个协程库,有自己的运行时,用了tokio
的程序起协程后,程序会自动启动若干个线程,类比goroutine,它也是有初始的资源消耗的,比如这个程序只需要4个线程,但是使用了tokio的程序,会有10个线程(如上图),内存占用会明显增多。