nginx upstream DNS解析问题
Tag nginx, upstream, dns, on by view 113

最近发现我香港服务器上放置的几个 web 站点经常会偶尔出现无法访问的情况,这个香港服务器上放置的是 trojan 加 nginx,流量从trojan进入,部分转发出去,另外部分是web站点的流量,转发到nginx,从而实现流量代理和web访问。

这个香港节点出现web访问异常,之前也遇到过几次,都是重启nginx就正常了。这次决定仔细看下是什么情况,登陆节点,首先查看trojan日志,发现在正常转发,再看下nginx的日志,如下

2022/11/14 01:27:11 [error] 22#22: *473474 upstream timed out (110: Connection timed out) while connecting to upstream, client: 61.177.173.46, server: 0.0.0.0:22, upstream: "36.36.106.166:23000", bytes from/to client:0/0, bytes from/to upstream:0/0
2022/11/14 01:27:27 [error] 22#22: *473486 upstream timed out (110: Connection timed out) while connecting to upstream, client: 203.205.141.115, server: 0.0.0.0:22, upstream: "36.36.106.166:23000", bytes from/to client:0/0, bytes from/to upstream:0/0
2022/11/14 01:28:36 [error] 22#22: *473490 upstream timed out (110: Connection timed out) while connecting to upstream, client: 203.205.141.115, server: 0.0.0.0:22, upstream: "36.36.106.166:23000", bytes from/to client:0/0, bytes from/to upstream:0/0
2022/11/14 01:29:19 [error] 22#22: *473492 upstream timed out (110: Connection timed out) while connecting to upstream, client: 61.177.173.52, server: 0.0.0.0:22, upstream: "36.36.106.166:23000", bytes from/to client:0/0, bytes from/to upstream:0/0

发现nginx日志显示有连接超时,于是我决定判断一下是否真的连接不上,telnet

➜  trojan git:(master) telnet 36.36.106.166 23000
Trying 36.36.106.166...
^C

果然连接不上,我web站点配置如下

server {
    listen 10110 ssl http2;
    server_name xxx.duguying.net;

    root /usr/share/nginx/html;
    index index.php index.html;
    ssl_certificate /data/certs/_.duguying.net.crt; 
    ssl_certificate_key /data/certs/_.duguying.net.key;
    ssl_stapling on;
    ssl_stapling_verify on;
    add_header Strict-Transport-Security "max-age=31536000";

    location / {
        include git.deny;
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host            $http_host;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_pass http://jxx.duguying.net:****;
        proxy_buffering    off;
        proxy_buffer_size  128k;
        proxy_buffers 100  128k;
    }
}

web站点的流量经香港节点转发到 jxx.duguying.net ,dig一下

➜  ~ dig jxx.duguying.net

; <<>> DiG 9.11.5-P4-5.1+deb10u7-Debian <<>> jxx.duguying.net
;; global options: +cmd
;; Got answer:
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 25787
;; flags: qr rd ra; QUERY: 1, ANSWER: 1, AUTHORITY: 0, ADDITIONAL: 0

;; QUESTION SECTION:
;jxx.duguying.net.             IN      A

;; ANSWER SECTION:
jxx.duguying.net.      60      IN      A       222.248.21.219

;; Query time: 22 msec
;; SERVER: 192.168.1.1#53(192.168.1.1)
;; WHEN: 一 11月 14 11:17:24 CST 2022
;; MSG SIZE  rcvd: 51

发现nginx转发的节点居然不是目前upstream域名解析的节点,这说明我upstream域名dns更新了,但是nginx上upstream域名解析没更新。网上查询之后才知道,域名作为upstream,它的解析节点并不会时事更新。解决方案

方案一:每次dns有变化,重启Nginx (最开始出现故障重启nginx恢复就是这种解决方案)
方案二:使用Nginx Resolver
方案三:使用 Nginx-upstream-dynamic-server (nginx模块)
方案四:使用 ngx_upstream_jdomain (nginx模块)

这里介绍一下方案二,添加resolver相关配置,只需要将nginx配置改为如下

server {
    listen 10110 ssl http2;
    server_name xxx.duguying.net;

    resolver 127.0.0.1 valid=60s;    // 这里设置dns服务器
    resolver_timeout 3s;             // 这里设置dns解析超时时间

    root /usr/share/nginx/html;
    index index.php index.html;
    ssl_certificate /data/certs/_.duguying.net.crt; 
    ssl_certificate_key /data/certs/_.duguying.net.key;
    ssl_stapling on;
    ssl_stapling_verify on;
    add_header Strict-Transport-Security "max-age=31536000";

    location / {
        include git.deny;
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_set_header Host            $http_host;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_pass http://jxx.duguying.net:****;
        proxy_buffering    off;
        proxy_buffer_size  128k;
        proxy_buffers 100  128k;
    }
}

初学rust,优雅的解包Option
Tag rust, option, on by view 351

rust中,Option表示一个可能不存在值的复合类型。其定义如下

pub enum Option<T> {
    None,
    Some(T),
}

可以看到,它里面的值要么就是类型T的值,要么就是None(也表示不存在值)。通常,我们在获取到可能为空的值时,Option类型很有用,它要求你必须去处理可能为None的情况。它有方法 is_none, is_some 等,可以判定是否为空。但是如果你使用这两个方法来解包 Option,免不了if else判断,代码会比较难看。比如

let x: Option<u32> = Some(2);
let mut value: u32 = 0;
if x.is_none() {
    value = default;
} else {
    value = x.unwrap(); // unwrap 方法可以解一切包,但是遇到 None 会 panic
}

可以看到上面的代码用了2条语句,首先是初始化value值,然后判定是否为None,根据不同的情况,对value重新赋值,明显复杂,我解个Option还得分两步,而且申明的value还必需是mut类型。那么,对于“我需要解包Option,如果Option为None则给默认值”这个需求,有更优雅的写法吗?有,如下

let x: Option<u32> = Some(2);

// 方法一:利用 match,和语句块
let value = match x {
    Some(val) => val,
    None => default,
};

// 方法二:利用 if-let,和语句块
let value = {
    if let Some(val) = x {
        *val
    } else {
        default
    }
};

上述两种方法都是借助语句块一步到位的解包Option,并且没有调用任何方法。


给iPhone XS更换电池
Tag iphone, 电池, on by view 623

最近手机电池不耐用了,而且经常发生剩余电量20%就自动关机,充电到80%就充不进的情况。于是决定换一下电池。

关于给iPhone换电池我之前也干过,之前给iPhone SE换电池,按照网上的教程拉电池胶,结果一再拉断,最后只能掰电池,撬电池,弄下来之后电池已经不成样子了。后面给macbook也换过电池,macbook电池是因为使用时间太久导致电池鼓包,电池损耗到太高提醒电池维修,拆卸过程中遇到的同样是电池胶的问题,不得不暴力撕下电池。后来听说电子洗涤剂很容易除电池胶,正好之前玩flash芯片买了一瓶电子洗涤剂,试了一下,真香。电子洗涤剂从缝里一喷,原本难以撕开的电池,自动的轻松分离开了。

3qnjp5vj

备注:这个手机是二换电池了,5月份买了块品胜电池,用了5个月就不行了,看样子品胜电池次品率还挺高。这次换个德赛电池试一下。

对于换电池的小伙伴,这里强烈推荐电子洗涤剂来除胶,根本用不着像教学视频里一点点的卷胶,洗涤剂会让你轻而易举的就能拆卸下电池。还有一点需要注意,据说电子洗涤剂有毒,请妥善保存。


重拾单反,体验微距拍摄
Tag 单反, 微距, on by view 272

早年买了Nikon D7200,放了很久没怎么用,上次去中山带上了单反,感觉拍照效果很好。这一次学会了手动对焦。于是买了个近摄镜,想体验一下微距摄影,拍一下花花草草和昆虫蚂蚁的细节。

zz4mo02x

6rh4rexs

w5u4lg1m

e2sezosf

上面这些照片都是配合近摄镜手动对焦拍摄的。


初学rust,wasm前端图片转码
Tag wasm, rust, 转码, on by view 379

最近用rust写的日志上报agent趋近完善,意味着一个练习rust的小项目结束了。于是便找了个新的小项目,用rust代码编译出wasm,在浏览器端实现图片缩放、转码。决定做前端转码是出于两方面原因,第一是想体验一下rust-webassembly,第二是博客的管理后台上传图片能力有待优化,无法直接上传单反拍出来的图片,因为单反照都是十几兆以上大小,我的云服务器只有1M带宽,上传超时,就算我能忍受超时,也无法忍受大文件后端转码压缩时io满负载直接卡死服务器的情况。于是便有了这次wasm体验。

首先,如果你已经入门了rust,能用rust写代码了,那么用rust实现wasm将会是一种非常好的体验。因为rust的wasm全套工具齐全,你可以直接在rust项目中编译出npm包,编译出来的结果可以直接上传到npm仓库。这里简单介绍一下基于rust的wasm包开发过程。

首先创建rust的包项目,注意不是可执行文件。

cargo new wtools --lib

然后,修改Cargo.toml文件,定义包类型

[package]
name = "wtools"
version = "0.1.6"
edition = "2021"
description = "wasm tools"
license = "MIT"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
crate-type = ["cdylib", "rlib"] # cdylib 是wasm库, rlib 是常规rust库

[profile.release]
lto = true
opt-level = 'z'

[dependencies]

注意lib下的crate-type字段要定义为cdylib,只有这种包才能编译为wasm,然后还有一个选项需要注意profile.release下的lto=trueopt-level = 'z'这两个选项设置后,可以在编译的时候讲wasm压缩到最小大小,以减小wasm文件在网络中分发的大小。当然,缩减wasm还有个工具,叫做wasm-opt,但是我具体实测之后发现,只要设置了上面的ltoopt-level选项,这个工具能缩减的大小非常有限,有时候甚至无法再进一步缩减了。

安装工具。这里编译wasm报并不是用原生的cargo,而是使用一个叫做wasm-pack的工具,它的优点是,可以直接编译出npm包。安装

cargo install wasm-pack

编译

wasm-pack build --scope duguying

上传npm包

cd pkg
npm publish --access=public

整个开发的过程就是如上的那些。下面简单介绍一下代码。首先,我们这个rust项目的目标是编译为wasm在浏览器上运行。这里就免不了js与rust之间进行数据传递,以及rust里操作浏览器中的各种对象和元素。介绍两个rust包,第一个js-sys,用于js与rust之间进行数据传递的,在这个包里能找到js中的数据类型对应的类型;第二个web-sys,用于浏览器对象与rust之间进行数据传递的,在这个包里有对应浏览器中的各种对象。

比如,最常见的浏览器日志打印console.log,在web-sys中能找到console对象,详情可以查看文档。在我的rust包中简单的包装了一下

extern crate wasm_bindgen;
extern crate web_sys;

use wasm_bindgen::prelude::*;

#[macro_export]
macro_rules! console_log {
    ($($t:tt)*) => (web_sys::console::log(&js_sys::Array::of1(&JsValue::from(
        format_args!($($t)*).to_string()
    ))))
}

#[wasm_bindgen]
pub fn greet(name: &str) {
    console_log!("Hello, {}!", name);
}

这样就可以在其他地方用console_log来调用了,比如

console_log!("load img failed, err: {:?}", error);

我需要进行图片处理,所以用到了image这个包,这个包支持缩放图片resize、旋转图片rotate以及翻转图片flipv等。我主要用到缩放和旋转。另外有一点需要注意的是,需要导出到js的结构体和方法函数等,需要添加#[wasm_bindgen]注解。这个注解是在wasm_bindgen这个包中定义的,这个也是rust编译为wasm的核心包,具体可以查看文档。因为我发现单反上拍摄的照片通常会根据拍照者持相机的角度有一个旋转参数,而这个参数,它是存到了照片的exif信息中,但是他的照片数据实际存储是按照相机原始的方向存储的,所以,竖着拍摄的照片在上传到服务器之后会发现照片是横着的,需要旋转90度。所以在这里我还用到了kamadak-exif这个包,来读取照片的exif信息,从而获取旋转参数,然后根据旋转参数调用rotate对照片进行旋转来修正照片方向。图片处理的代码如下

extern crate wasm_bindgen;

use exif::{Error, Exif, In, Tag};
use image::{imageops::FilterType, DynamicImage, EncodableLayout, ImageFormat};
use js_sys::Uint8Array;
use std::io::{Cursor, Read, Seek, SeekFrom};
use wasm_bindgen::prelude::*;

use crate::console_log;

#[wasm_bindgen]
pub struct Img {
    img: DynamicImage,
    img_format: ImageFormat,
    exif: Result<Exif, Error>,
    orientation: u32,
}

#[wasm_bindgen]
impl Img {
    #[wasm_bindgen(constructor)]
    pub fn new(img: &[u8], mime: &str) -> Img {
        let exifreader = exif::Reader::new();
        let (img_data, img_format) = Img::load_image_from_array(img, mime.to_string());
        let mut c = Cursor::new(Vec::from(img));
        let exif = exifreader.read_from_container(&mut c);

        let mut image = Img {
            img: img_data,
            img_format: img_format,
            exif: exif,
            orientation: 0,
        };
        image.get_orietation();
        image.fix_orietation();
        image
    }

    fn load_image_from_array(_array: &[u8], mime: String) -> (DynamicImage, ImageFormat) {
        let img_format = ImageFormat::from_mime_type(mime).unwrap();
        let img = match image::load_from_memory_with_format(_array, img_format) {
            Ok(img) => img,
            Err(error) => {
                console_log!("load img failed, err: {:?}", error);
                panic!("{:?}", error)
            }
        };
        return (img, img_format);
    }

    fn get_orietation(&mut self) {
        match &self.exif {
            Ok(exif) => {
                let r = exif.get_field(Tag::Orientation, In::PRIMARY);
                match r {
                    Some(oriet) => {
                        self.orientation = oriet.value.get_uint(0).unwrap();
                    }
                    None => {}
                }
                console_log!("orientation: {:?}", r.unwrap());
            }
            Err(_error) => {}
        };
    }

    fn fix_orietation(&mut self) {
        match self.orientation {
            8 => self.img = self.img.rotate270(),
            3 => self.img = self.img.rotate180(),
            6 => self.img = self.img.rotate90(),
            _ => {}
        }
    }

    fn image_to_uint8_array(&self, img: DynamicImage) -> Uint8Array {
        // 创建一个内存空间
        let mut c = Cursor::new(Vec::new());
        match img.write_to(&mut c, self.img_format) {
            Ok(c) => c,
            Err(error) => {
                panic!(
                    "There was a problem writing the resulting buffer: {:?}",
                    error
                )
            }
        };
        c.seek(SeekFrom::Start(0)).unwrap();
        let mut out = Vec::new();
        // 从内存读取数据
        c.read_to_end(&mut out).unwrap();
        let v = out.as_bytes();
        Uint8Array::from(v)
    }

    pub fn get_width(&self) -> u32 {
        return self.img.width();
    }

    pub fn get_height(&self) -> u32 {
        return self.img.height();
    }

    pub fn grayscale(&self) -> Uint8Array {
        let img = self.img.grayscale();
        self.image_to_uint8_array(img)
    }

    pub fn scale(&self, width: u32, height: u32) -> Uint8Array {
        let img = self.img.resize(width, height, FilterType::Triangle);
        self.image_to_uint8_array(img)
    }

    pub fn rotate90(&self) -> Uint8Array {
        let img = self.img.rotate90();
        self.image_to_uint8_array(img)
    }

    pub fn rotate180(&self) -> Uint8Array {
        let img = self.img.rotate180();
        self.image_to_uint8_array(img)
    }

    pub fn rotate270(&self) -> Uint8Array {
        let img = self.img.rotate270();
        self.image_to_uint8_array(img)
    }

    pub fn flipv(&self) -> Uint8Array {
        let img = self.img.flipv();
        self.image_to_uint8_array(img)
    }

    pub fn fliph(&self) -> Uint8Array {
        let img = self.img.fliph();
        self.image_to_uint8_array(img)
    }
}

编译成功打包上传npm仓库之后,在前端项目中使用有一点需要注意,像这种基于wasm的npm包并不能像常规的npm包那样直接import引入,而是需要异步引入,这种写法非常不优雅,如下

/**
 * @description 全局注册md5工具
 */
async function waitwasm () {
  const { Crypt, Img } = await import('@duguying/wtools')
  Vue.prototype.$md5 = (content) => {
    let crypt = new Crypt()
    let out = crypt.md5(content)
    crypt.free()
    return out
  }
  Vue.prototype.$scale_img = (file) => {
    return new Promise(function (resolve, reject) {
      let reader = new FileReader()
      reader.readAsArrayBuffer(file)
      reader.onload = function () {
        let data = new Uint8Array(this.result)
        console.log('data:', data)
        let kit = new Img(data, file.type)
        console.log(kit)
        let w = kit.get_width()
        let h = kit.get_width()
        console.log('wh:', w, h)
        if (w > 2000) {
          w = 2000
          h = h / w * 2000
        } else {
          resolve(file)
          return
        }
        let out = kit.scale(w, h)
        resolve(new Blob([out.buffer], { type: file.type }))
      }
    })
  }
}
(async () => {
  waitwasm()
})()

他本身是一个异步引入,但是需要等它引入完毕之后,才能调用其中的方法,否则就会报错,所以,这里只好同步阻塞,等他引入完毕了。


动物园之旅
Tag 动物园, on by view 255

5号之前在深圳,主要逛了深圳动物园

ovmp23xu 动物园粽子

a9tc848z 动物园火烈鸟

5,6号在中山

v549tnop 孙中山老家

pn40sa8t 中山影视城

0tmilgih 云梯山

然后呢,回来之后,第二天开始,喜提三次黄码 😂


初学rust,给线程绑核
Tag 绑核, 线程, on by view 541

rust中,可以通过core_affinity这个crate对线程进行核绑定。但是绑核过程中发现一个问题。针对主线程绑核,若是主线程绑核后,创建子线程,在该子线程种尝试绑核会发现只有一个核可以。所以,使用这个库如果需要对主线程进行绑核,需要在所有子线程创建完毕之后进行。

绑核的函数如下

static CORE_IDS: Lazy<Vec<CoreId>> =
    Lazy::new(|| core_affinity::get_core_ids().unwrap()); // 尝试过给CORE_IDS 加锁,也是一样

fn bind_core(id: usize) {
    let mut selected_id = CORE_IDS[0];
    for core_id in CORE_IDS.clone() {
        println!("core {:?}, bind to: {:?}", selected_id, id);
        if core_id.id == id {
            selected_id = core_id;
            break;
        }
    }

    core_affinity::set_for_current(selected_id);
}

在主线程中绑核,且主线程的绑核在创建子线程之前,发现,第一次遍历核,核有8个,成功绑定到指定的7号核;第二次子线程中绑核,遍历核,核有1个,为7号核,所以只能绑定到7号核,这样就与主线程同核了。

core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 1, cores: [CoreId { id: 7 }]

print core_id: CoreId { id: 7 }, bind id: 6

改为,主线程绑核在创建子线程之后,如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
    let poly: Poly = Poly::new(); // 此处调用会创建子线程
    config::affinity::bind_core_follow_config(0); // 绑核

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

发现,成功的选中指定的主线程7号核,子线程6号核,输出如下

core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 7
print core_id: CoreId { id: 1 }, bind id: 7
print core_id: CoreId { id: 2 }, bind id: 7
print core_id: CoreId { id: 3 }, bind id: 7
print core_id: CoreId { id: 4 }, bind id: 7
print core_id: CoreId { id: 5 }, bind id: 7
print core_id: CoreId { id: 6 }, bind id: 7
print core_id: CoreId { id: 7 }, bind id: 7
core_id num: 8, cores: [CoreId { id: 0 }, CoreId { id: 1 }, CoreId { id: 2 }, CoreId { id: 3 }, CoreId { id: 4 }, CoreId { id: 5 }, CoreId { id: 6 }, CoreId { id: 7 }]

print core_id: CoreId { id: 0 }, bind id: 6
print core_id: CoreId { id: 1 }, bind id: 6
print core_id: CoreId { id: 2 }, bind id: 6
print core_id: CoreId { id: 3 }, bind id: 6
print core_id: CoreId { id: 4 }, bind id: 6
print core_id: CoreId { id: 5 }, bind id: 6
print core_id: CoreId { id: 6 }, bind id: 6

其中两次选核过程中,也都能够正常打印出所有核。

那么我试一下主线程中先创建A线程,在A线程中绑核(7),然后在主线程中绑核(6),最后创建worker线程,在worker线程中绑核(5),代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();

    thread::Builder::new()
        .name("A".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(2);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    config::affinity::bind_core_follow_config(0);
    sleep(Duration::from_secs(2));

    let poly: Poly = Poly::new(); // worker

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

结果如下

core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }

core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }

core CoreId { id: 7 }, bind to: 6
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(3), name: Some("worker"), .. }

可以看到worker进程获取到的核数不正常。worker线程内部略微复杂,里面涉及到数据处理,tokio异步调用的上报等。可是我把worker线程换为简单的替换线程,却没有问题了。代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();

    thread::Builder::new()
        .name("A".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(2);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    config::affinity::bind_core_follow_config(0);
    sleep(Duration::from_secs(2));

    // let poly: Poly = Poly::new(); // worker

    thread::Builder::new()
        .name("B worker".into())
        .spawn(|| {
            config::affinity::bind_core_follow_config(1);
            loop {
                sleep(Duration::from_secs(1));
            }
        })
        .unwrap();

    watcher.watch(&mut |line: String| {
        // poly.clone().push(line);
        LogWatcherAction::None
    })
}

其中B worker是替换worker的线程,结果如下

core CoreId { id: 0 }, bind to: 7
core CoreId { id: 1 }, bind to: 7
core CoreId { id: 2 }, bind to: 7
core CoreId { id: 3 }, bind to: 7
core CoreId { id: 4 }, bind to: 7
core CoreId { id: 5 }, bind to: 7
core CoreId { id: 6 }, bind to: 7
core CoreId { id: 7 }, bind to: 7
>>> core CoreId { id: 7 }, bind to: Thread { id: ThreadId(1), name: Some("main"), .. }

core CoreId { id: 0 }, bind to: 5
core CoreId { id: 1 }, bind to: 5
core CoreId { id: 2 }, bind to: 5
core CoreId { id: 3 }, bind to: 5
core CoreId { id: 4 }, bind to: 5
core CoreId { id: 5 }, bind to: 5
>>> core CoreId { id: 5 }, bind to: Thread { id: ThreadId(2), name: Some("A"), .. }

core CoreId { id: 0 }, bind to: 6
core CoreId { id: 1 }, bind to: 6
core CoreId { id: 2 }, bind to: 6
core CoreId { id: 3 }, bind to: 6
core CoreId { id: 4 }, bind to: 6
core CoreId { id: 5 }, bind to: 6
core CoreId { id: 6 }, bind to: 6
>>> core CoreId { id: 6 }, bind to: Thread { id: ThreadId(3), name: Some("B worker"), .. }

结论,core_affinity::get_core_ids获取到的核心信息具有不确定性。查阅作者仓库发现了一个issue与这个问题相关,提issue者同样反馈了这个问题,并质疑get_core_ids获取到的不是所有核,而是对于当前线程的可用核。


初学rust,tokio的async与await
Tag rust, tokio, async, await, on by view 487

最近监控上报的agent里需要将数据上报到es,所以用了elasticsearch-rs这个包,这是es官方提供的rust版本sdk,看了一下版本号,目前的版本都处于alpha。

下面用一个简单实例讲述我遇到的问题。首先是,调用sdk发现需要用async标注函数。

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    send_es("hi".to_string());
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

运行后,发现有一个警告,并且send_es没有被调用

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
warning: unused implementer of `Future` that must be used
 --> src/main.rs:6:5
  |
6 |     send_es("hi".to_string());
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(unused_must_use)]` on by default
  = note: futures do nothing unless you `.await` or poll them

warning: `tes` (bin "tes") generated 1 warning
    Finished dev [unoptimized + debuginfo] target(s) in 3.16s
     Running `target/debug/tes`
Hello, world!
hi

警告说futures do nothing,也就是send_es将会什么也不做。后面,我找到了一个方法block_on可以执行async方法,于是,变成了这样

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use futures::executor::block_on;
use serde_json::Value;

fn main() {
    println!("Hello, world!");
    block_on(send_es("hi".to_string()));
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

但是执行后发现报错如下

➜  tes git:(master) ✗ cargo run
   Compiling tes v0.1.0 (/root/code/las/tes)
    Finished dev [unoptimized + debuginfo] target(s) in 3.52s
     Running `target/debug/tes`
Hello, world!
thread 'main' panicked at 'there is no reactor running, must be called from the context of a Tokio 1.x runtime', /root/.cargo/registry/src/github.com-1ecc6299db9ec823/hyper-0.14.20/src/client/connect/dns.rs:121:24
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

各种查找解决方案之后,也没能解决这个问题,说是tokio版本依赖的问题,两个不同的组件间接引用了不同版本的tokio,说是引入tokio = "*"就能解决依赖问题,但是实际上是无法解决的。所以我就用了上面的最小用例来调用elasticsearch sdk,只调用sdk,不引用任何其他依赖(原项目中还引用了reqwest包,这个包依赖了tokio)。发现这个最小用例也报错,说明根本不是依赖问题,但是可以确定问题出在tokio上。于是阅读了tokio官方文档,了解到运行async函数可以用#[tokio::main]标注,结合.await就可以了。于是重新修改后如下

use elasticsearch::{http::transport::Transport, BulkParts, Elasticsearch};
use serde_json::Value;

#[tokio::main]
async fn main() {
    println!("Hello, world!");
    send_es("hi".to_string()).await;
    println!("hi");
}

pub async fn send_es(body: String) {
    let transport = Transport::single_node("http://xxx.xxx.net").unwrap();
    let client = Elasticsearch::new(transport);
    let mut bodies: Vec<String> = Vec::with_capacity(1);
    bodies.push(body);
    let response = client
        .bulk(BulkParts::Index("nginx"))
        .body(bodies)
        .send()
        .await
        .unwrap();
    let response_body = response.json::<Value>().await.unwrap();
    println!("{:?}", response_body);
}

问题解决,终于调用成功了。

总结,tokio是rust中有名的异步调用的包。它定义了asyncawait这些关键词,而实现异步。同样他也定义了异步函数的调用方式,就是#[tokio::main]标注。


初学rust,多线程编程共享变量访问
Tag rust, 多线程, 共享变量, , 解锁, on by view 272

今天开始尝试使用rust的多线程编程,在不同的线程中访问共享变量。一个小项目,项目背景是这样的,我想用rust写一个高性能的nginx日志解析聚合上报监控的agent;其中,主线程中watch日志文件,并读取日志内容,有点类似tailf,然后有一个子线程,负责每隔10s钟统计主线程收集到的日志数据,聚合并上报。

所以,这里除了主线程之外,需要创建一个线程。并且主线程不断的往一个Vec<LineStat>容器变量中写入数据,另一个子线程没隔10s,从这个容器变量中读取所有数据,并累加统计上报,然后清空该容器中的数据,10s一个往复。这个容器变量就是关键的共享变量了。常规情况下,是存在数据竞争的,在golang里倒是很容易写出来,但是golang可不管这些,锁你爱加不加,不加锁也能跑,但是会出错,若是map还会panic。但是,今天发现在rust里,根本写不出来……

那么第一个问题就是,rust线程中如何使用共享变量。 我们先看下直接使用共享变量会怎么样。

fn testx() {
    let mut data = "hi".to_string();
    thread::spawn(move || loop {
        println!("{:?}", data);

        thread::sleep(Duration::from_secs(10));
    });

    println!("{:?}", data);
}

可以看到spawn生成的线程内部访问data,会被要求加上move关键词,它会强制获取被访问变量的所有权,也就是说在线程中访问了datadata的所有权就变为这个线程的了,只能在这个线程内访问。后续的println访问data就会报错。如下

error[E0382]: borrow of moved value: `data`
  --> src/main.rs:38:22
   |
31 |     let mut data = "hi".to_string();
   |         -------- move occurs because `data` has type `std::string::String`, which does not implement the `Copy` trait
32 |     thread::spawn(move || loop {
   |                   ------- value moved into closure here
33 |         println!("{:?}", data);
   |                          ---- variable moved due to use in closure
...
38 |     println!("{:?}", data);
   |                      ^^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` (in Nightly builds, run with -Z macro-backtrace for more info)

可以看到报错value borrowed here after move,也就是访问了所有权被移动过的变量。

那么我们如何才能使data能够同时在主线程和子线程中被访问呢。用Arc<Mutex<...>>。代码改成如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        println!("th -> {:?}", clone_data.lock().unwrap());
        clone_data.lock().unwrap().push_str(", hello");
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

可以看到要访问的数据套上Arc<Mutex<...>>之后,在线程中访问该数据时,先在线程外clone一个副本,然后在线程中访问该数据,访问时先加锁lock(),然后读取到的数据就是raw_data,而在主线程中访问的数据也是先lock()加锁,读取到的也是raw_data。相当于在raw_data外部加了个套,虽然dataclone_data是两个加了套的数据,但是在套里面的数据是同一个,也就是raw_datalock()锁定“解套”后访问的是同一个数据。我们来看下运行结果

th -> "hi"
th -> "hi, hello"
th -> "hi, hello, hello"
th -> "hi, hello, hello, hello"
th -> "hi, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello"
th -> "hi, hello, hello, hello, hello, hello, hello, hello, hello"
final -> "hi, hello, hello, hello, hello, hello, hello, hello, hello, hello"

可以看到线程内部字符串拼接了若干次,最后在主线程中打印出来的是子线程拼接后的数据。

那么rust是怎么解锁的呢。 我们把代码改成这样

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);

        appendCnt(clone_data.clone());
        // 线程结束后,v 被回收,v所在的锁才会释放
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,发现前面已经锁定过,无法加锁,死锁
}

执行结果显示打印了第一个th -> "hi"之后,程序直接卡死。这里是因为发生了死锁。rust中目前没有显式的锁释放操作(实际上unlock方法在nightly版本中,参考,对应的issue),锁释放(unlock)发生在前面所说的“加了套的数据”内存释放的时候,也就是说clone_data.lock()v解锁发生在v释放的时候,也就是在loop所在块结束的时候。但是在这之前appenCnt调用了clone_data.clone()衍生的“加了套的数据”,并尝试在函数中加锁,这时候上一个锁还没释放呢,所以就死锁了。

那么如果这里我必需要提前解锁怎么办,用drop即可。如下

fn main() {
    let mut raw_data = "hi".to_string();
    let mut data: Arc<Mutex<String>> = Arc::new(Mutex::new(raw_data));
    let mut clone_data = data.clone();
    thread::spawn(move || loop {
        thread::sleep(Duration::from_secs(1));
        let mut v = clone_data.lock().unwrap(); // v 不会立即释放,v 所在的锁也不会立即释放
        println!("th -> {:?}", v);
        drop(v); // 提前释放 v,提前解锁

        appendCnt(clone_data.clone());
    });

    thread::sleep(Duration::from_secs(10));
    println!("final -> {:?}", data.lock().unwrap());
}

fn appendCnt(mut data: Arc<Mutex<String>>) {
    data.lock().unwrap().push_str(", hello"); // 这里再次尝试锁 data 里的值,没有被锁定,可以加锁
}

运行,发现现在能正常解锁了。


初学rust,闭包引用外部变量
Tag rust, 闭包, on by view 300

今天又遇到一个新的问题,那就是在闭包函数中,无法引用闭包函数之外的变量。

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    // println!("{}",filename);
    // let mut p = poly::Poly::new();
    let mut s = String::from("_");
    let mut log_watcher = LogWatcher::register(filename.to_string()).unwrap();

    log_watcher.watch(|line: String| {
        let (_, token) = parse::lex::parser(&line);
        // p.push(LineStat {});
        print!("{}", s);
        process::report::send(token);
    });
}

上面代码中,watch传入参数为一个闭包函数,其中需要引用main函数中的局部变量s,但是会报错

error[E0308]: mismatched types
  --> src/main.rs:22:23
   |
22 |       log_watcher.watch(|line: String| {
   |  _______________________^
23 | |         let (_, token) = parse::lex::parser(&line);
24 | |         // p.push(LineStat {});
25 | |         print!("{}", s);
26 | |         process::report::send(token);
27 | |     });
   | |_____^ expected fn pointer, found closure
   |
   = note: expected fn pointer `fn(std::string::String)`
                 found closure `[closure@src/main.rs:22:23: 27:6]`
note: closures can only be coerced to `fn` types if they do not capture any variables

查看logwatcher源码,watch该版本(0.1.0)方法定义如下

pub fn watch(&mut self, callback: fn (line: String)) {
    ....
}

不过我后来发现logwatcher版本升级到(0.1.1)之后,该方法变为

pub fn watch<F: ?Sized>(&mut self, callback: &mut F)
where
    F: FnMut(String) -> LogWatcherAction,
{
    ....
}

如此可以正常调用,代码如下

fn main() {
    let cfg = config::cfg::get_config();
    let filename = cfg.las.as_ref().unwrap().access_log.as_ref().unwrap();
    let mut watcher = LogWatcher::register(filename.to_string()).unwrap();
    let mut poly: Poly = Poly::new();

    watcher.watch(&mut |line: String| {
        poly.clone().push(line);
        LogWatcherAction::None
    })
}

由此可见,闭包与函数还是有区别的。但是回调函数定义为Fn/FnMut类型就可以同时接受闭包和函数类型的回调参数。举例如下

let a = String::from("abc");

let mut x = || println!("x {}", a);

fn y() {
    println!("y")
}

fn wrap(c: &mut Fn()) {
    c()
}

wrap(&mut x); // pass a closure
wrap(&mut y); // pass a function

同样可以参考 stackoverflow 的这个提问