一、flink sql 背景
flink sql 是 flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 sql 语义的开发语言。
自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 flink 打造新一代计算引擎,针对 flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 blink。blink 在原来的 flink 基础上最显著的一个贡献就是 flink sql 的实现。
flink sql 是面向用户的 api 层,在我们传统的流式计算领域,比如 storm、spark streaming 都会提供一些 function 或者 datastream api,用户通过 java 或 scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,api 也出现了很多不兼容的地方。
在这个背景下,毫无疑问,sql 就成了我们最佳选择,之所以选择将 sql 作为核心 api,是因为其具有几个非常重要的特点:
sql 属于设定式语言,用户只要表达清楚需求即可,不需要了解具体做法;
sql 可优化,内置多种查询优化器,这些查询优化器可为 sql 翻译出最优执行计划;
sql 易于理解,不同行业和领域的人都懂,学习成本较低;
sql 非常稳定,在数据库 30 多年的历史中,sql 本身变化较少;
流与批的统一,flink 底层 runtime 本身就是一个流与批统一的引擎,而 sql 可以做到 api 层的流与批统一。
二、flink 的最新特性(1.7.0 和 1.8.0 更新)
2.1 flink 1.7.0 新特性
在 flink 1.7.0 中,我们更接近实现快速数据处理和以无缝方式为 flink 社区构建数据密集型应用程序的目标。最新版本包括一些新功能和改进,例如对 scala 2.12 的支持、一次性 s3 文件接收器、复杂事件处理与流 sql 的集成等。
apache flink 中对 scala 2.12 的支持(flink-7811)
apache flink 1.7.0 是第一个完全支持 scala 2.12 的版本。这允许用户使用较新的 scala 版本编写 flink 应用程序并利用 scala 2.12 生态系统。
状态演进(flink-9376)
许多情况下,由于需求的变化,长期运行的 flink 应用程序需要在其生命周期内发展。在不失去当前应用程序进度状态的情况下更改用户状态是应用程序发展的关键要求。使用 flink 1.7.0,社区添加了状态演变,允许您灵活地调整长时间运行的应用程序的用户状态模式,同时保持与以前保存点的兼容性。通过状态演变,可以在状态模式中添加或删除列,以便更改应用程序部署后应用程序捕获的业务功能。现在,使用 avro 生成时,状态模式演变现在可以立即使用作为用户状态的类,这意味着可以根据 avro 的规范来演变国家的架构。虽然 avro 类型是 flink 1.7 中唯一支持模式演变的内置类型,但社区仍在继续致力于在未来的 flink 版本中进一步扩展对其他类型的支持。
match recognize streaming sql 支持(flink-6935)
这是 apache flink 1.7.0 的一个重要补充,它为 flink sql 提供了 match recognize 标准的初始支持。此功能结合了复杂事件处理(cep)和 sql,可以轻松地对数据流进行模式匹配,从而实现一整套新的用例。此功能目前处于测试阶段,因此我们欢迎社区提供任何反馈和建议。
流式 sql 中的时态表和时间连接(flink-9712)
时态表是 apache flink 中的一个新概念,它为表的更改历史提供(参数化)视图,并在特定时间点返回表的内容。例如,我们可以使用具有历史货币汇率的表格。随着时间的推移,这种表格不断增长/发展,并且增加了新的更新汇率。时态表是一种视图,可以将这些汇率的实际状态返回到任何给定的时间点。使用这样的表,可以使用正确的汇率将不同货币的订单流转换为通用货币。时间联接允许使用不断变化/更新的表来进行内存和计算有效的流数据连接。
streaming sql 的其他功能
除了上面提到的主要功能外,flink 的 table&sql api 已经扩展到更多用例。以下内置函数被添加到 api:to_base64、log2、ltrim、repeat、replace、cosh、sinh、tanh sql client 现在支持在环境文件和 cli 会话中定义视图。此外,cli 中添加了基本的 sql 语句自动完成功能。社区添加了一个 elasticsearch 6 表接收器,允许存储动态表的更新结果。
kafka 2.0 连接器(flink-10598)
apache flink 1.7.0 继续添加更多连接器,使其更容易与更多外部系统进行交互。在此版本中,社区添加了 kafka 2.0 连接器,该连接器允许通过一次性保证读取和写入 kafka 2.0。
本地恢复(flink-9635)
apache flink 1.7.0 通过扩展 flink 的调度来完成本地恢复功能,以便在恢复时考虑以前的部署位置。如果启用了本地恢复,flink 将保留最新检查点的本地副本任务运行的机器。通过将任务调度到以前的位置,flink 将通过从本地磁盘读取检查点状态来最小化恢复状态的网络流量。此功能大大提高了恢复速度。
2.2 flink 1.8.0 新特性
flink 1.8.0 引入对状态的清理
使用 ttl(生存时间)连续增量清除旧的 key 状态 flink 1.8 引入了对 rocksdb 状态后端(flink-10471)和堆状态后端(flink-10473)的旧数据的连续清理。这意味着旧的数据将(根据 ttl 设置)不断被清理掉。
新增和删除一些 table api
1) 引入新的 csv 格式符(flink-9964)
此版本为符合 rfc4180 的 csv 文件引入了新的格式符。新描述符可以使用 org.apache.flink.table.descriptors.csv。目前,只能与 kafka 一起使用。旧描述符 org.apache.flink.table.descriptors.oldcsv 用于文件系统连接器。
2) 静态生成器方法在 tableenvironment(flink-11445)上的弃用
为了将 api 与实际实现分开,tableenvironment.gettableenvironment() 不推荐使用静态方法。现在推荐使用batch/streamtableenvironment.create()。
3) 表 api maven 模块中的更改(flink-11064)
之前具有 flink-table 依赖关系的用户需要更新其依赖关系 flink-table-planner,以及正确的依赖关系 flink-table-api-*,具体取决于是使用 java 还是 scala: flink-table-api-java-bridge 或者 flink-table-api-scala-bridge。
kafka connector 的修改
引入可直接访问 consumerrecord 的新kafkadeserializationschema(flink-8354),对于 flinkkafkaconsumers 推出了一个新的 kafkadeserializationschema,可以直接访问 kafkaconsumerrecord。
三、flink sql 的编程模型
flink 的编程模型基础构建模块是流(streams)与转换 (transformations),每一个数据流起始于一个或多个 source,并终止于一个或多个 sink。
相信大家对上面的图已经十分熟悉了,当然基于 flink sql 编写的 flink 程序也离不开读取原始数据,计算逻辑和写入计算结果数据三部分。
一个完整的 flink sql 编写的程序包括如下三部分:
source operator:soruce operator 是对外部数据源的抽象, 目前 apache flink 内置了很多常用的数据源实现例如 mysql、kafka 等;
transformation operators:算子操作主要完成例如查询、聚合操作等,目前 flink sql 支持了 union、join、projection、difference、intersection 及 window 等大多数传统数据库支持的操作;
sink operator:sink operator 是对外结果表的抽象,目前 apache flink 也内置了很多常用的结果表的抽象,比如 kafka sink 等
我们通过用一个最经典的 wordcount 程序作为入门,看一下传统的基于 dataset/datastream api 开发和基于 sql 开发有哪些不同?
datastream/datasetapi
public class wordcount { public static void main(string[] args) throws exception { final executionenvironment env = executionenvironment.getexecutionenvironment(); dataset text = env.fromelements( hello, flink, hello, blink ); dataset counts = text.flatmap(new linesplitter()) .groupby(0) .sum(1); counts.print(); } public static final class linesplitter implements flatmapfunction { @override public void flatmap(string value, collector out) { string[] tokens = value.tolowercase().split(\w+); for (string token : tokens) { if (token.length() > 0) { out.collect(new tuple2(token, 1)); } } } }}
flink sql
//省略掉初始化环境等公共代码select word, count(word) from table group by word;
我们已经可以直观体会到,sql 开发的快捷和便利性了。
四、flink sql 的语法和算子
4.1 flink sql 支持的语法
flink sql 核心算子的语义设计参考了 1992、2011 等 ansi-sql 标准,flink 使用 apache calcite 解析 sql ,calcite 支持标准的 ansi sql。
那么 flink 自身支持的 sql 语法有哪些呢?
insert: insert into tablereference queryquery: values | { select | selectwithoutfrom | query union [ all ] query | query except query | query intersect query } [ order by orderitem [, orderitem ]* ] [ limit { count | all } ] [ offset start { row | rows } ] [ fetch { first | next } [ count ] { row | rows } only]orderitem: expression [ asc | desc ]select: select [ all | distinct ] { * | projectitem [, projectitem ]* } from tableexpression [ where booleanexpression ] [ group by { groupitem [, groupitem ]* } ] [ having booleanexpression ] [ window windowname as windowspec [, windowname as windowspec ]* ]selectwithoutfrom: select [ all | distinct ] { * | projectitem [, projectitem ]* }projectitem: expression [ [ as ] columnalias ] | tablealias . *tableexpression: tablereference [, tablereference ]* | tableexpression [ natural ] [ left | right | full ] join tableexpression [ joincondition ]joincondition: on booleanexpression | using '(' column [, column ]* ')'tablereference: tableprimary [ [ as ] alias [ '(' columnalias [, columnalias ]* ')' ] ]tableprimary: [ table ] [ [ catalogname . ] schemaname . ] tablename | lateral table '(' functionname '(' expression [, expression ]* ')' ')' | unnest '(' expression ')'values: values expression [, expression ]*groupitem: expression | '(' ')' | '(' expression [, expression ]* ')' | cube '(' expression [, expression ]* ')' | rollup '(' expression [, expression ]* ')' | grouping sets '(' groupitem [, groupitem ]* ')'windowref: windowname | windowspecwindowspec: [ windowname ] '(' [ order by orderitem [, orderitem ]* ] [ partition by expression [, expression ]* ] [ range numericorintervalexpression {preceding} | rows numericexpression {preceding} ] ')'
上面 sql 的语法支持也已经表明了 flink sql 对算子的支持,接下来我们对 flink sql 中最常见的算子语义进行介绍。
4.2 flink sql 常用算子
select
select 用于从 dataset/datastream 中选择数据,用于筛选出某些列。
示例:
select * from table;// 取出表中的所有列select name,age from table;// 取出表中 name 和 age 两列
与此同时 select 语句中可以使用函数和别名,例如我们上面提到的 wordcount 中:
select word, count(word) from table group by word;
where
where 用于从数据集/流中过滤数据,与 select 一起使用,用于根据某些条件对关系做水平分割,即选择符合条件的记录。
示例:
select name,age from table where name like ‘% 小明 %’;select * from table where age = 20;
where 是从原数据中进行过滤,那么在 where 条件中,flink sql 同样支持 =、、、>=、value2
如果 value1 大于 value2,则返回 true ; 如果 value1 或 value2 为 null,则返回 unknown
value1 < value2
如果 value1 小于 value2,则返回 true ; 如果 value1 或 value2 为 null,则返回 unknown
value is null
如果 value 为 null,则返回 true
value is not null
如果 value 不为 null,则返回 true
string1 like string2
如果 string1 匹配模式 string2,则返回 true ; 如果 string1 或 string2为 null,则返回unknown
value1 in (value2, value3…)
如果给定列表中存在 value1 (value2,value3,…),则返回 true 。当(value2,value3,…)包含 null,如果可以找到该数据元则返回 true,否则返回 unknown。如果 value1 为 null,则始终返回 unknown
5.2 逻辑函数
逻辑函数
描述
a or b
如果 a 为 true 或 b 为 true,则返回 true
a and b
如果 a 和 b 都为 true,则返回 true
not boolean
如果 boolean 为 false,则返回 true,否则返回 true。如果 boolean 为 true,则返回 false
a is true 或 false
判断 a 是否为真
—
—
5.3 算术函数
算术函数
描述
numeric1 ±*/ numeric2
分别代表两个数值加减乘除
abs(numeric)
返回 numeric 的绝对值
power(numeric1, numeric2)
返回 numeric1 上升到 numeric2 的幂
除了上述表中的函数,flink sql 还支持种类丰富的函数计算。
5.4 字符串处理函数
字符串函数
描述
upper/lower
以大写 / 小写形式返回字符串
ltrim(string)
返回一个字符串,从去除左空格的字符串, 类似还有 rtrim
concat(string1, string2,…)
返回连接 string1,string2,…的字符串
5.5 时间函数
时间函数
描述
date string
返回以“yyyy-mm-dd”形式从字符串解析的 sql 日期
timestamp string
返回以字符串形式解析的 sql 时间戳,格式为“yyyy-mm-dd hh:mm:ss [.sss]”
current_date
返回 utc 时区中的当前 sql 日期
date_format(timestamp, string)
返回使用指定格式字符串格式化时间戳的字符串
六、flink sql 实战应用
上面我们分别介绍了 flink sql 的背景、新特性、编程模型和常用算子,这部分我们将模拟一个真实的案例为大家使用 flink sql 提供一个完整的 demo。
相信这里应该有很多 nba 的球迷,假设我们有一份数据记录了每个赛季的得分王的数据,包括赛季、球员、出场、首发、时间、助攻、抢断、盖帽、得分等。现在我们要统计获得得分王荣誉最多的三名球员。
原数据存在 score.csv 文件中,如下:
17-18,詹姆斯-哈登,72,72,35.4,8.8,1.8,0.7,30.416-17,拉塞尔-威斯布鲁克,81,81,34.6,10.4,1.6,0.4,31.615-16,斯蒂芬-库里,79,79,34.2,6.7,2.1,0.2,30.114-15,拉塞尔-威斯布鲁克,67,67,34.4,8.6,2.1,0.2,28.113-14,凯文-杜兰特,81,81,38.5,5.5,1.3,0.7,3212-13,卡梅罗-安东尼,67,67,37,2.6,0.8,0.5,28.711-12,凯文-杜兰特,66,66,38.6,3.5,1.3,1.2,2810-11,凯文-杜兰特,78,78,38.9,2.7,1.1,1,27.709-10,凯文-杜兰特,82,82,39.5,2.8,1.4,1,30.108-09,德维恩-韦德,79,79,38.6,7.5,2.2,1.3,30.207-08,勒布朗-詹姆斯,75,74,40.4,7.2,1.8,1.1,3006-07,科比-布莱恩特,77,77,40.8,5.4,1.4,0.5,31.605-06,科比-布莱恩特,80,80,41,4.5,1.8,0.4,35.404-05,阿伦-艾弗森,75,75,42.3,7.9,2.4,0.1,30.703-04,特雷西·麦克格雷迪,67,67,39.9,5.5,1.4,0.6,2802-03,特雷西·麦克格雷迪,75,74,39.4,5.5,1.7,0.8,32.101-02,阿伦-艾弗森,60,59,43.7,5.5,2.8,0.2,31.400-01,阿伦-艾弗森,71,71,42,4.6,2.5,0.3,31.199-00,沙奎尔-奥尼尔,79,79,40,3.8,0.5,3,29.798-99,阿伦-艾弗森,48,48,41.5,4.6,2.3,0.1,26.897-98,迈克尔-乔丹,82,82,38.8,3.5,1.7,0.5,28.796-97,迈克尔-乔丹,82,82,37.9,4.3,1.7,0.5,29.695-96,迈克尔-乔丹,82,82,37.7,4.3,2.2,0.5,30.494-95,沙奎尔-奥尼尔,79,79,37,2.7,0.9,2.4,29.393-94,大卫-罗宾逊,80,80,40.5,4.8,1.7,3.3,29.892-93,迈克尔-乔丹,78,78,39.3,5.5,2.8,0.8,32.691-92,迈克尔-乔丹,80,80,38.8,6.1,2.3,0.9,30.190-91,迈克尔-乔丹,82,82,37,5.5,2.7,1,31.589-90,迈克尔-乔丹,82,82,39,6.3,2.8,0.7,33.688-89,迈克尔-乔丹,81,81,40.2,8,2.9,0.8,32.587-88,迈克尔-乔丹,82,82,40.4,5.9,3.2,1.6,3586-87,迈克尔-乔丹,82,82,40,4.6,2.9,1.5,37.185-86,多米尼克-威尔金斯,78,78,39.1,2.6,1.8,0.6,30.384-85,伯纳德-金,55,55,37.5,3.7,1.3,0.3,32.983-84,阿德里安-丹特利,79,79,37.8,3.9,0.8,0.1,30.682-83,阿历克斯-英格利什,82,82,36.4,4.8,1.4,1.5,28.481-82,乔治-格文,79,79,35.7,2.4,1,0.6,32.3
首先我们需要创建一个工程,并且在 maven 中有如下依赖:
utf-8 1.7.1 1.7.7 1.2.17 2.11 org.apache.flink flink-core ${flink.version} org.apache.flink flink-java ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-table_2.11 1.7.1 org.apache.flink flink-streaming-scala_${scala.binary.version} 1.7.1 org.slf4j slf4j-log4j12 ${slf4j.version} log4j log4j ${log4j.version}
第一步,创建上下文环境:
executionenvironment env = executionenvironment.getexecutionenvironment(); batchtableenvironment tableenv = batchtableenvironment.gettableenvironment(env);
第二步,读取 score.csv 并且作为 source 输入:
dataset input = env.readtextfile(score.csv); dataset topinput = input.map(new mapfunction() { @override public playerdata map(string s) throws exception { string[] split = s.split(,); return new playerdata(string.valueof(split[0]), string.valueof(split[1]), string.valueof(split[2]), integer.valueof(split[3]), double.valueof(split[4]), double.valueof(split[5]), double.valueof(split[6]), double.valueof(split[7]), double.valueof(split[8]) ); } });其中的playerdata类为自定义类:public static class playerdata { /** * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分 */ public string season; public string player; public string play_num; public integer first_court; public double time; public double assists; public double steals; public double blocks; public double scores; public playerdata() { super(); } public playerdata(string season, string player, string play_num, integer first_court, double time, double assists, double steals, double blocks, double scores ) { this.season = season; this.player = player; this.play_num = play_num; this.first_court = first_court; this.time = time; this.assists = assists; this.steals = steals; this.blocks = blocks; this.scores = scores; } }
第三步,将 source 数据注册成表:
table topscore = tableenv.fromdataset(topinput);tableenv.registertable(score, topscore);
第四步,核心处理逻辑 sql 的编写:
table queryresult = tableenv.sqlquery(select player, count(season) as num from score group by player order by num desc limit 3);
第五步,输出结果:
dataset result = tableenv.todataset(queryresult, result.class);result.print();
我们直接运行整个程序,观察输出结果:
...16:28:06,162 info org.apache.flink.runtime.dispatcher.dispatcherrestendpoint - shut down complete.16:28:06,162 info org.apache.flink.runtime.taskexecutor.jobleaderservice - stop job leader service.16:28:06,164 info org.apache.flink.runtime.taskexecutor.taskexecutor - stopped taskexecutor akka://flink/user/taskmanager_2.16:28:06,166 info akka.remote.remoteactorrefprovider$remotingterminator - shutting down remote daemon.16:28:06,166 info akka.remote.remoteactorrefprovider$remotingterminator - remote daemon shut down; proceeding with flushing remote transports.16:28:06,169 info akka.remote.remoteactorrefprovider$remotingterminator - remoting shut down.16:28:06,177 info org.apache.flink.runtime.rpc.akka.akkarpcservice - stopping akka rpc service.16:28:06,187 info org.apache.flink.runtime.blob.permanentblobcache - shutting down blob cache16:28:06,187 info org.apache.flink.runtime.blob.transientblobcache - shutting down blob cache16:28:06,188 info org.apache.flink.runtime.blob.blobserver - stopped blob server at 0.0.0.0:5170316:28:06,188 info org.apache.flink.runtime.rpc.akka.akkarpcservice - stopped akka rpc service.迈克尔-乔丹:10凯文-杜兰特:4阿伦-艾弗森:4
我们看到控制台已经输出结果了:
完整的代码如下:
import org.apache.flink.api.common.functions.mapfunction;import org.apache.flink.api.java.dataset;import org.apache.flink.api.java.executionenvironment;import org.apache.flink.table.api.table;import org.apache.flink.table.api.java.batchtableenvironment;public class tablesql { public static void main(string[] args) throws exception{ //1. 获取上下文环境 table的环境 executionenvironment env = executionenvironment.getexecutionenvironment(); batchtableenvironment tableenv = batchtableenvironment.gettableenvironment(env); //2. 读取score.csv dataset input = env.readtextfile(score.csv); input.print(); dataset topinput = input.map(new mapfunction() { @override public playerdata map(string s) throws exception { string[] split = s.split(,); return new playerdata(string.valueof(split[0]), string.valueof(split[1]), string.valueof(split[2]), integer.valueof(split[3]), double.valueof(split[4]), double.valueof(split[5]), double.valueof(split[6]), double.valueof(split[7]), double.valueof(split[8]) ); } }); //3. 注册成内存表 table topscore = tableenv.fromdataset(topinput); tableenv.registertable(score, topscore); //4. 编写sql 然后提交执行 //select player, count(season) as num from score group by player order by num desc; table queryresult = tableenv.sqlquery(select player, count(season) as num from score group by player order by num desc limit 3); //5. 结果进行打印 dataset result = tableenv.todataset(queryresult, result.class); result.print(); } public static class playerdata { /** * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分 */ public string season; public string player; public string play_num; public integer first_court; public double time; public double assists; public double steals; public double blocks; public double scores; public playerdata() { super(); } public playerdata(string season, string player, string play_num, integer first_court, double time, double assists, double steals, double blocks, double scores ) { this.season = season; this.player = player; this.play_num = play_num; this.first_court = first_court; this.time = time; this.assists = assists; this.steals = steals; this.blocks = blocks; this.scores = scores; } } public static class result { public string player; public long num; public result() { super(); } public result(string player, long num) { this.player = player; this.num = num; } @override public string tostring() { return player + : + num; } }}//
当然我们也可以自定义一个 sink,将结果输出到一个文件中,例如:
tablesink sink = new csvtablesink(/home/result.csv, ,); string[] fieldnames = {name, num}; typeinformation[] fieldtypes = {types.string, types.int}; tableenv.registertablesink(result, fieldnames, fieldtypes, sink); sqlquery.insertinto(result); env.execute();
然后我们运行程序,可以看到 /home 目录下生成的 result.csv,查看结果:
迈克尔-乔丹,10凯文-杜兰特,4阿伦-艾弗森,4
七、总结
本篇向大家介绍了 flink sql 产生的背景,flink sql 大部分核心功能,并且分别介绍了 flink sql 的编程模型和常用算子及内置函数。最后以一个完整的示例展示了如何编写 flink sql 程序。flink sql 的简便易用极大地降低了 flink 编程的门槛,是我们必需掌握的使用 flink 解决流式计算问题最锋利的武器!
怎样用计算机电源为钻头供电
荣耀Magic2评测 到底值不值得买
二手CHROMA19052 安规测试仪(曾S13713875
云计算为产业发展提供了巨大潜力开辟新窗口
入门PIC需要准备什么工具呢?
一套符合标准SQL语义的开发语言,link的最新特性
新思科技部署下一代 DesignWare IP 解决方案
2021哪款蓝牙耳机性价比高?性价比超高的无线蓝牙耳机推荐
5G网络商用之前还面临着三个主要挑战
蓝牙音乐芯片在婴儿用品/摇篮床上的应用
智能镜面显示屏让智能家居成为当今时代新的时尚
pcb自动布线设置_设置线间距与宽度设置_pcb布局布线技巧
常用单片机介绍
氮化镓助力快充小型化,聚合物钽电容大显身手
投身AI创业浪潮 成功的概率只有1%
语音控件引入了火狐Firefox浏览器
SK将在2018年移动世界大会的展台上推出“社交虚拟现实”和多种5G演示
嵌智捷科技CM-AM335X嵌入式主板介绍
特斯拉推送固件更新,专为中国车主设计的功能
如何测量石英晶体的好坏