请教一个 Flink SQL 的问题,解决了星巴克感谢

2023-06-12 11:17:41 +08:00
 kerie

我是一个 Flink 小白,最近有一个监控需求,想使用 Flink SQL 实现,但很多概念还没搞清楚,遇到一个问题卡壳了,在论坛里寻 Flink 大佬指点一二,解决了送一杯星巴克作为感谢!

Flink SQL 官网用客户(customer)和订单(order)举例,但都每分钟统计流表每个客户订单的数量。我的需求是每分钟统计维表全量每个客户订单的数量,也就是就算这一分钟某个客户没有下单,也需要统计一个 0 出来,用于做监控报警。

为了不暴露业务需求,调整为客户和订单的场景,如果有不恰当的地方还请指出,我再补充,SQL 如下:

CREATE TEMPORARY TABLE customers (
    id INT,
    name STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://....'
);

CREATE TEMPORARY TABLE orders (
    order_id     STRING,
    customer_id  INT,
    order_time   TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = '...'
);

CREATE TEMPORARY VIEW order_per_minute AS
SELECT
    customer_id,
    count(*) as cnt,
    TUMBLE_END(order_time, INTERVAL '1' MINUTE) AS window_end
FROM orders
GROUP BY customer_id, TUMBLE(tstamp, INTERVAL '1' MINUTE);

INSERT INTO destination
SELECT
    COALESCE(window_end, CURRENT_TIMESTAMP),
    customer_id,
    COALESCE(cnt, 0),
FROM
    customers LEFT JOIN order_per_minute
        ON customers.id = order_per_minute.customer_id;

实际执行上面的代码有问题,比如说有 3 个客户 c1/c2/c3 ,但只有 2 个客户 c1/c2 每分钟都下单, 第一次执行结果是对的:

10:01, c1, 19  
10:01, c2, 32  
10:01, c3, 0

随后每分钟的数据,就会少掉 c3 的结果:

10:02, c1, 18  
10:02, c2, 22 // c3 没有输出
10:03, c1, 18  
10:03, c2, 22 // c3 没有输出

我也不清楚 Flink SQL 能否这么用吗,还是得用 DataStream API 解决?请论坛的 Flink 大佬帮忙看一下,感谢!

2241 次点击
所在节点    问与答
32 条回复
kerie
2023-06-12 15:10:49 +08:00
@leonhao 谢谢,时序数据库虽然有物化视图功能,但时序数据库本身不能做监控报警,还是需要外部程序做定时查询,将聚合数据推送给监控报警系统。
weakbd
2023-06-12 15:19:14 +08:00
时间语义试着用 ProcessingTime,再结合滚动窗口去实现你的操作
leonhao
2023-06-12 15:27:26 +08:00
@kerie grafana
kerie
2023-06-12 15:31:26 +08:00
@weakbd Flink 的时间属性和 join 语法理解有点困难,可以给个示例,我测试一把吗
fuyufjh
2023-06-12 15:33:54 +08:00
@kerie 我是觉得不应该用 Flink 做。Streaming 擅长增量计算,这个问题本质是一个全量计算,“全量”指最近 1min 的全量数据。

我觉得用数仓、定时 1min 查一次是最合适的。

Flink 的理念是 event-driven ,反过来说,如果没有 event 则不该触发任何计算。举个简化的例子,如果这 1min 内没有任何订单,你应该也希望得到一个全部 count=0 的结果吧(而不是什么也不输出),那么这就与 event-driven 相违背了。
BiggerLonger
2023-06-12 16:18:35 +08:00
要不时时 time window ?
weakbd
2023-06-12 16:50:19 +08:00
#25 是的,仔细看了下 op 的需求,你不应该以你的维表为基准,以实现 count=0 的结果,在 flink sql 中我是没想到能实现的方式。不过 datastream api 更灵活是可以做到的,你可以尝试下试试使用 AggregateFunction 实现你的需求呢
kerie
2023-06-12 17:27:38 +08:00
@fuyufjh 感谢答复,你对 event-driven 的理解让我很有启发,单这个需求来看确实用数仓更合适一些,
@weakbd 也感谢你
kerie
2023-06-12 17:31:21 +08:00
感谢各位 xdjm 的热心答复,我最终决定采纳 @t3zb2xzvjm4yvmn 的方案,大佬留下 vx ,我转你一杯咖啡的钱
t3zb2xzvjm4yvmn
2023-06-12 18:08:37 +08:00
@kerie QnVubnk1NDE=
kerie
2023-06-12 18:21:16 +08:00
alwaysdazz
2023-06-13 05:19:39 +08:00
经典维表驱动流表的案例,之前遇到过该问题,用 Python udf 解决掉的。。。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/947922

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX