今天开始尝试使用rust的多线程编程,在不同的线程中访问共享变量。一个小项目,项目背景是这样的,我想用rust写一个高性能的nginx日志解析聚合上报监控的agent;其中,主线程中watch日志文件,并读取日志内容,有点类似tailf,然后有一个子线程,负责每隔10s钟统计主线程收集到的日志数据,聚合并上报。
所以,这里除了主线程之外,需要创建一个线程。并且主线程不断的往一个Vec<LineStat>
容器变量中写入数据,另一个子线程没隔10s,从这个容器变量中读取所有数据,并累加统计上报,然后清空该容器中的数据,10s一个往复。这个容器变量就是关键的共享变量了。常规情况下,是存在数据竞争的,在golang里倒是很容易写出来,但是golang可不管这些,锁你爱加不加,不加锁也能跑,但是会出错,若是map还会panic。但是,今天发现在rust里,根本写不出来……
那么第一个问题就是,rust线程中如何使用共享变量。 我们先看下直接使用共享变量会怎么样。
fn testx() {
let mut data = "hi".to_string();
thread::spawn(move || loop {
println!("{:?}", data);
thread::sleep(Duration::from_secs(10));
});
println!("{:?}", data);
}
可以看到spawn生成的线程内部访问data,会被要求加上move
关键词,它会强制获取被访问变量的所有权,也就是说在线程中访问了data
,data
的所有权就变为这个线程的了,只能在这个线程内访问。后续的println
访问data
就会报错。如下
error[E0382]: borrow of moved value: `data`
--> src/main.rs:38:22
|
31 | let mut data = "hi".to_string();
| -------- move occurs because `data` has type `std::string::String`, which does not implement the `Copy` trait
32 | thread::spawn(move || loop {
| ------- value moved into closure here
33 | println!("{:?}", data);
| ---- variable moved due to use in closure
...
38 | println!("{:?}", data);
| ^^^^ value borrowed here after move
|
= note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)
可以看到报错value borrowed here after move
,也就是访问了所有权被移动过的变量。
那么我们如何才能使data
能够同时在主线程和子线程中被访问呢。用Arc<Mutex<...>>
。代码改成如下
fn main() {
let mut raw_data = "hi".to_string();
let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
let mut clone_data = data.clone();
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(1));
println!("th -> {:?}", clone_data.lock().unwrap());
clone_data.lock().unwrap().push_str(", hello");
});
thread::sleep(Duration::from_secs(10));
println!("final -> {:?}", data.lock().unwrap());
}
可以看到要访问的数据套上Arc<Mutex<...>>
之后,在线程中访问该数据时,先在线程外clone一个副本,然后在线程中访问该数据,访问时先加锁lock()
,然后读取到的数据就是raw_data
,而在主线程中访问的数据也是先lock()
加锁,读取到的也是raw_data
。相当于在raw_data
外部加了个套,虽然data
与clone_data
是两个加了套的数据,但是在套里面的数据是同一个,也就是raw_data
,lock()
锁定“解套”后访问的是同一个数据。我们来看下运行结果
th -> "hi"
th -> "hi, hello"
th -> "hi, hello, hello"
th -> "hi, hello, hello, hello"
th -> "hi, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello, hello"
final -> "hi, hello, hello, hello, hello, hello, hello, hello, hello, hello"
可以看到线程内部字符串拼接了若干次,最后在主线程中打印出来的是子线程拼接后的数据。
那么rust是怎么解锁的呢。 我们把代码改成这样
fn main() {
let mut raw_data = "hi".to_string();
let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
let mut clone_data = data.clone();
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(1));
let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
println!("th -> {:?}", v);
appendCnt(clone_data.clone());
// 线程结束后,v 被回收,v所在的锁才会释放
});
thread::sleep(Duration::from_secs(10));
println!("final -> {:?}", data.lock().unwrap());
}
fn appendCnt(mut data: Arc<Mutex<String>>) {
data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,发现前面已经锁定过,无法加锁,死锁
}
执行结果显示打印了第一个th -> "hi"
之后,程序直接卡死。这里是因为发生了死锁。rust中目前没有显式的锁释放操作(实际上unlock方法在nightly版本中,参考,对应的issue),锁释放(unlock)发生在前面所说的“加了套的数据”内存释放的时候,也就是说clone_data.lock()
即v
解锁发生在v
释放的时候,也就是在loop所在块结束的时候。但是在这之前appenCnt
调用了clone_data.clone()
衍生的“加了套的数据”,并尝试在函数中加锁,这时候上一个锁还没释放呢,所以就死锁了。
那么如果这里我必需要提前解锁怎么办,用drop
即可。如下
fn main() {
let mut raw_data = "hi".to_string();
let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
let mut clone_data = data.clone();
thread::spawn(move || loop {
thread::sleep(Duration::from_secs(1));
let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
println!("th -> {:?}", v);
drop(v); // 提前释放 v,提前解锁
appendCnt(clone_data.clone());
});
thread::sleep(Duration::from_secs(10));
println!("final -> {:?}", data.lock().unwrap());
}
fn appendCnt(mut data: Arc<Mutex<String>>) {
data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,没有被锁定,可以加锁
}
运行,发现现在能正常解锁了。