背景

基于 Stream & Table relativity(参见前文),《Streaming Systems》将 declarative的编程方式往前推进到数据系统中最常用的SQL表达,即Streaming SQL。在《Streaming Systems》中,Streaming SQL 并不像 StreamCQL(基于Storm)属于 SQL-like,而是作为 Classic SQL 的扩展,兼容 Classic SQL 的所有规则。

目前而言,Streaming SQL 还是一个正在不断发展研究的的领域,还没有一个框架实现了《Streaming Systems》书中提到的所有扩展特性;在开源框架中,Apache Calcite 也只是实现了一部分(Apache Flink集成了Apache Calcite,Apache Spark 在2.2后也实现了部分特性)。

因此下面的讨论更多的是理论层面,而不是具体实现。

定义 Streaming SQL

关系型资料库,或者是 SQL-on-Hadoop 的 SQL 实现都算是 Classic SQL;它的数学基础是 relational algebra ,核心是 relation(a set of these tuples)。用传统资料库的辞汇表达就是一个 relation 就是一张 table、或者是资料库的物理单元 table、或者是 SQL 查询的 result、或者是 view 等等。relational algebra 的一个非常重要的性质就是 closure property:

applying any operator from the relational algebra to any valid relation always yields another relation.

许多尝试在 streaming 中引入 SQL 特性的系统都没有实现closure property(因此是 SQL-like );它们将 stream 和 SQL 视为不同的东西,并提供特殊的 operators 两者中转换。而这并不完美,使用者需要学习新的 operators,Streaming SQL 不应该这么复杂。

因此《Streaming Systems》将时间的概念或者说是 stream 的概念视为 relational algebra 的第一等公民,使 SQL 也能应用到 streaming 中。从这个角度看待 relation,Classic Relation 是 point-in-time relations ,而新的 relation 是 time-varying relations,即TVR。准确来讲,TVR 是evolution of a classic relation over time。

换句话说,Classic Relation 类似于二维的表结构,行代表著X轴,列代表著Y轴;time-varying relations在此基础之上引入了Z轴,即时间轴,变成了三维表结构。随著relation的变化,新的 relation snapshot 被不断加入Z轴。

基于此,《Streaming Systems》通过一系列精彩的论证,证明了基于 TVR 的 Streaming SQL 可以全盘继承 Classic SQL 的所有规则,这意味著:

classic sql的所有operators(例如Where,Having)都可以应用到 Streaming SQL 中,且结果一致。

closure property 依然成立。

一个伟大的突破!我们只需要将Classic SQL进行一个简单的扩展就能应用到 Streaming 系统中去,从而免去了大部分接触底层的编程。

再次审视Stream & Table Theory

《Streaming Systems》引入了STREAMTABLE两个关键字用以区分 Stream 和 Table ,并认为 Table 是the point-in time relation snapshot ,而 Stream 是 capture the sequence of changes。这有点类似于业务处理过程中的 OLTP Table,OLTP Table 可以认为是一系列INSERTs、UPDATEs和DELETEs的操作随著时间不断发生变化的 TVR。

就像Stream & Table Theory提到的 Stream 和 Table 是一枚硬币的两面,单纯从 Stream 或者是 Table 角度去看待这枚硬币,都是不全面的。使用Kafka做类比的话,如果我们将数据的每一个变化发送到 Kafka ,Stream 关注的是这些数据的变化,而 Table 则是数据的变化过程中形成的一个结果;其实这两者最终都是 Kafka 中记录的 Log。

因此 Beam Model 从 Stream 角度去看待数据,Classic SQL Model 从 Table 的角度去看待数据,都有失偏颇。Beam Model的核心数据抽象 PCollection 是 Stream -> Stream ,Table 被隐性的忽略了。而 Classic SQL Model 在处理单纯的 Table 是没有问题的,但是遇上类似于下面SQL的情况,就有些问题了。

SELECT team, score
INTO TeamAndScore
FROM UserScores;

SELECT team, SUM(score) as total
INTO TeamTotals
FROM TeamAndScore
GROUP BY team

此时 Classic SQL Model 必须将 TeamAndScore 转换成静态的 Table 进行处理,而不是使用 Stream 增量地处理数据。

因此我们需要对 SQL 进行简单地扩展,使其能良好地处理这两种情况,而不是割裂开来。

理想中的Streaming SQL

前文仅仅是论证了Streaming SQL的可行性,而没有谈到具体的扩展特性,《Streaming Systeams》花了很大的篇幅去描述理想中的 Streaming SQL 是什么样的。建议读者花时间去阅读相应章节,下文只是一些简单的概括和总结。

stream/table selection

可以使用STREAMTABLE关键字显性区分 Stream 和 Table:

SElECT STREAM * FROM X;

SElECT TABLE * FROM X;

且应该默认下面情况:

  • 如果所有的输入是 Table ,那么输出也是 Table 。
  • 如果所有的输入是 Stream ,那么输出的也是 Stream 。

Temporal Operators

Temporal Operators 是为了提供时间推理工具以能够处理事件时间,也就是回答Where、When、How三个问题。在SQL World,我们大可将事件时间作为表中的一列看待(这也是Spark 2.X的做法),同时在系统中引入 Sys.MTime 虚拟列作为数据的处理时间。

  • Where 问题对应的是 windowing,沿用 GROUP BY 语句即可。为了处理各种不同的window类型,可以像 Calcite 一样在 Group By 后面使用内置的函数 SESSION 之类的。
  • When 问题对应的是 trigger 和 watermark 。默认的SQL处理方式是使用类似 materialized views 一样的 per-record triggers,但是计算的过于频繁,性能会有问题。因此可以使用 Watermark triggers(类似于 EMIT <when> 语句,等待输入到达满足某种完整性就进行计算)、Repeated delay triggers(每隔一定时间段就进行计算)、Data-driven triggers(满足某个特别的数据要求时就进行计算),作为 trade-off 的一种选项。
  • How 问题对应的是 accumulation 。使用 Sys.Undo 和 Sys.Redo 虚拟列(类似于 Delete 和 Insert )来判定某一行数据是否删除或者是新插入。

结尾

Streaming SQL 相对于 Classic SQL,它将时间作为第一等公民进行对待,使得 Streaming 领域也可以使用 SQL 进行处理数据;并且对于 SQL 如何翻译成底层的运行模型,书中举了具体的例子和给出了相应的代码,为后续的开源框架指出了一种可能的实现方式。

PS:Beam模型和对应的Streaming SQL 实现确实很优秀;不过对于Apache Beam的发展,笔者并不看好,毕竟 Flink 和 Spark 在市场上已经占据了这么多份额,不可能甘心仅仅作为 Beam 的底层 runner,即使 Beam 有 Google 的背书。


参考文章:

  1. calcite.apache.org/docs
  2. 《Streaming Systems》第八章
  3. wso2.com/library/articl
  4. infoq.com/presentations

推荐阅读:

相关文章