2 种思路,
1 是使用滑动窗口, 窗口大小为 24h,
然后去重;
2 是记录一个上次 a 出现的时间, 如果大于 24h 就重新计算, 小于 24h 就跳过.
下面是方法 1 的代码:
ds
.windowAll(TumblingProcessingTimeWindows.of(Time.hours(24)))
.process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
private static final long serialVersionUID = 1L;
@
Override public void process(ProcessAllWindowFunction<String, String,
TimeWindow>.Context ctx,
Iterable<String> values, Collector<String> out) throws Exception {
boolean repeat = false;
for (String value : values) {
if (value.toLowerCase().indexOf("a") > -1) {
if (repeat) {
continue;
}
repeat = true;
}
out.collect(value);
}
}
})
.print();