请教一个 Flink SQL 的问题,解决了星巴克感谢

2023-06-12 11:17:41 +08:00
 kerie

我是一个 Flink 小白,最近有一个监控需求,想使用 Flink SQL 实现,但很多概念还没搞清楚,遇到一个问题卡壳了,在论坛里寻 Flink 大佬指点一二,解决了送一杯星巴克作为感谢!

Flink SQL 官网用客户(customer)和订单(order)举例,但都每分钟统计流表每个客户订单的数量。我的需求是每分钟统计维表全量每个客户订单的数量,也就是就算这一分钟某个客户没有下单,也需要统计一个 0 出来,用于做监控报警。

为了不暴露业务需求,调整为客户和订单的场景,如果有不恰当的地方还请指出,我再补充,SQL 如下:

CREATE TEMPORARY TABLE customers (
    id INT,
    name STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://....'
);

CREATE TEMPORARY TABLE orders (
    order_id     STRING,
    customer_id  INT,
    order_time   TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = '...'
);

CREATE TEMPORARY VIEW order_per_minute AS
SELECT
    customer_id,
    count(*) as cnt,
    TUMBLE_END(order_time, INTERVAL '1' MINUTE) AS window_end
FROM orders
GROUP BY customer_id, TUMBLE(tstamp, INTERVAL '1' MINUTE);

INSERT INTO destination
SELECT
    COALESCE(window_end, CURRENT_TIMESTAMP),
    customer_id,
    COALESCE(cnt, 0),
FROM
    customers LEFT JOIN order_per_minute
        ON customers.id = order_per_minute.customer_id;

实际执行上面的代码有问题,比如说有 3 个客户 c1/c2/c3 ,但只有 2 个客户 c1/c2 每分钟都下单, 第一次执行结果是对的:

10:01, c1, 19  
10:01, c2, 32  
10:01, c3, 0

随后每分钟的数据,就会少掉 c3 的结果:

10:02, c1, 18  
10:02, c2, 22 // c3 没有输出
10:03, c1, 18  
10:03, c2, 22 // c3 没有输出

我也不清楚 Flink SQL 能否这么用吗,还是得用 DataStream API 解决?请论坛的 Flink 大佬帮忙看一下,感谢!

2241 次点击
所在节点    问与答
32 条回复
sijue
2023-06-12 11:47:53 +08:00
上面 left join 是 regural join ,没有数据可能是因为 customer 中 c3 数据在 10:02 和 10:03 没有记录,建议使用 lookup join
sijue
2023-06-12 11:49:36 +08:00
liprais
2023-06-12 11:51:21 +08:00
你应该用用户表左关联
kerie
2023-06-12 11:55:01 +08:00
@sijue 对的,上面的 left join 是 regular join ,没有数据是因为 customer 表没有时间戳信息,它只是一个全量客户的表,没有带时间戳信息。
lookup join 有上面一样的问题,不能输出没有 order 数据的 customer 统计。
kerie
2023-06-12 11:57:55 +08:00
@liprais 我用的 customers left join order_per_minute ,但不能统计出没有 order 的 customer
liprais
2023-06-12 11:58:23 +08:00
"If a table function call returns an empty result, the corresponding outer row is preserved, and the result padded with null values. Currently, a left outer join against a lateral table requires a TRUE literal in the ON clause."
kerie
2023-06-12 12:08:03 +08:00
@liprais 这里的`table function`指的是 udf (用户自定义函数)吗,不使用 udf 可以吗
sijue
2023-06-12 12:25:57 +08:00
1.有个治标的方式:维表初始化目前存量用户数据,订单数为 0 ,订单表一般是注册之后才能下单;
2.有个疑问点:有订单,用户表和订单表都会存数据,是否存在订单数据比用户数据迟到的点
t3zb2xzvjm4yvmn
2023-06-12 13:01:37 +08:00
实现过类似的需求,首先 lookup join 肯定是不行了,事实流中没有出现的用户肯定关联不到的。

比较彻底的解决方案是使用 datastream API ,process function 。还要状态编程,因为需要将维表的状态自己维护,比如把状态放到一个 tuple2 里,t0 是 customer id ,t1 给默认值 0 ,然后拿另外一个流的 element ,每来一条就给 t1+1 ,窗口触发时把所有的 tuple2 向下游发送。

需要额外考虑的一点是,有可能某个时间窗口内 1 个下单的都没有(比如半夜),那么该窗口无法触发,没有任何输出,所以默认的滚动窗口\事件时间语义就不太行。还需要再实现一个窗口,事件时间、处理时间混合语义,保证即使没有事件仍然可以触发,输出所有用户下单数均为 0 的情况。

Flink SQL 没有实现过,但是有一个简单粗暴的想法,定时把维表的数据全量发到 flink ,构造出所有包含用户的事实流,这样你只需要改造 customers 维表那里就够了。
缺点是需要不断地读维表,对 MySQL 增加压力; source 端不断地向下游发维度信息,实际上不符合事件驱动和流式计算的原则。

我猜测这个需求数据量不大,实时性要求也没那么高,使用 spark streaming 可能是更好的选择。
kerie
2023-06-12 13:03:24 +08:00
@sijue 1. 不行,需求就是监控哪些用户没有下订单(这里的场景我做了更改,实际是监控哪些 IoT 设备没有上传数据,我在主题里也 append 更新下)
2. 用户表是提前全量初始化好的,不考虑更新问题,用户表没有时间信息;订单表,理论每个用户每分钟都有数据。
David1119
2023-06-12 13:22:19 +08:00
参考 9 楼的,process function 最方便,sql 做基础 etl 没问题,复杂一点的逻辑用 datastream 更灵活方便,存一下上一秒的 state ,做比较,可以判断类似连续 2 秒没数据然后报警推送,甚至连续 2s 没数据,但是 5s 内能上传上来就算正常这样的场景,随意发挥
kerie
2023-06-12 13:24:19 +08:00
@t3zb2xzvjm4yvmn 我感觉看到希望了,我的数据量不大,你知道如何粗暴的每分钟把维表的数据全量发到 flink ?
用 DataStream API ,你说的额外考虑的场景暂时不用考虑,我们是 IoT 设备,理论不会出现 1 分钟全部设备没有数据的情况,后续优化再额外每分钟加一些 mock 数据,强制触发窗口。
kerie
2023-06-12 13:28:49 +08:00
@David1119 我再挣扎下,新手感觉 Flink SQL 更直观一些,但如果实在做不了就放弃 SQL 的方案,转 DataStream API process function 。
fuyufjh
2023-06-12 13:42:19 +08:00
这是一个典型的 micro batch 需求吧,1 分钟执行一次。用时间条件做过滤能起到很好的过滤效果,执行很快的
kerie
2023-06-12 13:51:57 +08:00
@fuyufjh 大佬能讲一下用 Flink 具体怎么做吗
t3zb2xzvjm4yvmn
2023-06-12 14:16:55 +08:00
@kerie Flink SQL 好像没有现成的方法,可以自定义 source table ,你研究一下吧
或者不在 Flink 里做,在外部写一个 Java/Python 程序用 JDBC 和 kafka API ,定时把数据推到 kafka ,用 Flink SQL 接 kafka 就比较方便了。
kerie
2023-06-12 14:26:33 +08:00
@t3zb2xzvjm4yvmn 我研究一下,感谢!
leonhao
2023-06-12 14:28:29 +08:00
Flink SQL 无法实现,需要自己写 stream api 。如果可能出现这种情况,根本不需要 Flink ,把数据写到数据库在算就行
kerie
2023-06-12 14:43:41 +08:00
@leonhao 数据都写到数据库,还是得一个额外的程序,每分钟做一次计算吧
leonhao
2023-06-12 14:55:09 +08:00
@kerie 用 timescaledb 之类的时间序列数据库,使用物化视图,每分钟聚合一次,都是自带的功能,很 Flink 简单多了

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/947922

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX