Flink: Window TVF (Windowing Table-Valued Functions) Explained, with Source Code Examples


1. What Is a Window TVF
  • Windowing TVFs: polymorphic table functions (PTFs for short) defined by Flink that assign the elements of a table to windows

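    • Windowing TVFs replace the legacy Grouped Window Functions.
    • Windowing TVFs are closer to the SQL standard and more powerful: they support complex window-based computations such as Window TopN and Window Join (Window Join is not yet supported in 1.13.x).
    • Grouped Window Functions only support window aggregation.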
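  • PTF (Polymorphic Table Functions):

    • Part of the SQL:2016 standard. A PTF is a special kind of table function that can take a table as a parameter. PTFs are a powerful feature because they can change the shape of a table. PTFs are semantically similar to tables, so they are invoked in the FROM clause of a SELECT statement.
    • A PTF can process tables whose row type is not declared when the function is defined. It can also produce a result table whose row type may or may not be declared at definition time. Polymorphic table functions let developers use dynamic SQL to build powerful, complex custom functions.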

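Note: depending on the windowing TVF used, each element can logically belong to more than one window.
For example, HOP creates overlapping windows, so a single element can be assigned to multiple windows.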

2. Built-in Window TVFs

Flink provides three built-in windowing TVFs: TUMBLE, HOP, and CUMULATE.
A windowing TVF returns a new relation that includes:

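  • all columns of the original relation
  • 3 additional columns that describe the assigned window: “window_start”, “window_end”, and “window_time”
    • “window_time” is the time attribute of the window after the windowing TVF is applied. It can be used in subsequent time-based operations
    • the value of window_time always equals window_end - 1ms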
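The following minimal plain-Java sketch shows how the three extra columns relate. It does not call any Flink API. `WindowColumns` is a hypothetical helper that only mirrors the documented rule: window_time is window_end - 1ms for a half-open window [window_start, window_end).

```java
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;

// Hypothetical helper (not a Flink class): holds the three window columns
// and derives window_time from window_end as window_end - 1 ms.
final class WindowColumns {
    final LocalDateTime windowStart;
    final LocalDateTime windowEnd;
    final LocalDateTime windowTime;

    WindowColumns(LocalDateTime windowStart, LocalDateTime windowEnd) {
        this.windowStart = windowStart;
        this.windowEnd = windowEnd;
        // The window is half-open [start, end), so its last millisecond is end - 1 ms.
        this.windowTime = windowEnd.minus(1, ChronoUnit.MILLIS);
    }

    public static void main(String[] args) {
        WindowColumns w = new WindowColumns(
                LocalDateTime.of(2020, 4, 15, 8, 0),
                LocalDateTime.of(2020, 4, 15, 8, 10));
        System.out.println(w.windowTime); // 2020-04-15T08:09:59.999
    }
}
```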
2.1. TUMBLE

2.1.1. Function Overview

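The TUMBLE function assigns each row of a relation to a window based on a time attribute column. Tumbling windows have a fixed size and do not overlap.
The TUMBLE function takes three required parameters: TUMBLE(TABLE data, DESCRIPTOR(timecol), size)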

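  • data: a table parameter; the table must contain a time attribute column
  • timecol: a column descriptor indicating which time attribute column of data should be mapped to tumbling windows
  • size: a duration specifying the width of the tumbling windows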
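The sketch below computes, in plain Java, the tumbling window for a single timestamp. It assumes epoch-aligned windows with no offset, which matches the three-parameter form above. `TumbleSketch` and `assign` are hypothetical names, and this is not Flink's internal implementation.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Plain-Java sketch (not Flink's implementation) of tumbling-window assignment.
// Assumption: windows are aligned to the epoch with no offset, so window_start
// is the row's timestamp rounded down to a multiple of size.
final class TumbleSketch {
    /** Returns {window_start, window_end} for a row with timestamp ts. */
    static LocalDateTime[] assign(LocalDateTime ts, Duration size) {
        // Treat the zone-less timestamp as UTC only to obtain a millisecond count.
        long tsMillis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        long remainder = Math.floorMod(tsMillis, size.toMillis());
        LocalDateTime start = ts.minus(Duration.ofMillis(remainder));
        return new LocalDateTime[] {start, start.plus(size)};
    }

    public static void main(String[] args) {
        LocalDateTime[] w = assign(LocalDateTime.of(2020, 4, 15, 8, 5), Duration.ofMinutes(10));
        System.out.println("[" + w[0] + ", " + w[1] + ")"); // [2020-04-15T08:00, 2020-04-15T08:10)
    }
}
```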
2.1.2. Source Code Example
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TVFSQLExample {
    private static final Logger LOG = LoggerFactory.getLogger(TVFSQLExample.class);

    public static void main(String[] args) {
        EnvironmentSettings settings = null;
        StreamTableEnvironment tEnv = null;
        try {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            tEnv = StreamTableEnvironment.create(env, settings);

            final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
            DataStream<Bid> dataStream =
                    env.fromElements(
                            new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "C"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "A"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:09",dateTimeFormatter), new BigDecimal("5.00"), "D"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:11",dateTimeFormatter), new BigDecimal("3.00"), "B"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:13",dateTimeFormatter), new BigDecimal("1.00"), "E"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:17",dateTimeFormatter), new BigDecimal("6.00"), "F"));

            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                    .column("bidtime", DataTypes.TIMESTAMP(3))
                    .column("price", DataTypes.DECIMAL(10,2))
                    .column("item", DataTypes.STRING())
                    .watermark("bidtime", "bidtime - INTERVAL '1' SECOND")
                    .build());
            table.printSchema();

            tEnv.createTemporaryView("Bid",table);

            // 10-minute tumbling windows on the event-time attribute bidtime, summing price per window
            String sql = "SELECT window_start, window_end, SUM(price) AS price FROM TABLE(" +
                    "    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))" +
                    "  GROUP BY window_start, window_end";
            TableResult res = tEnv.executeSql(sql);
            res.print();
            tEnv.dropTemporaryView("Bid");
        }catch (Exception e){
            LOG.error(e.getMessage(),e);
        }
    }

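    // Must be a public static class with public fields and a public no-arg constructor, so that Flink treats it as a POJO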
    public static class Bid{
        public LocalDateTime bidtime;
        public BigDecimal price;
        public String item;

        public Bid() {}

        public Bid(LocalDateTime bidtime, BigDecimal price, String item) {
            this.bidtime = bidtime;
            this.price = price;
            this.item = item;
        }
    }
}

2.1.3. Output
(
  `bidtime` TIMESTAMP(3) *ROWTIME*,
  `price` DECIMAL(10, 2),
  `item` STRING,
  WATERMARK FOR `bidtime`: TIMESTAMP(3) AS bidtime - INTERVAL '1' SECOND
)

+----+-------------------------+-------------------------+------------------------------------------+
| op |            window_start |              window_end |                                    price |
+----+-------------------------+-------------------------+------------------------------------------+
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |                                    11.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |                                    10.00 |
+----+-------------------------+-------------------------+------------------------------------------+
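The two rows above can be checked by hand. The bids at 08:05, 08:07 and 08:09 fall into [08:00, 08:10), giving 4.00 + 2.00 + 5.00 = 11.00. The bids at 08:11, 08:13 and 08:17 fall into [08:10, 08:20), giving 3.00 + 1.00 + 6.00 = 10.00. The plain-Java sketch below repeats that grouping on the same six bids without Flink. `TumbleSumCheck` is a hypothetical name, and epoch-aligned windows are assumed.

```java
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java cross-check (no Flink): put the six sample bids into epoch-aligned
// tumbling windows and sum the prices per window_start.
final class TumbleSumCheck {
    static final LocalDateTime[] TIMES = {
            LocalDateTime.of(2020, 4, 15, 8, 5), LocalDateTime.of(2020, 4, 15, 8, 7),
            LocalDateTime.of(2020, 4, 15, 8, 9), LocalDateTime.of(2020, 4, 15, 8, 11),
            LocalDateTime.of(2020, 4, 15, 8, 13), LocalDateTime.of(2020, 4, 15, 8, 17)};
    static final BigDecimal[] PRICES = {
            new BigDecimal("4.00"), new BigDecimal("2.00"), new BigDecimal("5.00"),
            new BigDecimal("3.00"), new BigDecimal("1.00"), new BigDecimal("6.00")};

    static Map<LocalDateTime, BigDecimal> sumByWindowStart(Duration size) {
        Map<LocalDateTime, BigDecimal> sums = new TreeMap<>(); // sorted by window_start
        for (int i = 0; i < TIMES.length; i++) {
            long tsMillis = TIMES[i].toInstant(ZoneOffset.UTC).toEpochMilli();
            // Round the timestamp down to a multiple of size to get window_start.
            LocalDateTime start = TIMES[i].minus(Duration.ofMillis(Math.floorMod(tsMillis, size.toMillis())));
            sums.merge(start, PRICES[i], BigDecimal::add);
        }
        return sums;
    }

    public static void main(String[] args) {
        // {2020-04-15T08:00=11.00, 2020-04-15T08:10=10.00}
        System.out.println(sumByWindowStart(Duration.ofMinutes(10)));
    }
}
```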
2.2. HOP

2.2.1. Function Overview

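Hopping windows are also known as “sliding windows”.
Based on a time attribute column, the HOP function assigns windows that each cover the rows within an interval of size, with a new window starting every slide. When slide is smaller than size the windows overlap, so a single row can belong to several windows.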

The HOP function takes four required parameters (the bracketed offset is optional): HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

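  • data: a table parameter; the table must contain a time attribute column
  • timecol: a column descriptor indicating which time attribute column of data should be mapped to hopping windows
  • slide: a duration specifying the interval between the starts of consecutive hopping windows
  • size: a duration specifying the width of the hopping windows; size must be an integral multiple of slide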
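The sketch below lists, in plain Java, every hopping window that contains a given timestamp. It assumes epoch-aligned window starts with no offset. `HopSketch` and `windowStarts` are hypothetical names, not Flink APIs. With slide = 3 minutes and size = 9 minutes, each row lands in size / slide = 3 windows.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink's implementation) of hopping-window assignment.
// A row at ts belongs to every window [start, start + size) whose start is an
// epoch-aligned multiple of slide and satisfies start <= ts < start + size.
final class HopSketch {
    static List<LocalDateTime> windowStarts(LocalDateTime ts, Duration slide, Duration size) {
        long tsMillis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        // Latest window start that is <= ts.
        LocalDateTime start = ts.minus(Duration.ofMillis(Math.floorMod(tsMillis, slide.toMillis())));
        List<LocalDateTime> starts = new ArrayList<>();
        // Step back one slide at a time while the window [start, start + size) still covers ts.
        while (start.plus(size).isAfter(ts)) {
            starts.add(0, start); // prepend so the list stays in ascending order
            start = start.minus(slide);
        }
        return starts;
    }

    public static void main(String[] args) {
        List<LocalDateTime> starts = windowStarts(
                LocalDateTime.of(2020, 4, 15, 8, 5), Duration.ofMinutes(3), Duration.ofMinutes(9));
        System.out.println(starts); // [2020-04-15T07:57, 2020-04-15T08:00, 2020-04-15T08:03]
    }
}
```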
2.2.2. Source Code Example
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TVFSQLExample {
    private static final Logger LOG = LoggerFactory.getLogger(TVFSQLExample.class);

    public static void main(String[] args) {
        EnvironmentSettings settings = null;
        StreamTableEnvironment tEnv = null;
        try {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            tEnv = StreamTableEnvironment.create(env, settings);

            final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
            DataStream<Bid> dataStream =
                    env.fromElements(
                            new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "C"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "A"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:09",dateTimeFormatter), new BigDecimal("5.00"), "D"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:11",dateTimeFormatter), new BigDecimal("3.00"), "B"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:13",dateTimeFormatter), new BigDecimal("1.00"), "E"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:17",dateTimeFormatter), new BigDecimal("6.00"), "F"));

            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                    .column("bidtime", DataTypes.TIMESTAMP(3))
                    .column("price", DataTypes.DECIMAL(10,2))
                    .column("item", DataTypes.STRING())
                    .watermark("bidtime", "bidtime - INTERVAL '1' SECOND")
                    .build());
            table.printSchema();

            tEnv.createTemporaryView("Bid",table);

            // 9-minute hopping windows that start every 3 minutes, summing price per window
            String sql = "SELECT window_start, window_end, SUM(price) AS price FROM TABLE(" +
                    "    HOP(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '3' MINUTES, INTERVAL '9' MINUTES))\n" +
                    "  GROUP BY window_start, window_end";
            TableResult res = tEnv.executeSql(sql);
            res.print();
            tEnv.dropTemporaryView("Bid");
        }catch (Exception e){
            LOG.error(e.getMessage(),e);
        }
    }

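    // Must be a public static class with public fields and a public no-arg constructor, so that Flink treats it as a POJO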
    public static class Bid{
        public LocalDateTime bidtime;
        public BigDecimal price;
        public String item;

        public Bid() {}

        public Bid(LocalDateTime bidtime, BigDecimal price, String item) {
            this.bidtime = bidtime;
            this.price = price;
            this.item = item;
        }
    }
}

2.2.3. Output
+----+-------------------------+-------------------------+------------------------------------------+
| op |            window_start |              window_end |                                    price |
+----+-------------------------+-------------------------+------------------------------------------+
| +I | 2020-04-15 07:57:00.000 | 2020-04-15 08:06:00.000 |                                     4.00 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:09:00.000 |                                     6.00 |
| +I | 2020-04-15 08:03:00.000 | 2020-04-15 08:12:00.000 |                                    14.00 |
| +I | 2020-04-15 08:06:00.000 | 2020-04-15 08:15:00.000 |                                    11.00 |
| +I | 2020-04-15 08:09:00.000 | 2020-04-15 08:18:00.000 |                                    15.00 |
| +I | 2020-04-15 08:12:00.000 | 2020-04-15 08:21:00.000 |                                     7.00 |
| +I | 2020-04-15 08:15:00.000 | 2020-04-15 08:24:00.000 |                                     6.00 |
+----+-------------------------+-------------------------+------------------------------------------+
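The plain-Java sketch below recomputes this table from the six sample bids as a sanity check. Because size / slide = 3, every bid is counted in exactly three windows. The seven sums therefore add up to 63.00, three times the 21.00 total of all bids. `HopSumCheck` is a hypothetical name. Epoch-aligned windows are assumed and no Flink API is used.

```java
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java cross-check (no Flink): add each sample bid to every epoch-aligned
// hopping window that covers it, then sum the prices per window_start.
final class HopSumCheck {
    static final LocalDateTime[] TIMES = {
            LocalDateTime.of(2020, 4, 15, 8, 5), LocalDateTime.of(2020, 4, 15, 8, 7),
            LocalDateTime.of(2020, 4, 15, 8, 9), LocalDateTime.of(2020, 4, 15, 8, 11),
            LocalDateTime.of(2020, 4, 15, 8, 13), LocalDateTime.of(2020, 4, 15, 8, 17)};
    static final BigDecimal[] PRICES = {
            new BigDecimal("4.00"), new BigDecimal("2.00"), new BigDecimal("5.00"),
            new BigDecimal("3.00"), new BigDecimal("1.00"), new BigDecimal("6.00")};

    static Map<LocalDateTime, BigDecimal> sumByWindowStart(Duration slide, Duration size) {
        Map<LocalDateTime, BigDecimal> sums = new TreeMap<>(); // sorted by window_start
        for (int i = 0; i < TIMES.length; i++) {
            long tsMillis = TIMES[i].toInstant(ZoneOffset.UTC).toEpochMilli();
            // Start from the latest window that begins at or before the row...
            LocalDateTime start = TIMES[i].minus(Duration.ofMillis(Math.floorMod(tsMillis, slide.toMillis())));
            // ...and walk back while the window still covers the row.
            while (start.plus(size).isAfter(TIMES[i])) {
                sums.merge(start, PRICES[i], BigDecimal::add);
                start = start.minus(slide);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // Expected: 07:57 -> 4.00, 08:00 -> 6.00, 08:03 -> 14.00, 08:06 -> 11.00,
        //           08:09 -> 15.00, 08:12 -> 7.00, 08:15 -> 6.00
        sumByWindowStart(Duration.ofMinutes(3), Duration.ofMinutes(9))
                .forEach((start, sum) -> System.out.println(start + " -> " + sum));
    }
}
```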
2.3. CUMULATE

2.3.1. Function Overview

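Cumulating windows are very useful in some scenarios.
For example, take a daily dashboard that plots the cumulative UV (unique visitor) count from 00:00 up to every minute. The UV value at 10:00 represents the total UV from 00:00 to 10:00. Cumulating windows implement this easily and efficiently.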

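The CUMULATE function first assigns elements to a window that covers rows within an initial interval of one step. At every step it expands the window by one more step, keeping the window start fixed, until the max window size is reached.
You can think of CUMULATE as first applying a TUMBLE window with the max window size. Each tumbling window is then split into several windows that share the same window start and whose ends are one step apart. Cumulating windows therefore overlap and do not have a fixed size.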

Example: a cumulating window with a 1-hour step and a 1-day max size produces these windows for each day: [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00).
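A short plain-Java sketch can reproduce this example by enumerating the windows of a single period. `CumulateWindows` and `windowEnds` are hypothetical names. All windows in a period share the same window_start, so only the window ends are returned.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink code): enumerate the cumulating windows of one max-size period.
// Every window is [periodStart, end); end grows by one step until periodStart + size.
final class CumulateWindows {
    static List<LocalDateTime> windowEnds(LocalDateTime periodStart, Duration step, Duration size) {
        if (size.toMillis() % step.toMillis() != 0) {
            throw new IllegalArgumentException("size must be an integral multiple of step");
        }
        List<LocalDateTime> ends = new ArrayList<>();
        LocalDateTime periodEnd = periodStart.plus(size);
        for (LocalDateTime end = periodStart.plus(step); !end.isAfter(periodEnd); end = end.plus(step)) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        LocalDateTime day = LocalDateTime.of(2020, 4, 15, 0, 0);
        List<LocalDateTime> ends = windowEnds(day, Duration.ofHours(1), Duration.ofDays(1));
        System.out.println(ends.size()); // 24
        System.out.println("[" + day + ", " + ends.get(0) + ")");               // first window
        System.out.println("[" + day + ", " + ends.get(ends.size() - 1) + ")"); // last window ends at the next day's 00:00
    }
}
```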

The CUMULATE function takes four required parameters: CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

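  • data: a table parameter; the table must contain a time attribute column
  • timecol: a column descriptor indicating which time attribute column of data should be mapped to cumulating windows
  • step: a duration specifying how much the window size grows between the ends of consecutive cumulating windows
  • size: a duration specifying the max width of the cumulating windows; size must be an integral multiple of step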
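The following sketch works in the opposite direction and finds every cumulating window that contains a given row. It assumes the max-size periods are aligned to the epoch with no offset, the same way tumbling windows are. `CumulateSketch` is a hypothetical helper, not Flink code. For the 08:05 bid with step = 2 minutes and size = 10 minutes, the windows are [08:00, 08:06), [08:00, 08:08) and [08:00, 08:10).

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (not Flink's implementation) of cumulating-window assignment.
final class CumulateSketch {
    /** Returns window_end of every cumulating window containing ts; they all share one window_start. */
    static List<LocalDateTime> windowEnds(LocalDateTime ts, Duration step, Duration size) {
        long tsMillis = ts.toInstant(ZoneOffset.UTC).toEpochMilli();
        long stepMillis = step.toMillis();
        // The epoch-aligned max-size period containing ts; its start is window_start.
        long offsetInPeriod = Math.floorMod(tsMillis, size.toMillis());
        LocalDateTime windowStart = ts.minus(Duration.ofMillis(offsetInPeriod));
        LocalDateTime periodEnd = windowStart.plus(size);
        // First window end strictly after ts, then every further step up to the max size.
        LocalDateTime firstEnd = windowStart.plus(Duration.ofMillis((offsetInPeriod / stepMillis + 1) * stepMillis));
        List<LocalDateTime> ends = new ArrayList<>();
        for (LocalDateTime end = firstEnd; !end.isAfter(periodEnd); end = end.plus(step)) {
            ends.add(end);
        }
        return ends;
    }

    public static void main(String[] args) {
        System.out.println(windowEnds(LocalDateTime.of(2020, 4, 15, 8, 5),
                Duration.ofMinutes(2), Duration.ofMinutes(10)));
        // [2020-04-15T08:06, 2020-04-15T08:08, 2020-04-15T08:10]
    }
}
```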
2.3.2. Source Code Example
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TVFSQLExample {
    private static final Logger LOG = LoggerFactory.getLogger(TVFSQLExample.class);

    public static void main(String[] args) {
        EnvironmentSettings settings = null;
        StreamTableEnvironment tEnv = null;
        try {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
            tEnv = StreamTableEnvironment.create(env, settings);

            final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");
            DataStream<Bid> dataStream =
                    env.fromElements(
                            new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "C"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "A"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:09",dateTimeFormatter), new BigDecimal("5.00"), "D"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:11",dateTimeFormatter), new BigDecimal("3.00"), "B"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:13",dateTimeFormatter), new BigDecimal("1.00"), "E"),
                            new Bid(LocalDateTime.parse("2020-04-15 08:17",dateTimeFormatter), new BigDecimal("6.00"), "F"));

            Table table = tEnv.fromDataStream(dataStream, Schema.newBuilder()
                    .column("bidtime", DataTypes.TIMESTAMP(3))
                    .column("price", DataTypes.DECIMAL(10,2))
                    .column("item", DataTypes.STRING())
                    .watermark("bidtime", "bidtime - INTERVAL '1' SECOND")
                    .build());
            table.printSchema();

            tEnv.createTemporaryView("Bid",table);

            // Cumulating windows with a 2-minute step and a 10-minute max size, summing price per window
            String sql = "SELECT window_start, window_end, SUM(price) FROM TABLE(" +
                    "    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))\n" +
                    "  GROUP BY window_start, window_end";
            TableResult res = tEnv.executeSql(sql);
            res.print();
            tEnv.dropTemporaryView("Bid");
        }catch (Exception e){
            LOG.error(e.getMessage(),e);
        }
    }

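    // Must be a public static class with public fields and a public no-arg constructor, so that Flink treats it as a POJO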
    public static class Bid{
        public LocalDateTime bidtime;
        public BigDecimal price;
        public String item;

        public Bid() {}

        public Bid(LocalDateTime bidtime, BigDecimal price, String item) {
            this.bidtime = bidtime;
            this.price = price;
            this.item = item;
        }
    }
}
2.3.3. Output
+----+-------------------------+-------------------------+------------------------------------------+
| op |            window_start |              window_end |                                   EXPR$2 |
+----+-------------------------+-------------------------+------------------------------------------+
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:06:00.000 |                                     4.00 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:08:00.000 |                                     6.00 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 |                                    11.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:12:00.000 |                                     3.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:14:00.000 |                                     4.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:16:00.000 |                                     4.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:18:00.000 |                                    10.00 |
| +I | 2020-04-15 08:10:00.000 | 2020-04-15 08:20:00.000 |                                    10.00 |
+----+-------------------------+-------------------------+------------------------------------------+
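Two details stand out in this output. First, the windows [08:00, 08:02) and [08:00, 08:04) are missing because no bid falls into them. Second, a cumulating window keeps growing until it reaches the max size, so [08:10, 08:16) repeats the 4.00 of [08:10, 08:14). The plain-Java sketch below recomputes the table from the sample bids, keyed by window_end; within this data each window_end identifies exactly one window. `CumulateSumCheck` is a hypothetical name, and epoch alignment is assumed.

```java
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java cross-check (no Flink): add each sample bid to every cumulating window
// that contains it and sum the prices per window_end.
final class CumulateSumCheck {
    static final LocalDateTime[] TIMES = {
            LocalDateTime.of(2020, 4, 15, 8, 5), LocalDateTime.of(2020, 4, 15, 8, 7),
            LocalDateTime.of(2020, 4, 15, 8, 9), LocalDateTime.of(2020, 4, 15, 8, 11),
            LocalDateTime.of(2020, 4, 15, 8, 13), LocalDateTime.of(2020, 4, 15, 8, 17)};
    static final BigDecimal[] PRICES = {
            new BigDecimal("4.00"), new BigDecimal("2.00"), new BigDecimal("5.00"),
            new BigDecimal("3.00"), new BigDecimal("1.00"), new BigDecimal("6.00")};

    static Map<LocalDateTime, BigDecimal> sumByWindowEnd(Duration step, Duration size) {
        Map<LocalDateTime, BigDecimal> sums = new TreeMap<>(); // sorted by window_end
        long stepMillis = step.toMillis();
        for (int i = 0; i < TIMES.length; i++) {
            long tsMillis = TIMES[i].toInstant(ZoneOffset.UTC).toEpochMilli();
            long offsetInPeriod = Math.floorMod(tsMillis, size.toMillis());
            LocalDateTime windowStart = TIMES[i].minus(Duration.ofMillis(offsetInPeriod));
            LocalDateTime periodEnd = windowStart.plus(size);
            // First window end strictly after the row, then every further step up to the max size.
            LocalDateTime end = windowStart.plus(Duration.ofMillis((offsetInPeriod / stepMillis + 1) * stepMillis));
            for (; !end.isAfter(periodEnd); end = end.plus(step)) {
                sums.merge(end, PRICES[i], BigDecimal::add);
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        // {2020-04-15T08:06=4.00, 2020-04-15T08:08=6.00, 2020-04-15T08:10=11.00, 2020-04-15T08:12=3.00,
        //  2020-04-15T08:14=4.00, 2020-04-15T08:16=4.00, 2020-04-15T08:18=10.00, 2020-04-15T08:20=10.00}
        System.out.println(sumByWindowEnd(Duration.ofMinutes(2), Duration.ofMinutes(10)));
    }
}
```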

Appendix: References

DataStream API Integration

How to define watermarks

See the reference link for details.

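// Event time: derive a TIMESTAMP_LTZ(3) rowtime column from an existing event_time field
// and declare a watermark that tolerates 10 seconds of out-of-orderness
tableEnv.fromDataStream(
        dataStream,
        Schema.newBuilder()
            .columnByExpression("rowtime", "CAST(event_time AS TIMESTAMP_LTZ(3))")
            .watermark("rowtime", "rowtime - INTERVAL '10' SECOND")
            .build());

// Processing time: add a computed proc_time attribute column via PROCTIME()
Table table = tableEnv.fromDataStream(
    dataStream,
    Schema.newBuilder()
        .columnByExpression("proc_time", "PROCTIME()")
        .build());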
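The watermark expression `rowtime - INTERVAL '10' SECOND` describes a bounded out-of-orderness strategy, as does `bidtime - INTERVAL '1' SECOND` in the examples above. The plain-Java sketch below shows the idea: each record proposes its event time minus the delay, and the watermark never moves backwards. `WatermarkSketch` is a hypothetical class. In Flink the watermark is emitted periodically rather than after each record, so returning it per record is a simplification for illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Plain-Java sketch (not Flink code) of a bounded-out-of-orderness watermark such as
// rowtime - INTERVAL '10' SECOND: each record proposes eventTime - delay, and the
// watermark only moves forward, so an out-of-order record cannot pull it back.
final class WatermarkSketch {
    private final Duration delay;
    private LocalDateTime current; // null until the first record has been seen

    WatermarkSketch(Duration delay) {
        this.delay = delay;
    }

    LocalDateTime onEvent(LocalDateTime eventTime) {
        LocalDateTime candidate = eventTime.minus(delay);
        if (current == null || candidate.isAfter(current)) {
            current = candidate;
        }
        return current;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(Duration.ofSeconds(10));
        System.out.println(wm.onEvent(LocalDateTime.of(2020, 4, 15, 8, 0, 20))); // 2020-04-15T08:00:10
        System.out.println(wm.onEvent(LocalDateTime.of(2020, 4, 15, 8, 0, 5)));  // still 2020-04-15T08:00:10
        System.out.println(wm.onEvent(LocalDateTime.of(2020, 4, 15, 8, 0, 30))); // 2020-04-15T08:00:20
    }
}
```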
Converting row content into code

Use regex find-and-replace in EditPlus to convert the input text into Java code.

Convert rows like the following:
| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |
| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |

into:
new Bid(LocalDateTime.parse("2020-04-15 08:05",dateTimeFormatter), new BigDecimal("4.00"), "C"),
new Bid(LocalDateTime.parse("2020-04-15 08:07",dateTimeFormatter), new BigDecimal("2.00"), "A"),

Regular expression used for matching:
\|\s+(2020.*) \|\s+([.0-9]+)\s+\|\s+([A-Z]).*

Replacement expression:
new Bid(LocalDateTime.parse("\1",dateTimeFormatter), new BigDecimal("\2"), "\3"),
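The same conversion can also be done without EditPlus. The sketch below applies the regular expression above with `java.util.regex`. Note that Java's `Matcher.replaceAll` refers to groups as `$1`, `$2`, `$3` instead of `\1`, `\2`, `\3`. `RowToBid` is a hypothetical name.

```java
import java.util.regex.Pattern;

// Applies the EditPlus find/replace above with java.util.regex.
// Java back-references in the replacement string are $1..$3 (EditPlus uses \1..\3).
final class RowToBid {
    private static final Pattern ROW =
            Pattern.compile("\\|\\s+(2020.*) \\|\\s+([.0-9]+)\\s+\\|\\s+([A-Z]).*");

    static String convert(String line) {
        return ROW.matcher(line).replaceAll(
                "new Bid(LocalDateTime.parse(\"$1\",dateTimeFormatter), new BigDecimal(\"$2\"), \"$3\"),");
    }

    public static void main(String[] args) {
        String[] rows = {
            "| 2020-04-15 08:05 |  4.00 | C    | 2020-04-15 08:00 | 2020-04-15 08:06 | 2020-04-15 08:05:59.999 |",
            "| 2020-04-15 08:07 |  2.00 | A    | 2020-04-15 08:00 | 2020-04-15 08:10 | 2020-04-15 08:09:59.999 |"};
        for (String row : rows) {
            System.out.println(convert(row));
        }
    }
}
```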

Feel free to share; when reposting, please credit the source: 内存溢出

Original article: https://www.54852.com/zaji/5605189.html
