V2EX is full of experts, so I came here to ask a small Flink question

2019-09-19 17:19:30 +08:00
 whirly

Help wanted: in Flink 1.9 SQL, how do I run CEP on the result of joining two tables?

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

        // Build the order data
        DataStream<Order> ordersData = env.fromCollection(Arrays.asList(
                new Order("001", "iphone", new Timestamp(1545800002000L)),
                new Order("002", "mac", new Timestamp(1545800003000L)),
                new Order("003", "book", new Timestamp(1545800004000L)),
                new Order("004", "cup", new Timestamp(1545800018000L))
        ))
                .assignTimestampsAndWatermarks(new OrderTimestampExtractor());

        // Build the payment data
        DataStream<Payment> paymentData = env.fromCollection(Arrays.asList(
                new Payment("001", "alipay", new Timestamp(1545803501000L)),
                new Payment("002", "card", new Timestamp(1545803602000L)),
                new Payment("003", "card", new Timestamp(1545803610000L)),
                new Payment("004", "alipay", new Timestamp(1545803611000L))
        ))
                .assignTimestampsAndWatermarks(new PaymentTimestampExtractor());

        tEnv.registerDataStream("t_order", ordersData, "orderId, productName, orderTime");
        tEnv.registerDataStream("t_payment", paymentData, "orderId, payType, payTime");

        // Join the two tables
        String sqlQuery = "SELECT o.orderId as orderId, o.productName as productName, \n" +
                "p.payType as payType, o.orderTime as orderTime, \n" +
                "cast(payTime as timestamp) as payTime\n" +
                "FROM t_order AS o \n" +
                "JOIN t_payment AS p \n" +
                "ON o.orderId = p.orderId AND\n" +
                "\tp.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR";
        Table queryResult = tEnv.sqlQuery(sqlQuery);
        tEnv.registerTable("TemporalJoinResult", queryResult);


        String cepSQL = "select *, MATCH_ROWTIME() as rowtime from TemporalJoinResult\n" +
                "\tMATCH_RECOGNIZE (\n" +
                "\tORDER BY rowtime\n" +
                "      MEASURES\n" +
                "        A.orderId AS orderId,\n" +
                "        A.productName AS productName,\n" +
                "        A.orderTime AS orderTime,\n" +
                "\t\tB.payTime AS payTime\n" +
                "\tONE ROW PER MATCH \n" +
                "\tAFTER MATCH SKIP PAST LAST ROW\n" +
                "    PATTERN (A B)\n" +
                "    DEFINE\n" +
                "        A AS payType = 'alipay',\n" +
                "        B AS productName = 'iphone'\n" +
                "\t) as T";

        Table cepResult = tEnv.sqlQuery(cepSQL);
        tEnv.toAppendStream(cepResult, Row.class).print();

        env.execute();
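
The post does not show the Order and Payment classes. A minimal sketch of what they might look like, assuming plain POJOs whose field names match the field lists passed to registerDataStream and whose types match the constructor calls above. The holder class name OrderModels is hypothetical and exists only to keep the sketch in one file.

```java
import java.sql.Timestamp;

// Hypothetical reconstruction of the classes used in the post (not shown there).
// Nested here only to keep the sketch in one file; in a real job each would
// normally be its own public top-level class.
final class OrderModels {

    public static class Order {
        public String orderId;
        public String productName;
        public Timestamp orderTime;

        // Public no-argument constructor, as POJO-style classes usually need.
        public Order() {}

        public Order(String orderId, String productName, Timestamp orderTime) {
            this.orderId = orderId;
            this.productName = productName;
            this.orderTime = orderTime;
        }
    }

    public static class Payment {
        public String orderId;
        public String payType;
        public Timestamp payTime;

        public Payment() {}

        public Payment(String orderId, String payType, Timestamp payTime) {
            this.orderId = orderId;
            this.payType = payType;
            this.payTime = payTime;
        }
    }
}
```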

I'm not sure how to specify ORDER BY rowtime inside MATCH_RECOGNIZE. Could someone help?
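
One possible direction, untested against Flink 1.9: ORDER BY in MATCH_RECOGNIZE expects a time attribute as its first column, and MATCH_ROWTIME() is meant to be used inside MEASURES rather than in the outer SELECT. The sketch below only assembles the strings that would be passed to registerDataStream and sqlQuery. It assumes that the ".rowtime" suffix turns orderTime and payTime into event-time attributes, and that orderTime is still a time attribute after the interval join once payTime has been cast to a plain TIMESTAMP. The class name MatchRecognizeSketch and the alias matchTime are hypothetical.

```java
// Sketch only: builds the field lists and SQL text as plain strings.
// They would replace the corresponding strings in the post's
// tEnv.registerDataStream(...) and tEnv.sqlQuery(...) calls.
final class MatchRecognizeSketch {

    // Assumption: ".rowtime" marks the field as an event-time attribute, filled
    // from the timestamps assigned by assignTimestampsAndWatermarks.
    static final String ORDER_FIELDS = "orderId, productName, orderTime.rowtime";
    static final String PAYMENT_FIELDS = "orderId, payType, payTime.rowtime";

    // Same interval join as in the post, with qualified column names.
    // payTime is cast so that orderTime is the only time attribute left.
    static String joinSql() {
        return String.join("\n",
            "SELECT o.orderId AS orderId, o.productName AS productName,",
            "       p.payType AS payType, o.orderTime AS orderTime,",
            "       CAST(p.payTime AS TIMESTAMP) AS payTime",
            "FROM t_order AS o",
            "JOIN t_payment AS p",
            "ON o.orderId = p.orderId AND",
            "   p.payTime BETWEEN o.orderTime AND o.orderTime + INTERVAL '1' HOUR");
    }

    // ORDER BY uses the time attribute of the joined table (orderTime);
    // MATCH_ROWTIME() moves into MEASURES under the hypothetical alias matchTime.
    static String cepSql() {
        return String.join("\n",
            "SELECT *",
            "FROM TemporalJoinResult",
            "MATCH_RECOGNIZE (",
            "  ORDER BY orderTime",
            "  MEASURES",
            "    A.orderId AS orderId,",
            "    A.productName AS productName,",
            "    A.orderTime AS orderTime,",
            "    B.payTime AS payTime,",
            "    MATCH_ROWTIME() AS matchTime",
            "  ONE ROW PER MATCH",
            "  AFTER MATCH SKIP PAST LAST ROW",
            "  PATTERN (A B)",
            "  DEFINE",
            "    A AS A.payType = 'alipay',",
            "    B AS B.productName = 'iphone'",
            ") AS T");
    }

    public static void main(String[] args) {
        System.out.println(ORDER_FIELDS);
        System.out.println(PAYMENT_FIELDS);
        System.out.println(joinSql());
        System.out.println(cepSql());
    }
}
```

In the original program this would mean passing ORDER_FIELDS and PAYMENT_FIELDS as the third argument of the two registerDataStream calls, and using joinSql() and cepSql() in place of sqlQuery and cepSQL. The pattern itself is unchanged from the post.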

1502 views
Node: Programmers
0 replies

https://www.v2ex.com/t/602246