Flink 1.14 SQL basic syntax (1): Flink SQL table queries in detail

Flink SQL table queries in detail

1. Advanced aggregation

group by cube(dim1, dim2, dim3)
group by grouping sets( (dim1, dim2), (dim1, dim3), () )
group by rollup(dim1, dim2, dim3)

Syntax examples:

select province, city, region, count(distinct uid) as u_cnt
from t
group by cube(province, city, region);

select province, city, region, count(distinct uid) as u_cnt
from t
group by rollup(province, city, region);

select province, city, region, count(distinct uid) as u_cnt
from t
group by grouping sets( (province, city), (province, city, region) );
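CUBE and ROLLUP are shorthand for a list of grouping sets. The following plain-Java sketch (not Flink API code; the class and method names are made up for illustration) enumerates the grouping sets each one expands to, assuming the standard SQL semantics: CUBE yields every subset of the dimensions, ROLLUP yields every prefix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Enumerates the grouping sets that CUBE and ROLLUP are shorthand for. */
final class GroupingSetsSketch {

    /** CUBE(d1..dn): every subset of the dimensions, 2^n grouping sets. */
    static List<List<String>> cube(List<String> dims) {
        List<List<String>> sets = new ArrayList<>();
        int n = dims.size();
        // Count the bitmask down so the full set comes first and () comes last.
        for (int mask = (1 << n) - 1; mask >= 0; mask--) {
            List<String> set = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                // The highest bit stands for the first dimension.
                if ((mask & (1 << (n - 1 - i))) != 0) {
                    set.add(dims.get(i));
                }
            }
            sets.add(set);
        }
        return sets;
    }

    /** ROLLUP(d1..dn): every prefix of the dimension list, n + 1 grouping sets. */
    static List<List<String>> rollup(List<String> dims) {
        List<List<String>> sets = new ArrayList<>();
        for (int len = dims.size(); len >= 0; len--) {
            sets.add(new ArrayList<>(dims.subList(0, len)));
        }
        return sets;
    }

    public static void main(String[] args) {
        List<String> dims = Arrays.asList("province", "city", "region");
        System.out.println("cube   -> " + cube(dims));
        System.out.println("rollup -> " + rollup(dims));
    }
}
```

For three dimensions, cube produces 8 grouping sets and rollup produces 4, which is why CUBE gets expensive quickly as dimensions are added.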

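2. Time-window TVFs (table-valued functions)

Since version 1.13, Flink provides TVF syntax for time-window aggregation.

Constraints on using windowing TVFs:

  • (1) A grouped aggregation over a window must include window_start and window_end in the group-by key;
  • (2) A Top-N computation over a window must include window_start and window_end in the partition key;
  • (3) A join with conditions must include equality conditions on window_start and window_end of the two input tables.

select ......
from table(
    tumble(table t, descriptor(rt), interval '10' minutes)
)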

(1) Supported time-window types

1. Tumbling windows (Tumble Windows)

TUMBLE (TABLE t_action, descriptor(time attribute column), INTERVAL '10' SECOND [ window size ] )
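As a rough illustration of how a tumbling window assigns a row, the sketch below computes window_start and window_end for an event timestamp. It is plain Java rather than Flink code, assumes timestamps in epoch milliseconds and no window offset, and the class name is hypothetical.

```java
/** Assigns an event timestamp to its tumbling window (all values in epoch milliseconds). */
final class TumbleWindowSketch {

    /** Start of the window containing ts; floorMod keeps negative timestamps aligned too. */
    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    /** End of the window (exclusive upper bound). */
    static long windowEnd(long ts, long sizeMs) {
        return windowStart(ts, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 10_000L; // corresponds to INTERVAL '10' SECOND
        for (long ts : new long[] {0L, 9_999L, 10_000L, 12_345L}) {
            System.out.println("ts=" + ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Every timestamp belongs to exactly one window, which is what distinguishes tumbling windows from the hopping and cumulating variants.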

2. Hopping (sliding) windows (Hop Windows)

HOP (TABLE t_action, descriptor(time attribute column), INTERVAL '5' SECONDS [ slide step ], INTERVAL '10' SECOND [ window size ] )
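With a hopping window, one row can fall into several overlapping windows. The sketch below lists them for a given timestamp. It is a plain-Java model with a hypothetical class name, assuming epoch-millisecond timestamps, no offset, and a window size that is a multiple of the slide.

```java
import java.util.ArrayList;
import java.util.List;

/** Lists the hopping windows that contain a given event timestamp. */
final class HopWindowSketch {

    /** Returns {start, end} pairs (end exclusive) of every window containing ts, latest first. */
    static List<long[]> windowsFor(long ts, long slideMs, long sizeMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is not after ts.
        long lastStart = ts - Math.floorMod(ts, slideMs);
        // Walk backwards one slide at a time while the window still covers ts.
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // slide = 5s, size = 10s, as in the HOP signature above
        for (long[] w : windowsFor(7_000L, 5_000L, 10_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a 5-second slide and a 10-second size, each row lands in size / slide = 2 windows, so downstream aggregates see it twice.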

3. Cumulating windows (Cumulate Windows)

CUMULATE (TABLE t_action, descriptor(time attribute column), INTERVAL '5' SECONDS [ step ], INTERVAL '10' SECOND [ maximum window size ] )
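A cumulating window keeps a fixed start and grows its end by one step at a time until it reaches the maximum size. The following plain-Java sketch (hypothetical names, epoch-millisecond timestamps, no offset, maximum size assumed to be a multiple of the step) lists the windows a row contributes to.

```java
import java.util.ArrayList;
import java.util.List;

/** Lists the cumulating windows that contain a given event timestamp. */
final class CumulateWindowSketch {

    /** Returns {start, end} pairs (end exclusive); all share one start, ends grow by stepMs. */
    static List<long[]> windowsFor(long ts, long stepMs, long maxSizeMs) {
        List<long[]> windows = new ArrayList<>();
        // All windows of one cycle start where the enclosing max-size tumbling window starts.
        long start = ts - Math.floorMod(ts, maxSizeMs);
        // First window end that lies strictly after ts.
        long firstEnd = start + (Math.floorDiv(ts - start, stepMs) + 1) * stepMs;
        for (long end = firstEnd; end <= start + maxSizeMs; end += stepMs) {
            windows.add(new long[] {start, end});
        }
        return windows;
    }

    public static void main(String[] args) {
        // step = 5s, max size = 10s, as in the CUMULATE signature above
        for (long ts : new long[] {3_000L, 7_000L, 12_000L}) {
            for (long[] w : windowsFor(ts, 5_000L, 10_000L)) {
                System.out.println("ts=" + ts + " -> [" + w[0] + ", " + w[1] + ")");
            }
        }
    }
}
```

An early row in the cycle appears in every later, larger window, which is what makes this window type suitable for running totals such as "UV so far today, refreshed every N minutes".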

4. Session windows (Session Windows)
Not supported yet (as of Flink 1.14)!

(2) Syntax example

select window_start, window_end, channel, count(distinct guid) as uv
from table (
    tumble(table t_applog, descriptor(rt), interval '5' minute)  -- tumbling window
)
group by window_start, window_end, channel

3. Window Top-N

-- bidtime,price,item,supplier_id
2020-04-15 08:05:00.000,4.00,C,supplier1
2020-04-15 08:07:00.000,2.00,A,supplier1
2020-04-15 08:09:00.000,5.00,D,supplier2
2020-04-15 08:11:00.000,3.00,B,supplier2
2020-04-15 08:09:00.000,5.00,D,supplier3
2020-04-15 08:11:00.000,6.00,B,supplier3
2020-04-15 08:11:00.000,6.00,B,supplier3
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * The top 2 orders by transaction amount in each 10-minute tumbling window
 */
public class _02_Window_Topn_V2 {
    public static void main(String[] args) {
        // Create the table execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Read data from Kafka
        String sourceTable = "CREATE TABLE source_table (\n" +
                "  bidtime string ,\n" +
                "  `price` double,\n" +
                "  `item` STRING,\n" +
                "  `supplier_id` STRING,\n" +
                "  `rt` as cast( bidtime as timestamp(3) ),\n" +
                "   watermark for rt as rt - interval '5' second\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topn1',\n" +
                "  'properties.bootstrap.servers' = 'hadoop01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'csv'\n" +
                ")";
        tenv.executeSql(sourceTable);

        // The top 2 orders by transaction amount in each 10-minute tumbling window
        tenv.executeSql("select\n" +
                "  *\n" +
                "from(\n" +
                "  select window_start,window_end, \n" +
                "    bidtime,\n" +
                "    price,\n" +
                "    item,\n" +
                "    supplier_id,\n" +
                "    row_number() over(partition by window_start,window_end order by price desc ) as rn\n" +
                "  from table(\n" +
                "    tumble(table source_table,descriptor(rt),interval '10' minute)\n" +
                "  ) \n" +
                ") t1 where rn <= 2 ").print();
    }
}

## Result:
+----+-------------------------+-------------------------+-------------------------+-------+---------+--------------+-------+
| op |            window_start |              window_end |                 bidtime | price |    item |  supplier_id |    rn |
+----+-------------------------+-------------------------+-------------------------+-------+---------+--------------+-------+
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 | 2020-04-15 08:09:00.000 |   5.0 |       D |    supplier3 |     1 |
| +I | 2020-04-15 08:00:00.000 | 2020-04-15 08:10:00.000 | 2020-04-15 08:09:00.000 |   5.0 |       D |    supplier2 |     2 |
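To make the partition-then-rank logic of the query easier to follow, here is a plain-Java model of it applied to the seven sample rows. It is only a sketch, not Flink code: the class and method names are hypothetical, bidtime is interpreted as UTC, and rows with equal prices keep their input order, whereas the order Flink assigns to ties is not guaranteed.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model of "top N rows per tumbling window". */
final class WindowTopNSketch {

    /** One input row: bidtime, price, item, supplier_id. */
    static final class Bid {
        final String bidtime;
        final double price;
        final String item;
        final String supplierId;

        Bid(String bidtime, double price, String item, String supplierId) {
            this.bidtime = bidtime;
            this.price = price;
            this.item = item;
            this.supplierId = supplierId;
        }
    }

    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    /** Start of the tumbling window containing bidtime, in epoch milliseconds (UTC assumed). */
    static long windowStart(String bidtime, long sizeMs) {
        long ts = LocalDateTime.parse(bidtime, FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
        return ts - Math.floorMod(ts, sizeMs);
    }

    /** Partitions rows by window, orders each partition by price descending, keeps rn <= n. */
    static Map<Long, List<Bid>> topN(List<Bid> bids, long sizeMs, int n) {
        Map<Long, List<Bid>> byWindow = new TreeMap<>();
        for (Bid b : bids) {
            byWindow.computeIfAbsent(windowStart(b.bidtime, sizeMs), k -> new ArrayList<>()).add(b);
        }
        Map<Long, List<Bid>> result = new TreeMap<>();
        for (Map.Entry<Long, List<Bid>> e : byWindow.entrySet()) {
            List<Bid> rows = new ArrayList<>(e.getValue());
            rows.sort(Comparator.comparingDouble((Bid b) -> b.price).reversed());
            result.put(e.getKey(), new ArrayList<>(rows.subList(0, Math.min(n, rows.size()))));
        }
        return result;
    }

    /** The seven sample rows listed in the text. */
    static List<Bid> sample() {
        List<Bid> bids = new ArrayList<>();
        bids.add(new Bid("2020-04-15 08:05:00.000", 4.00, "C", "supplier1"));
        bids.add(new Bid("2020-04-15 08:07:00.000", 2.00, "A", "supplier1"));
        bids.add(new Bid("2020-04-15 08:09:00.000", 5.00, "D", "supplier2"));
        bids.add(new Bid("2020-04-15 08:11:00.000", 3.00, "B", "supplier2"));
        bids.add(new Bid("2020-04-15 08:09:00.000", 5.00, "D", "supplier3"));
        bids.add(new Bid("2020-04-15 08:11:00.000", 6.00, "B", "supplier3"));
        bids.add(new Bid("2020-04-15 08:11:00.000", 6.00, "B", "supplier3"));
        return bids;
    }

    public static void main(String[] args) {
        for (Map.Entry<Long, List<Bid>> e : topN(sample(), 600_000L, 2).entrySet()) {
            for (Bid b : e.getValue()) {
                System.out.println(e.getKey() + " " + b.bidtime + " " + b.price + " " + b.supplierId);
            }
        }
    }
}
```

For the 08:00 to 08:10 window the model keeps the two rows priced 5.0, matching the Flink output above. The 08:10 to 08:20 window is absent from the Flink output, presumably because no later row had advanced the watermark far enough to close it when the result was captured.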
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * The top two suppliers by total transaction amount in each 10-minute tumbling window,
 * together with their total amount and number of orders
 */
public class _02_Window_Topn_V3 {
    public static void main(String[] args) {
        // Create the table execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Read data from Kafka
        String sourceTable = "CREATE TABLE source_table (\n" +
                "  bidtime string ,\n" +
                "  `price` double,\n" +
                "  `item` STRING,\n" +
                "  `supplier_id` STRING,\n" +
                "  `rt` as cast( bidtime as timestamp(3) ),\n" +
                "   watermark for rt as rt - interval '5' second\n" +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = 'topn1',\n" +
                "  'properties.bootstrap.servers' = 'hadoop01:9092',\n" +
                "  'properties.group.id' = 'testGroup',\n" +
                "  'scan.startup.mode' = 'earliest-offset',\n" +
                "  'format' = 'csv'\n" +
                ")";
        tenv.executeSql(sourceTable);

        // Inner query: window aggregation per supplier; outer query: Top-N over the aggregates
        String executeSql = "select\n" +
                "  *\n" +
                "from(\n" +
                "  select\n" +
                "   window_start,\n" +
                "   window_end,\n" +
                "   supplier_id,\n" +
                "   sum_price,\n" +
                "   cnt,\n" +
                "   row_number() over(partition by window_start,window_end order by sum_price desc ) as rn \n" +
                "   from(\n" +
                "      select\n" +
                "        window_start,\n" +
                "        window_end,\n" +
                "        supplier_id,\n" +
                "        sum(price) as sum_price,\n" +
                "        count(1) as cnt\n" +
                "      from table(\n" +
                "        tumble(table source_table,descriptor(rt),interval '10' minute)\n" +
                "      ) group by window_start,window_end,supplier_id\n" +
                "  ) t1\n" +
                ") t2 where rn <= 2";
        tenv.executeSql(executeSql).print();
    }
}

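4. Window join queries

Syntax:

  • Use join on TVFs
  • Both tables taking part in the join must define a time window
  • The join condition must include equality conditions on both tables' window_start and window_end

Supported join types:

  • inner/left/right/full
  • semi (where id in …)
  • anti (where id not in …)

Code example: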

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Code examples for the various window joins
 */
public class _03_Join {
    public static void main(String[] args) {
        // Create the table execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        /*
         * 1,a,1000
         * 2,b,2000
         * 3,c,2500
         * 4,d,3000
         * 5,e,12000
         */
        // Read data from a socket stream
        DataStreamSource<String> s1 = env.socketTextStream("hadoop01", 9999);
        SingleOutputStreamOperator<Tuple3<String, String, Long>> ss1 =
                s1.map(new MapFunction<String, Tuple3<String, String, Long>>() {
                    @Override
                    public Tuple3<String, String, Long> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        return Tuple3.of(arr[0], arr[1], Long.parseLong(arr[2]));
                    }
                });

        /*
         * 1,bj,1000
         * 2,sh,2000
         * 4,xa,2600
         * 5,yn,12000
         */
        DataStreamSource<String> s2 = env.socketTextStream("hadoop01", 9998);
        SingleOutputStreamOperator<Tuple3<String, String, Long>> ss2 =
                s2.map(new MapFunction<String, Tuple3<String, String, Long>>() {
                    @Override
                    public Tuple3<String, String, Long> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        return Tuple3.of(arr[0], arr[1], Long.parseLong(arr[2]));
                    }
                });

        // Create the two tables
        tenv.createTemporaryView("t_left", ss1, Schema.newBuilder()
                .column("f0", DataTypes.STRING())
                .column("f1", DataTypes.STRING())
                .column("f2", DataTypes.BIGINT())
                .columnByExpression("rt", " to_timestamp_ltz(f2,3) ")
                .watermark("rt", "rt - interval '0' second ")
                .build());
        tenv.createTemporaryView("t_right", ss2, Schema.newBuilder()
                .column("f0", DataTypes.STRING())
                .column("f1", DataTypes.STRING())
                .column("f2", DataTypes.BIGINT())
                .columnByExpression("rt", " to_timestamp_ltz(f2,3) ") // event-time attribute
                .watermark("rt", "rt - interval '0' second ")         // watermark
                .build());

        // Examples of the various window joins
        // INNER JOIN
        String innerJoinSql = "select\n" +
                "  a.f0,\n" +
                "  a.f1,\n" +
                "  a.f2,\n" +
                "  b.f0,\n" +
                "  b.f1\n" +
                "from\n" +
                "( select * from table( tumble(table t_left,descriptor(rt), interval '10' second) )  ) a\n" +
                "join\n" +
                "( select * from table( tumble(table t_right,descriptor(rt), interval '10' second) )  ) b\n" +
                // A join with conditions must include equality conditions on
                // window_start and window_end of the two input tables
                "on a.window_start = b.window_start and a.window_end = b.window_end and a.f0 = b.f0";
        // tenv.executeSql(innerJoinSql).print();

        // left / right / full outer
        String fullJoinSql = "select\n" +
                "  a.f0,\n" +
                "  a.f1,\n" +
                "  a.f2,\n" +
                "  b.f0,\n" +
                "  b.f1\n" +
                "from\n" +
                "( select * from table( tumble(table t_left,descriptor(rt), interval '10' second) )  ) a\n" +
                "full join\n" +
                "( select * from table( tumble(table t_right,descriptor(rt), interval '10' second) )  ) b\n" +
                // A join with conditions must include equality conditions on
                // window_start and window_end of the two input tables
                "on a.window_start = b.window_start and a.window_end = b.window_end and a.f0 = b.f0";
        // tenv.executeSql(fullJoinSql).print();

        // semi ==> where ... in ...
        // (fixed: no trailing comma after a.f2, and "descriptor" spelled correctly)
        String semiJoinSql = "select\n" +
                "  a.f0,\n" +
                "  a.f1,\n" +
                "  a.f2\n" +
                "from\n" +
                "-- 1. use join on TVFs\n" +
                "-- 2. both tables taking part in the join must define a time window\n" +
                "( select * from table( tumble(table t_left,descriptor(rt), interval '10' second) ) ) a\n" +
                "where f0 in\n" +
                "(\n" +
                "  select\n" +
                "    f0\n" +
                "  from\n" +
                "  ( select * from table( tumble(table t_right,descriptor(rt), interval '10' second) ) ) b\n" +
                "  -- 3. the join condition must include equality on both tables' window_start and window_end\n" +
                "  where a.window_start = b.window_start and a.window_end = b.window_end\n" +
                ")";
        // tenv.executeSql(semiJoinSql).print();
    }
}
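The effect of the mandatory window_start / window_end equality can be seen on the sample rows from the code comments. The sketch below is a plain-Java nested-loop model of the inner window join (hypothetical names, not how Flink executes it): two rows match only when their keys are equal and they fall into the same 10-second tumbling window.

```java
import java.util.ArrayList;
import java.util.List;

/** Nested-loop model of an inner window join over tumbling windows. */
final class WindowJoinSketch {

    static long windowStart(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs);
    }

    /** Each row is {f0, f1, f2}; f2 is the event time in epoch milliseconds. */
    static List<String> innerJoin(String[][] left, String[][] right, long sizeMs) {
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            for (String[] r : right) {
                boolean sameKey = l[0].equals(r[0]);
                // Equal window_start implies equal window_end for a fixed window size.
                boolean sameWindow = windowStart(Long.parseLong(l[2]), sizeMs)
                        == windowStart(Long.parseLong(r[2]), sizeMs);
                if (sameKey && sameWindow) {
                    out.add(l[0] + "," + l[1] + "," + l[2] + "," + r[0] + "," + r[1]);
                }
            }
        }
        return out;
    }

    static final String[][] LEFT = {
        {"1", "a", "1000"}, {"2", "b", "2000"}, {"3", "c", "2500"},
        {"4", "d", "3000"}, {"5", "e", "12000"}
    };

    static final String[][] RIGHT = {
        {"1", "bj", "1000"}, {"2", "sh", "2000"}, {"4", "xa", "2600"}, {"5", "yn", "12000"}
    };

    public static void main(String[] args) {
        for (String row : innerJoin(LEFT, RIGHT, 10_000L)) {
            System.out.println(row);
        }
    }
}
```

Key 3 has no partner on the right side and is dropped. If the right-hand row for key 5 had arrived with time 9000 instead of 12000, it would sit in a different window and would not join either, even though the keys match.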

5. Join types in Flink SQL


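(1) Regular join

For a regular join, Flink keeps the rows of both input streams in state. As time goes on, that state keeps growing and can become large enough to degrade overall performance.
How to mitigate this:

Estimate, from the characteristics of your business data, the maximum gap between the arrival times of a left-table row and the right-table row it can join with, and set the state TTL to that gap. Rows whose match arrives later than the TTL will no longer be joined.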

StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
// Set the state TTL of the table environment (value in milliseconds, here 1 hour)
tenv.getConfig().getConfiguration().setLong("table.exec.state.ttl", 60 * 60 * 1000L);
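The observable effect of the TTL on a join can be modelled in a few lines. The sketch below is a deliberately simplified stand-in for keyed join state, with hypothetical names; Flink's real state backends handle expiration and cleanup differently. It only shows that a stored row stops being visible once it is older than the TTL.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of join state with a time-to-live, keyed by join key. */
final class TtlStateSketch {
    private final long ttlMs;
    private final Map<String, String> rows = new HashMap<>();
    private final Map<String, Long> writtenAt = new HashMap<>();

    TtlStateSketch(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Stores a row for the key and (re)starts its TTL clock. */
    void put(String key, String row, long nowMs) {
        rows.put(key, row);
        writtenAt.put(key, nowMs);
    }

    /** Returns the stored row, or null if there is none or it has outlived the TTL. */
    String get(String key, long nowMs) {
        Long ts = writtenAt.get(key);
        if (ts == null) {
            return null;
        }
        if (nowMs - ts >= ttlMs) {
            // Expired: drop it so the state does not keep growing.
            rows.remove(key);
            writtenAt.remove(key);
            return null;
        }
        return rows.get(key);
    }

    public static void main(String[] args) {
        TtlStateSketch leftState = new TtlStateSketch(60 * 60 * 1000L);
        leftState.put("k1", "left-row", 0L);
        System.out.println("after 1s:  " + leftState.get("k1", 1_000L));
        System.out.println("after 1h:  " + leftState.get("k1", 60 * 60 * 1000L));
    }
}
```

A right-side row for k1 arriving one second later still finds its partner; one arriving an hour later does not, which is the trade-off made when the TTL is chosen.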

(2) Lookup join (dimension-table join)

A lookup join differs considerably from the other joins. In Flink SQL, every source connector implements DynamicTableSource, through one of its two sub-interfaces:


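  • ScanTableSource is the most commonly used, regular table source. It reads the source table continuously and completely, producing Flink's core data abstraction, the "data stream";

  • LookupTableSource does not read the source table continuously or completely. Instead, only when needed, it queries the source table on demand by one (or more) lookup keys and gets back one (or more) rows;

To improve performance, a lookup connector can cache the dimension-table rows it has already queried. The cache is disabled by default and is enabled through connector options; for example, the JDBC connector in lookup mode has these options:

  • lookup.cache.max-rows = (none): maximum number of cached rows; unset by default, so the cache is disabled
  • lookup.cache.ttl = (none): time after which a cached row is evicted

public class JdbcDynamicTableSource implements ScanTableSource,LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown {

It implements both of the interfaces above, so it wraps both read modes, and it implements the key method of each interface:

  • getLookupRuntimeProvider
  • getScanRuntimeProvider

For the lookup runtime provider, the most important piece is JdbcRowDataLookupFunction: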

// The most important part of the lookup function is the eval method
public void eval(Object... keys) {
    RowData keyRow = GenericRowData.of(keys);
    if (this.cache != null) {
        List<RowData> cachedRows = (List)this.cache.getIfPresent(keyRow);
        // For the given keys, first try to get the requested rows from the cache
        if (cachedRows != null) {
            Iterator var24 = cachedRows.iterator();
            while(var24.hasNext()) {
                RowData cachedRow = (RowData)var24.next();
                // Cache hit: emit the cached row directly
                this.collect(cachedRow);
            }
            return;
        }
    }
    int retry = 0;
    // Otherwise, query through JDBC
    while(retry <= this.maxRetryTimes) {
        try {
            // Build the JDBC query statement
            this.statement.clearParameters();
            this.statement = this.lookupKeyRowConverter.toExternal(keyRow, this.statement);
            // Execute the query and get the ResultSet
            ResultSet resultSet = this.statement.executeQuery();
            Throwable var5 = null;
            try {
                if (this.cache == null) {
                    while(resultSet.next()) {
                        this.collect(this.jdbcRowConverter.toInternal(resultSet));
                    }
                    return;
                }
                ArrayList rows = new ArrayList();
                // Iterate over the ResultSet
                while(resultSet.next()) {
                    // Convert to the internal RowData type
                    RowData row = this.jdbcRowConverter.toInternal(resultSet);
                    // Add the row to the list (for caching) and emit it
                    rows.add(row);
                    this.collect(row);
                }
                // Put the queried rows into the cache
                rows.trimToSize();
                this.cache.put(keyRow, rows);
                break;
            } catch (Throwable var20) {
                var5 = var20;
                throw var20;
            } finally {
                ...
            }
        } catch (SQLException var22) {
            ...
        }
    }
}
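The cache-first flow of eval can be reduced to the following plain-Java sketch. It is not the connector's implementation: the class name, the loader function standing in for the JDBC query, and the LinkedHashMap-based eviction are all assumptions made for illustration. maxRows here limits the number of cached keys and ttlMs plays the role of lookup.cache.ttl.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Cache-first lookup: serve from cache if fresh, otherwise load and cache the result. */
final class LookupCacheSketch<K, V> {

    /** Rows cached for one lookup key, plus the time they were loaded. */
    private static final class CachedRows<V> {
        final List<V> rows;
        final long loadedAt;

        CachedRows(List<V> rows, long loadedAt) {
            this.rows = rows;
            this.loadedAt = loadedAt;
        }
    }

    private final long ttlMs;
    private final Function<K, List<V>> loader; // stands in for the JDBC query
    private final LinkedHashMap<K, CachedRows<V>> cache;

    LookupCacheSketch(final int maxRows, long ttlMs, Function<K, List<V>> loader) {
        this.ttlMs = ttlMs;
        this.loader = loader;
        // Access-ordered map: the least recently used key is evicted first.
        this.cache = new LinkedHashMap<K, CachedRows<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, CachedRows<V>> eldest) {
                return size() > maxRows;
            }
        };
    }

    /** Returns the rows for key, querying the backing store only on a miss or after expiry. */
    List<V> lookup(K key, long nowMs) {
        CachedRows<V> hit = cache.get(key);
        if (hit != null && nowMs - hit.loadedAt < ttlMs) {
            return hit.rows;
        }
        List<V> rows = loader.apply(key);
        cache.put(key, new CachedRows<>(rows, nowMs));
        return rows;
    }
}
```

The cost of the cache is staleness: until an entry expires or is evicted, changes made to the dimension table in the database are not visible to the join.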

A lookup join example:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class _04_LookUpJoin {
    public static void main(String[] args) throws Exception {
        // Create the Flink SQL execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Set the state TTL of the table environment
        tenv.getConfig().getConfiguration().setLong("table.exec.state.ttl", 60 * 60 * 1000L);

        /*
         * 1,a
         * 2,b
         * 3,c
         * 4,d
         * 5,e
         */
        SingleOutputStreamOperator<Tuple2<Integer, String>> ss1 = env.socketTextStream("hadoop01", 9999)
                .map(new MapFunction<String, Tuple2<Integer, String>>() {
                    @Override
                    public Tuple2<Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        return Tuple2.of(Integer.parseInt(arr[0]), arr[1]);
                    }
                });

        // Create the main (probe) table; it must declare a processing-time attribute
        tenv.createTemporaryView("a", ss1, Schema.newBuilder()
                .column("f0", DataTypes.INT())
                .column("f1", DataTypes.STRING())
                .columnByExpression("pt", "proctime()") // processing-time attribute
                .build());

        // Create the lookup dimension table (a JDBC connector table)
        String lookUpSql = "-- register a MySQL table 'users' in Flink SQL\n" +
                "CREATE TABLE b (\n" +
                "  id int,\n" +
                "  name STRING,\n" +
                "  gender STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'jdbc',\n" +
                "   'url' = 'jdbc:mysql://localhost:3306/ycak',\n" +
                "   'table-name' = 'users',\n" +
                "   'username' = 'root',\n" +
                "   'password' = 'zsd123456'\n" +
                ")";
        tenv.executeSql(lookUpSql);

        // Lookup join query
        String lookupSelectSql = "select\n" +
                "  a.*,\n" +
                "  c.*\n" +
                "from \n" +
                "  a join b FOR SYSTEM_TIME AS OF a.pt as c\n" +
                "on a.f0 = c.id";
        // executeSql() submits the job itself, so no separate env.execute() call is needed
        tenv.executeSql(lookupSelectSql).print();
    }
}

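(3) Interval join

Interval join: a stream-to-stream join restricted to a time range. It lets each row of one stream join the rows of the other stream that fall within a bounded period before or after it.

A regular join can produce a retract stream (an outer join, for example, retracts the null-padded row once a match arrives). In a real-time data warehouse, however, the sink is usually a message queue such as Kafka, followed by an engine such as ClickHouse, and these engines cannot process retract streams. The output of an interval join is append-only, so it is used to eliminate the retract stream.

Practical example: join the exposure log with the click log to keep the records that have both an exposure and a click, where the click happens within 4 hours after the exposure, and enrich the result with the click's extended parameters (show inner interval click).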

INSERT INTO sink_table
SELECT
    show_log_table.log_id as s_id,
    show_log_table.show_params as s_params,
    click_log_table.log_id as c_id,
    click_log_table.click_params as c_params
FROM show_log_table
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '4' HOUR AND click_log_table.row_time;

Valid forms of the time-range condition:

  • l_time = r_time
  • l_time >= r_time AND l_time < r_time + INTERVAL '10' MINUTE
  • l_time BETWEEN r_time - INTERVAL '10' SECOND AND r_time + INTERVAL '5' SECOND
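The time-range condition of the exposure/click example can be written as a small predicate. The sketch below is plain Java with hypothetical names and epoch-millisecond timestamps. The canDropShow method illustrates, under the simplifying assumption of a single watermark, why the state of an interval join stays bounded: once time has moved past the end of a row's range, no future row can match it.

```java
/** The interval-join condition of the exposure/click example as a predicate. */
final class IntervalJoinSketch {

    static final long HOUR_MS = 3_600_000L;

    /** show.row_time BETWEEN click.row_time - INTERVAL '4' HOUR AND click.row_time (inclusive). */
    static boolean matches(long showTime, long clickTime) {
        return showTime >= clickTime - 4 * HOUR_MS && showTime <= clickTime;
    }

    /** A show row can be discarded once no click at or after the watermark could still match. */
    static boolean canDropShow(long showTime, long watermark) {
        return watermark > showTime + 4 * HOUR_MS;
    }

    public static void main(String[] args) {
        long show = 0L;
        System.out.println("click 1h after show:   " + matches(show, HOUR_MS));
        System.out.println("click 5h after show:   " + matches(show, 5 * HOUR_MS));
        System.out.println("click before the show: " + matches(10_000L, 5_000L));
    }
}
```

Because every row expires from state on its own, the join never has to revise an earlier result, which is why its output contains only inserts.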

(4) Temporal join (versioned join)

Each row of the left table is joined with the version of the right-table row that was the latest at the left row's time.

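-- Given the following orders table (order id, amount, currency, time)
1,88,e,1000
2,88,e,2000
3,68,e,3000

-- and the following currency rates table (currency, rate, update time)
e,1.0,1000
e,2.0,3000

-- the result of the temporal join is
1,88,e,1000,1.0
2,88,e,2000,1.0
3,68,e,3000,2.0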
-- Create the orders table
-- (an append-only table)
create table orders (
    order_id STRING,
    price decimal(32,2),
    currency STRING,
    order_time TIMESTAMP(3),
    watermark for order_time as order_time
) with (/*...*/);

-- Create the currency rates table, for example a table fed by CDC
create table currency_rates (
    currency STRING,
    conversion_rate decimal(32,2),
    update_time TIMESTAMP(3) METADATA from 'values.source.timestamp' VIRTUAL,
    watermark for update_time as update_time,
    PRIMARY KEY (currency) NOT ENFORCED
) with (
    'connector' = 'kafka',
    'value.format' = 'debezium-json',
    /*...*/
);

SELECT order_id, price, currency, conversion_rate, order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
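The "version as of the order time" lookup can be modelled with a sorted map per key. The sketch below is plain Java with hypothetical names, not Flink's implementation: it keeps the rate history of each currency in a TreeMap and picks the latest version whose update time is not after the order time, which reproduces the sample result shown earlier.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Versioned-table model: per currency, rate versions ordered by update time. */
final class TemporalJoinSketch {

    private final Map<String, TreeMap<Long, Double>> versions = new HashMap<>();

    /** Records a new version of the rate for a currency. */
    void updateRate(String currency, double rate, long updateTime) {
        versions.computeIfAbsent(currency, k -> new TreeMap<>()).put(updateTime, rate);
    }

    /** Rate valid at orderTime: the latest version with updateTime <= orderTime, or null. */
    Double rateAsOf(String currency, long orderTime) {
        TreeMap<Long, Double> history = versions.get(currency);
        if (history == null) {
            return null;
        }
        Map.Entry<Long, Double> version = history.floorEntry(orderTime);
        return version == null ? null : version.getValue();
    }

    public static void main(String[] args) {
        TemporalJoinSketch rates = new TemporalJoinSketch();
        rates.updateRate("e", 1.0, 1000L);
        rates.updateRate("e", 2.0, 3000L);
        long[][] orders = {{1, 88, 1000}, {2, 88, 2000}, {3, 68, 3000}}; // id, price, order_time
        for (long[] o : orders) {
            System.out.println(o[0] + "," + o[1] + ",e," + o[2] + "," + rates.rateAsOf("e", o[2]));
        }
    }
}
```

With a LEFT JOIN, an order placed before the first known rate version would still be emitted, with a null conversion_rate; in the model that case is the null return value.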

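(5) Window aggregation (OVER windows)

row_number() over ()

In Flink SQL, the range of rows covered by an OVER aggregation can be specified in two ways:
Option 1, a time-based range:
RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
Option 2, a row-count-based range:
ROWS BETWEEN 10 PRECEDING AND CURRENT ROW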

SELECT order_id, order_time, amount,
    SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS one_hour_prod_amount_sum
FROM Orders

An over window can be defined once and reused, which simplifies the code:


SELECT order_id, order_time, amount,
    SUM(amount) OVER w AS sum_amount,
    AVG(amount) OVER w AS avg_amount
FROM Orders
WINDOW w AS (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
)
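The difference between the RANGE and ROWS forms is easiest to see on a few numbers. The sketch below is a plain-Java model with hypothetical names, assuming a single partition whose rows are already ordered by time in epoch milliseconds. It computes SUM(amount) for each row under both kinds of frame.

```java
import java.util.Arrays;

/** Per-row SUM over a RANGE frame (time based) and over a ROWS frame (count based). */
final class OverWindowSketch {

    /** RANGE BETWEEN rangeMs PRECEDING AND CURRENT ROW: rows with time in [t - rangeMs, t]. */
    static double[] rangeSum(long[] times, double[] amounts, long rangeMs) {
        double[] out = new double[times.length];
        for (int i = 0; i < times.length; i++) {
            for (int j = 0; j < times.length; j++) {
                if (times[j] >= times[i] - rangeMs && times[j] <= times[i]) {
                    out[i] += amounts[j];
                }
            }
        }
        return out;
    }

    /** ROWS BETWEEN n PRECEDING AND CURRENT ROW: the current row plus up to n rows before it. */
    static double[] rowsSum(double[] amounts, int n) {
        double[] out = new double[amounts.length];
        for (int i = 0; i < amounts.length; i++) {
            for (int j = Math.max(0, i - n); j <= i; j++) {
                out[i] += amounts[j];
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        long[] times = {0L, 30 * min, 60 * min, 90 * min};
        double[] amounts = {1, 2, 3, 4};
        System.out.println("RANGE 1 hour: " + Arrays.toString(rangeSum(times, amounts, 60 * min)));
        System.out.println("ROWS 1:       " + Arrays.toString(rowsSum(amounts, 1)));
    }
}
```

A RANGE frame grows and shrinks with how densely rows arrive in time, while a ROWS frame always covers a fixed number of rows regardless of how far apart they are.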

