最近监控上报的agent里需要将数据上报到es,所以用了elasticsearch-rs这个包,这是es官方提供的rust版本sdk,看了一下版本号,目前的版本都处于alpha。
下面用一个简单实例讲述我遇到的问题。首先是,调用sdk发现需要用async标注函数。
use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;
fn main() {
println!("Hello, world!");
send_es("hi".to_string());
println!("hi");
}
pub async fn send_es(body: String) {
let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
let client = Elasticsearch::new(transport);
let mut bodies: Vec<String> = Vec::with_capacity(1);
bodies.push(body);
let response = client
.bulk(BulkParts::Index("nginx"))
.body(bodies)
.send()
.await
.unwrap();
let response_body = response.json::<Value>().await.unwrap();
println!("{:?}", response_body);
}
运行后,发现有一个警告,并且send_es
没有被调用
➜ tes git:(master) ✗ cargo run
Compiling tes v0.1.0 (/root/code/las/tes)
warning: unused implementer of `Future` that must be used
--> src/main.rs:6:5
|
6 | send_es("hi".to_string());
| ^^^^^^^^^^^^^^^^^^^^^^^^^^
|
= note: `#[warn(unused_must_use)]` on by default
= note: futures do nothing unless you `.await` or poll them
warning: `tes` (bin "tes") generated 1 warning
Finished dev [unoptimized + debuginfo] target(s) in 3.16s
Running `target/debug/tes`
Hello, world!
hi
警告说futures do nothing
,也就是send_es
将会什么也不做。后面,我找到了一个方法block_on
可以执行async方法,于是,变成了这样
use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use futures::executor::block_on;
use serde_json::Value;
fn main() {
println!("Hello, world!");
block_on(send_es("hi".to_string()));
println!("hi");
}
pub async fn send_es(body: String) {
let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
let client = Elasticsearch::new(transport);
let mut bodies: Vec<String> = Vec::with_capacity(1);
bodies.push(body);
let response = client
.bulk(BulkParts::Index("nginx"))
.body(bodies)
.send()
.await
.unwrap();
let response_body = response.json::<Value>().await.unwrap();
println!("{:?}", response_body);
}
但是执行后发现报错如下
➜ tes git:(master) ✗ cargo run
Compiling tes v0.1.0 (/root/code/las/tes)
Finished dev [unoptimized + debuginfo] target(s) in 3.52s
Running `target/debug/tes`
Hello, world!
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/client/connect/dns.rs:121:24
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
各种查找解决方案之后,也没能解决这个问题,说是tokio
版本依赖的问题,两个不同的组件间接引用了不同版本的tokio
,说是引入tokio = "*"
就能解决依赖问题,但是实际上是无法解决的。所以我就用了上面的最小用例来调用elasticsearch sdk,只调用sdk,不引用任何其他依赖(原项目中还引用了reqwest包,这个包依赖了tokio)。发现这个最小用例也报错,说明根本不是依赖问题,但是可以确定问题出在tokio
上。于是阅读了tokio
的官方文档,了解到运行async函数可以用#[tokio::main]
标注,结合.await
就可以了。于是重新修改后如下
use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;
#[tokio::main]
async fn main() {
println!("Hello, world!");
send_es("hi".to_string()).await;
println!("hi");
}
pub async fn send_es(body: String) {
let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
let client = Elasticsearch::new(transport);
let mut bodies: Vec<String> = Vec::with_capacity(1);
bodies.push(body);
let response = client
.bulk(BulkParts::Index("nginx"))
.body(bodies)
.send()
.await
.unwrap();
let response_body = response.json::<Value>().await.unwrap();
println!("{:?}", response_body);
}
问题解决,终于调用成功了。
总结,tokio
是rust中有名的异步调用的包。它定义了async
和await
这些关键词,而实现异步。同样他也定义了异步函数的调用方式,就是#[tokio::main]
标注。