我是一个 Flink 小白,最近有一个监控需求,想使用 Flink SQL 实现,但很多概念还没搞清楚,遇到一个问题卡壳了,在论坛里寻 Flink 大佬指点一二,解决了送一杯星巴克作为感谢!
Flink SQL 官网用客户(customer)和订单(order)举例,但都每分钟统计流表每个客户订单的数量。我的需求是每分钟统计维表全量每个客户订单的数量,也就是就算这一分钟某个客户没有下单,也需要统计一个 0 出来,用于做监控报警。
为了不暴露业务需求,调整为客户和订单的场景,如果有不恰当的地方还请指出,我再补充,SQL 如下:
CREATE TEMPORARY TABLE customers (
id INT,
name STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://....'
);
CREATE TEMPORARY TABLE orders (
order_id STRING,
customer_id INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '15' SECOND
) WITH (
'connector' = 'kafka',
'topic' = '...'
);
CREATE TEMPORARY VIEW order_per_minute AS
SELECT
customer_id,
count(*) as cnt,
TUMBLE_END(order_time, INTERVAL '1' MINUTE) AS window_end
FROM orders
GROUP BY customer_id, TUMBLE(tstamp, INTERVAL '1' MINUTE);
INSERT INTO destination
SELECT
COALESCE(window_end, CURRENT_TIMESTAMP),
customer_id,
COALESCE(cnt, 0),
FROM
customers LEFT JOIN order_per_minute
ON customers.id = order_per_minute.customer_id;
实际执行上面的代码有问题,比如说有 3 个客户 c1/c2/c3 ,但只有 2 个客户 c1/c2 每分钟都下单, 第一次执行结果是对的:
10:01, c1, 19
10:01, c2, 32
10:01, c3, 0
随后每分钟的数据,就会少掉 c3 的结果:
10:02, c1, 18
10:02, c2, 22 // c3 没有输出
10:03, c1, 18
10:03, c2, 22 // c3 没有输出
我也不清楚 Flink SQL 能否这么用吗,还是得用 DataStream API 解决?请论坛的 Flink 大佬帮忙看一下,感谢!
这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。
V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。
V2EX is a community of developers, designers and creative people.