5.2 SQL类型系统
在Flink 1.9版本之前,Flink SQL使用TypeInfomation类型系统,从1.9版本开始引入了Blink,同时引入了新的SQL类型系统,解决了DataStream类型系统在SQL中使用的问题。
SQL类型系统是一种类型的逻辑表示,跟Java/Scala的物理类型并不是一一对应的关系。在DataStream的类型系统中,使用TypeInformation来描述类型信息,而在Flink SQL中则使用DataType中的LogicalType类型系统来描述类型信息,LogicalType类型系统与SQL标准基本保持一致,同时增加了一些额外的信息,如是否可以为null等,目的是提高scala expression(标量表达式)的处理效率。
Flink SQL中支持的SQL逻辑类型如图5-4所示。
LogicalType是SQL类型系统的核心抽象,所有SQL类型都继承自LogicalType。LogicalType子类定义了每种类型的特殊参数,如对于数值类型,定义了其长度和精度。
注意:LogicalType类型只是用来描述数据类型的,当前planner和runtime实现尚未支持所有的LogicalType类型。
SQL类型系统只是逻辑类型,但是在Flink SQL执行时,最终转换为了Flink DataStream/DataSet应用,此时就需要TypeInfomation类型信息来实现序列化/反序列化,所以SQL逻辑类型LogicalType需要转换为TypeInfomation。
Row是SQL类型系统中顶层的数据容器,表示表中的一行数据或者数据流上的一个数据记录,在目前版本的Flink中存在两套Row结构:
图5-4 Flink SQL支持的SQL逻辑类型
1)org.apache.flink.types.Row:在Flink Planner中使用,是1.9版本之前Flink SQL使用的Row结构,在SQL相关的算子、UDF函数、代码生成中都是使用该套Row结构。
2)org.apache.flink.table.dataformat.BaseRow及其子类:是在Blink Runtime和Blink Planner中使用的新的Row类型数据结构,在Blink算子、UDF函数和代码生成中使用此结构。
5.2.1 Flink Row
在1.9之前版本的Flink实现中,使用Row作为Flink SQL中一行记录的表达和存储形式,Row使用了对象数组(Object[])来记录数据,本质上来说是基于Java对象的方式。在数据计算过程中,需要消耗大量的CPU来序列化/反序列化Row对象。
假设对于一个(Integer,String,String)结构的表,其中一行记录为(321,“awesome”,“flink”),使用Fink Row存储,其结构如图5-5所示。
图5-5 Flink Row存储结构
Flink Row本身不是强类型的,所以需要为Row提供RowTypeInfo来描述Row中的数据类型,在序列化/反序列化的时候使用。
5.2.2 Blink Row
在1.9版本中Blink引入二进制的内存行式存储和列式存储两种数据组织形式,两种存储结构都比较紧凑。
内存中的行、列式存储和磁盘上的行、列式存储不同。在磁盘上的列式存储一般会采用压缩存储,比行式存储压缩效率高,占用存储空间小、IO效率高,所以非常适用于分析型的计算。在内存中的列式存储一般没有必要采用压缩存储,毕竟解压缩也需要消耗计算成本。所以在内存中行、列式存储各有优势,考虑到计算类型的不同,并没有绝对的优劣之分。Blink Row总览如图5-6所示。
图5-6 Blink Row总览
1. Blink中的行式存储结构
1)BinaryRow:表数据的二进制行式存储,分为定长部分和不定长部分,定长部分只能在一个MemorySegment内。
2)NestedRow:与BinaryRow的内存存储结构一样,区别在于NestedRow的定长部分可以跨MemorySegment。
3)UpdatableRow:该类型的Row比较特别,其保存了该行所有字段的数据,更新字段数据的时候不修改原始数据,而是使用一个数组记录被修改字段的最新值。读取数据的时候,首先判断数据是否被更新过,如果更新过则读取最新值,如果没有则读取原始值。
4)ObjectArrayRow:使用对象数据保存数据,比二进制结构存储形式多了对象的序列化/反序列化,理论上来说成本更高。其有两个实现类GenericRow和BoxedWrapperRow。GenericRow中存储的数据类型是原始类型(如int等),BoxedWrapperRow中存储的数据类型是可序列化和可比较大小的对象类型。
5)JoinedRow:表示Join或者关联运算中的两行数据的逻辑结构,如Row1、Row2,两行数据并没有进行物理上的合并,物理合并成本高。但是从使用者的角度来说,看起来就是一行数据,无须关注底层。
为了提升Flink SQL的性能,在1.9版本中实现了BinaryRow,BinaryRow直接使用MemorySegment来存储和计算,计算过程中直接对二进制数据结构进行操作,避免了序列化/反序列化的开销。
在此重点介绍一下BinaryRow,假设对于一个(Integer,String,String)结构的表,其中一行记录为(321,“awesome”,“flink”),使用BinaryRow存储,其结构如图5-7所示。
图5-7 BinaryRow的结构
BinaryRow存储结构中包含两个部分: 定长部分和变长部分。
(1)定长部分
定长部分包含3个内容:头信息区(Header)、空值索引(Null Bit Set)、字段值区(Field Values)。
1)头信息区(Header):占用一个字节。
2)空值索引(Null Bit Set):用于标记行中Null值字段,在内存中使用8字节进行对齐。在实际的存储中,该区域的第1个字节就是行的头信息区,剩下的才是Null值字段标识位。
3)字段值区(Field Values):保存基本类型和8个字节长度以内的值,如果某个字段值超过了8个字节,则保存该字段的长度与offset偏移量。在目前的实现中,一般的Bool类型、数值类型和长度较短的时间类型、精度低一些的Decimal类型可以保存在定长部分,如代码清单5-4所示。
代码清单5-4 数据类型是否为定长判断
在目前的设计中,定长部分全部保存在1个MemorySegment中,以提升读写BinaryRow中字段的速度。在写入阶段,如果BinaryRow中定长部分超过单个MemorySegment的存储容量,确实有非常多的字段,则建议增加MemorySegment的大小。
(2)变长部分
变长部分用来保存超过8个字节长度的字段的值,可能会保存跨越多个MemorySegment的字段。
注意:BinaryRow实际上是参照Spark的UnsafeRow来设计的,两者的区别在于Flink的BinaryRow不是保存在连续内存中,如果不定长部分足够小,可以保存在一个固定长度的内存中。
按照阿里巴巴官方的测试,使用了BinaryRow之后,Blink在流计算方面的性能提升了2倍。
2. Blink中的内存列式存储
内存中的列式存储形式,目前在Flink中用来读取ORC类型的列式存储的数据。
ColumnarRow:表数据的二进制列式存储,每一列是一个Vector向量。
5.2.3 ColumnarRow
ColumnarRow是一种内存列式存储结构,每一列的抽象结构为ColumnVector。在当前的实现中,只支持堆上ColumnVector,堆外的ColumnVector尚不被支持。堆上ColumnVector本质上是使用Java原始类型数据保存一列的数据。Orc类型的列式存储使用了ColumnarRow。
对于查询类的请求,使用列式存储能够提高CPU缓存命中率。CPU的数据预读取策略总是尝试将相邻的数据预读取到缓存中,因为列式存储形式中一列数据总是紧邻的,与行式数据相比,访问同一个字段的时候,CPU缓存命中率更高,因此CPU就无须浪费宝贵的事件周期去等待数据从内存加载,从而提高计算效率,如图5-8所示。
图5-8 ColumnarRow组织数据的结构示意图
默认情况下,一个ColumnarRow示例中保存2048行的数据,2048是个经验数字。