踩坑 rust 的 partial copy 导致 metrics 丢失

352 天前
 hezijiangjiang

在 RisingWave 的存储代码中,我们使用 RAII [1] 的思想来对 LSM iterator 的 metrics 进行监控,从而避免在代码中忘记上报 metrics , 导致 metrics 丢失。在实现中,我们实现了一个 rust 的 struct MonitoredStateStoreIterStats 去收集 LSM iterator 的 metrics ,去统计 iterator 中 key 的数量和长度,并为这个 struct 实现了 Drop ,在这个 struct 被释放的时候把在本地收集的 metrics 上报 prometheus 。通过这种方式,我们不需要在每次在 iterator 使用完后都手动上报 metrics ,从而避免了由于代码的疏忽导致忘记上报 metrics 。

以下是一段简化过的代码。我们通过 try_stream[2] 这个宏来封装一个 iterator 的 stream 来收集这个 stream 的 metrics 。在返回的 stream 被释放时,stats 随着 stream 被释放,并调用其 drop 方法来上报收集到的 metrics 。

pub struct MonitoredStateStoreIter<S> {
    inner: S,
    stats: MonitoredStateStoreIterStats,
}

struct MonitoredStateStoreIterStats {
    total_items: usize,
    total_size: usize,
    storage_metrics: Arc<MonitoredStorageMetrics>,
}

impl<S: StateStoreIterItemStream> MonitoredStateStoreIter<S> {
    #[try_stream(ok = StateStoreIterItem, error = StorageError)]
    async fn into_stream_inner(mut self) {
        let inner = self.inner;
        futures::pin_mut!(inner);
        while let Some((key, value)) = inner
            .try_next()
            .await
            .inspect_err(|e| error!("Failed in next: {:?}", e))?
        {
            self.stats.total_items += 1;
            self.stats.total_size += key.encoded_len() + value.len();
            yield (key, value);
        }
    }
}

impl Drop for MonitoredStateStoreIterStats {
    fn drop(&mut self) {
        self.storage_metrics
            .iter_item
            .observe(self.total_items as f64);
        self.storage_metrics
            .iter_size
            .observe(self.total_size as f64);
    }
}

然而,在使用过程中,我们遇到了上报的 metrics 全部为 0 的问题。

最小复现

由于使用了 try_stream 宏来生成 stream ,因此我们怀疑在 try_stream 生成的代码中有 bug 导致 metrics 丢失。于是我们用 cargo-expand [3] 来将查看宏生成的代码。展开后的代码如下

fn into_stream_inner(
    mut self,
) -> impl Stream<Item = StorageResult<StateStoreIterItem>
> {
    ::futures_async_stream::__private::try_stream::from_generator(static move |
        mut __task_context: ::futures_async_stream::__private::future::ResumeTy,
    | -> ::futures_async_stream::__private::Result<(), StorageError> {
        let (): () = {
            let inner = self.inner;
            let mut inner = inner;
            #[allow(unused_mut)]
                let mut inner = unsafe {
                ::pin_utils::core_reexport::pin::Pin::new_unchecked(
                    &mut inner,
                )
            };
            while let Some((key, value))
                = {
                let mut __pinned = inner.try_next();
                loop {
                    if let ::futures_async_stream::__private::Poll::Ready(
                        result,
                    ) = unsafe {
                        poll(Pin::as_mut(&mut __pinned), get_context(__task_context))
                    } {
                        break result;
                    }
                    __task_context = (yield ::futures_async_stream::__private::Poll::Pending);
                }
            }?
            {
                self.stats.total_items += 1;
                self.stats.total_size += key.encoded_len() + value.len();
                __task_context = (yield ::futures_async_stream::__private::Poll::Ready((
                    key,
                    value,
                )));
            }
        };
        #[allow(unreachable_code)]
        {
            return ::futures_async_stream::__private::Ok(());
            loop {
                __task_context = (yield ::futures_async_stream::__private::Poll::Pending);
            }
        }
    })
}

可以看到, try_stream 宏生成的代码中,包含了一个 rust generator 的闭包。闭包中收集和上报 metrics 的逻辑与原代码基本相同,按照我们对 rust 的理解,仍然不应该会出现 metrics 丢失的问题。因此我们怀疑是 rust 编译器中与 generator 相关的逻辑存在问题。在 rust playground 上,我们尝试了以下代码来对问题进行复现。

struct Stat {
    count: usize,
    vec: Vec<u8>,
}

impl Drop for Stat {
    fn drop(&mut self) {
        println!("count: {}", self.count);
    }
}

fn main() {

    let mut stat = Stat {
        count: 0,
        vec: Vec::new(),
    };

    let mut f = move || {
        stat.count += 1;
        1
    };

    println!("num: {}", f());
}

执行以后输出如下。

num: 1
count: 0

按照预期,输出的 num 和 count 应该都为 1 ,因为在调用闭包 f 时 stat.count += 1被调用了,但是以上代码中遇到了和最开始同样的问题。因此以上代码可以作为我们问题的一个最小复现。

问题分析

对以上代码进行分析的话,我们看到闭包 f 的代码中使用了 move ,因此在闭包中使用过的对象的 ownership 应该都会转移到闭包中。而 struct Stats 实现了 Drop,因此 Stats是不可以 partial move 的,其必须作为一个整体被 move 进入闭包。而在闭包中执行了 stats.count += 1,因此 stats 中的 count 应该被置为 1 。但是从程序的输出可以看到在 stats 被 drop 时,stats 的 count 是 0 。

我们尝试将闭包 f 改为如下代码,来显式的将 stats 的 ownership 给 move 进闭包里。

let mut f = move || {
    let mut stat = stat;
    stat.count += 1;
    1
};

输出恢复正常。

num: 1
count: 1

我们再次尝试在闭包 f 中使用 stat 中的另一个字段 vec

let mut f = move || {
    let _ = stat.vec.len();
    stat.count += 1;
    1
};

输出同样恢复正常。

num: 1
count: 1

可以看到,我们显式地将 stat 整个 move 进闭包,或者在闭包中使用类型为 vec 的字段,都会使得 stat 的 ownership 被 move 进闭包。

于是我们推测,尽管 stat 实现了自己的 drop 导致不能被 partial move ,但是如果我们在 move 的闭包中只使用了 stat 中实现了 Copy 类型的字段,则这个字段的值会被 Copy 到闭包中,而闭包中对这个字段的修改只会作用于被 Copy 后的值,而原字段并不会改变。

验证猜想

我们可以通过将以上代码编译成二进制代码后,对其汇编代码进行分析,从而验证我们的猜想。然而,编译后的汇编代码会过于复杂且晦涩难懂,同时编译器对其进行的一些优化也会改变原有的逻辑导致汇编代码难以理解。因此我们打算通过分析在编译过程中产生的 MIR 中间代码来对问题进行分析。在 rust playground 上可以十分方便地生成 MIR 代码。

首先我们对存在问题的最小复现代码生成 MIR ,生成后与闭包相关的 MIR 如下。可以看到这个闭包确实只包含了一个类型为 usize 的字段,这个字段的值取的是 stat 中的 count 值。

bb1: {
    _1 = Stat { count: const 0_usize, vec: move _2 };
    _3 = {closure@src/main.rs:19:17: 19:24} { stat: (_1.0: usize) };
}

而我们对后续测试中有正常输出的代码生成 MIR ,生成后与闭包相关的 MIR 如下。可以看到这个闭包将整个 stat 的 ownership 给 move 了进去。

bb1: {
		_1 = Stat { count: const 0_usize, vec: move _2 };
		_3 = {closure@src/main.rs:19:17: 19:24} { stat: move _1 };
}

于是,我们的猜想得到了验证,在我们出现问题的代码中,闭包确实没有捕获 stat 的 ownership 。

后续与总结

我们将这个问题向 rust 社区反映了这个问题,得到的反馈是,这个是 rust 2021 后实现的一个 feature [4]。在 rust 2021 中,一个使用了 move 的闭包在捕获一个 struct 的时候,会尽可能少地去捕获 struct 中的字段。

我们的代码中,正是因为这个行为,导致我们的代码产生了歧义,而出现了 metrics 的丢失。

针对这个问题,我们认为有两个地方有提升的空间。

最后,回到我们最开始的问题中。要想解决 metrics 丢失的问题,在我们的代码中,我们只需要做以下修改就能让代码正常运行 [6]。

#[try_stream(ok = StateStoreIterItem, error = StorageError)]
async fn into_stream_inner(mut self) {
    let inner = self.inner;
    ...
    self.stats.total_items += 1;
    self.stats.total_size += key.encoded_len() + value.len();
}

修改为

#[try_stream(ok = StateStoreIterItem, error = StorageError)]
async fn into_stream_inner(self) {
    let inner = self.inner;
    let mut stats = self.stats;
    ...
    stats.total_items += 1;
    stats.total_size += key.encoded_len() + value.len();
}

引用

[1] https://en.wikipedia.org/wiki/Resource_acquisition_is_initialization

[2] https://docs.rs/futures-async-stream/latest/futures_async_stream/index.html

[3] https://crates.io/crates/cargo-expand

[4] https://doc.rust-lang.org/edition-guide/rust-2021/disjoint-capture-in-closures.html

[5] https://github.com/rust-lang/rust/issues/108808

[6] https://github.com/risingwavelabs/risingwave/pull/8372

823 次点击
所在节点    Rust
3 条回复
lance6716
352 天前
一个月前同样因为 raii+move+神秘作用域踩了一个 bug😂这种时候我又支持大道至简了,可以给人自信不会让人怀疑人生
simen513
352 天前
rust 中类型为 copy 的语义导致赋值的含义与 c/c++是一样的,所以导致了该例子中传给闭包的参数,相当于 c/c++语言中函数参数的传值、传指针/引用的区别:move 相当于传值,没有 move 的相当于传引用。
RTSmile
327 天前
@lance6716 这个纯粹是对语言特性不了解导致的,和某个语言无关。
另外这种非常不明显的问题 go 又不是没有,for range 的循环变量问题坑了多少人。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1005907

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX