Flink内核原理与实现
上QQ阅读APP看书,第一时间看更新

3.5 函数体系

函数在Flink中叫作Function,开发者编写的函数叫作UDF(User Defined Function),当然Flink对于通用场景也内置了大量的预定义的通用UDF来简化开发,如Join、GroupBy、Sum等SQL语义等价的UDF。UDF在Flink的DataStream开发和SQL开发中被广泛使用。开发者使用UDF主要是实现非通用的计算逻辑,一般是业务逻辑。在本书语境中,UDF、Function、用户自定义函数的含义是相同的。

按照输入和输入的不同特点分类,Flink中的UDF大概分为3类(见图3-23)。

1. SourceFunction

无上游Function,SourceFunction直接从外部数据存储读取数据,所以SourceFunction所在的算子是起始,没有上游算子。

图3-23 Function分类与关系

2. SinkFunction

无下游Function,SinkFunction直接将数据写入外部存储,所以Sink函数所在的算子是作业的重点,没有下游算子。

3. 一般Function

一般的UDF函数用在作业的中间处理步骤中,其接口定义与SourceFunction和SinkFunction不同。一般UDF所在的算子有上游算子,也有下游算子。

Flink的一般UDF有单流输入和双流输入两种,从UDF输入、输出的模型来说,多流输入可以通过多个双流输入串联而成,这种设计比较简单实用,如图3-24所示。

图3-24 多流输入转换为多层双流输入

SourceFunction和SinkFunction主要在Flink中的连接器使用,也会在自定义读取、写出数据的时候使用。其余的大量实现逻辑的函数都属于一般UDF。

3.5.1 函数层次

UDF在DataStream API层使用,Flink提供的函数体系从接口的层级来看,从高阶Function到低阶Function如图3-25所示。

图3-25 Function层次

Flink内置的DataStream上的API接口,如DataStream#map、DataStream#flatMap、DataStreamFilter#filter等,使用的都是高阶函数,开发者使用高阶函数的时候,无须关心定时器之类的底层概念,只需要关注业务逻辑即可。低阶函数即ProcessFunction。

无状态Function用来做无状态计算,使用比较简单,如MapFunction。无状态Function和RichFunction是一一对应的,如MapFunction对应RichMapFunction,如代码清单3-5所示。

代码清单3-5 MapFunction代码示例

从上边的代码可以看出来,使用MapFunction只需实现Map方法即可,所以无状态Function一般都是直接继承接口,如Map接口,或者通过匿名类实现接口。

RichFunction相比无状态Function,有两方面的增强:

1)增加了open和close方法来管理Function的生命周期,在作业启动时,Function在open方法中执行初始化,在Function停止时,在close方法中执行清理,释放占用的资源等。无状态Function不具备此能力。

2)增加了getRuntimeContext和setRuntimeContext。通过RuntimeContext,RichFunction能够获取到执行时作业级别的参数信息,而无状态Function不具备此能力。

无状态Function天然是容错的,作业失败之后,重新执行即可,但是有状态的Function(RichFunction)需要处理中间结果的保存和恢复,待有了状态的访问能力,也就意味着Function是可以容错的,执行过程中,状态会进行快照然后备份,在作业失败,Function能够从快照中恢复回来。

3.5.2 处理函数

处理函数(ProcessFunction)可以访问流应用程序所有(非循环)基本构建块:

1)事件(数据流元素)。

2)状态(容错和一致性)。

3)定时器(事件时间和处理时间)。

ProcessFunction根据场景不同有几种实现,如图3-26所示。

图3-26 ProcessFunction类体系

1)ProcessFunction:单流输入函数。

2)CoProcessFunction:双流输入函数。

3)KeyedProcessFunction:单流输入函数。

4)KeyedCoProcessFunction:双流输入函数。

Kyed ProcessFunction与Non-Keyed ProcessFunction的区别是,Keyed ProcessFunction只能用在KeyedStream上。

ProcessFunction和CoProcessFunction的区别是,CoProcessFunction是双流输入,而ProcessFunction是单流输入。

1.双流Join

下面是使用CoProcessFunction实现双流Join的例子。

(1)即时双流Join

其逻辑如下(见图3-27)。

1)创建1个State对象。

2)接收到输入流1事件后更新State。

3)接收到输入流2的事件后遍历State,根据Join条件进行匹配,将匹配后的结果发送到下游。

图3-27 双流即时Join

(2)延迟双流Join

在流式数据里,数据可能是乱序的,数据会延迟到达,并且为了提供处理效率,使用小批量计算模式,而不是每个事件触发一次Join计算,如图3-28所示。

图3-28 双流延迟Join

其逻辑如下。

1)创建2个State对象,分别缓存输入流1和输入流2的事件。

2)创建1个定时器,等待数据的到达,定时延迟触发Join计算。

3)接收到输入流1事件后更新State。

4)接收到输入流2事件后更新State。

5)定时器遍历State1和State2,根据Join条件进行匹配,将匹配后的结果发送到下游。

2.延迟计算

在上面的延迟Join示例中,使用了计时器来暂存一批数据之后再触发计算,在流计算中这是非常常见的场景。在前面提到的批流合一的关键概念中,关键是Watermark和Window,在Flink中的窗口计算(WindowOperator)就是典型的延迟计算,使用Window暂存数据,使用Watermark触发Window的计算,如图3-29所示。在Blink Table & SQL中也大量使用了定时器。

图3-29 延迟计算过程

触发器在算子层面上提供支持,所有支持延迟计算的算子都继承了Triggerable接口。Triggerable接口主要定义了基于事件时间和基于处理时间的两种触发行为,如代码清单3-6所示。

代码清单3-6 Triggerable接口

3.5.3 广播函数

前边介绍了单输入Function和双输入Function。在Flink 1.5.0版本中引入了广播状态模式,将一个数据流的内容广播到另一个流中,同时也引入了新的函数类型:广播函数。

广播函数的体系如图3-30所示。

在图3-30中可以看到,广播函数有BroadcastProcessFunction和KeyedBroadcastProcess Function,广播函数跟双流输入的处理函数类似,也有两个数据元素处理的接口,processElement()负责处理一般的数据流,processBroadcastElement()负责处理广播数据流。完整定义如代码清单3-7和代码清单3-8所示。

图3-30 广播函数体系

代码清单3-7 BroadcastProcessFunction抽象类

代码清单3-8 BroadcastProcessFunction类

上面的两个广播函数BroadcastProcessFunction和KeyedBroadcastProcessFunction都是抽象类,所以在实际使用中,开发者需要实现其定义的抽象方法。

processElement()方法和processBroadcastElement()方法的区别在于:processElement只能使用只读的上下文ReadOnlyContext,而processBroadcastElement()方法则可以使用支持读写的上下文Context。这么设计看起来很奇怪,但是合理的。广播状态模式下,要求所有算子上的广播状态应完全一致,如果也允许processElement方法更新、删除广播状态中的数据,那么会使得算子之间的广播状态变得不一致,导致系统行为不可预测。在后边会介绍数据分区,数据分区会将数据流进行分流,交给下游的不同算子,那么不同算子接收的数据流就是不同的,如果开发者在processElement方法中更新了广播状态,必然会导致广播状态变得不一致。也许会有人说,在算子更新广播状态的时候,通知其他算子不就可以了吗?但是Flink中的平行算子之间没有通信接口,所以此处的设计强制要求processElement()不能更新广播状态。

注意,只有设计的强制要求还不够,processBroadcastElement()必须确保行为的不可变性,即无论什么时间、在哪个物理机器、广播数据是否乱序,都必须保证执行结果完全相同。比较典型的破坏不可变性的例子包括处理逻辑依赖于当前时间,不同的节点当前时间并不完全一致,而且还要考虑到作业恢复执行的情况,因此跟恢复之前的当前时间更是不可能相同。

3.5.4 异步函数

在介绍异步算子的时候提到了异步函数(AsyncFunction),异步函数就是对Java异步编程框架的封装。

异步函数的类体系如图3-31所示。

图3-31 异步函数类体系

如图3-31所示,异步函数的抽象类RichAsyncFunction实现AsyncFunction接口,继承AbstractRichFunction获得了生命周期管理和FunctionContext的访问能力。

异步函数的接口中定义了两种行为,异步调用行为将调用结果封装到ResultFutrue中,同时提供了调用超时的处理,防止不释放资源,如代码清单3-9所示。

代码清单3-9 AsyncFunction接口

3.5.5 数据源函数

数据源函数在Flink中叫作SourceFunction,Flink是一个计算引擎,其需要从外部读取数据,所以在Flink中设计了SourceFunction体系,专门用来从外部存储读取数据。SourceFunction是Flink作业的起点,一个作业中可以有多个起点,即读取多个数据源的数据。

SourceFunction体系如图3-32所示。

图3-32 SourceFunction体系

SourceFunction接口本身只定义了接口的业务逻辑相关行为,在实际使用中,一般会继承抽象类RichSourceFunction或RichParallelSourceFunction。这两个抽象类通过继承AbstractRichFunction获得了Function的生命周期管理、访问RuntimeContext的能力。

从类的定义上来说,RichSourceFunction和RichParallelSourceFunction的代码完全相同,甚至代码中的注释都基本相同,但是为什么要设计这两个类呢?

其实这两个类的差异在运行层面上,RichSourceFunction是不可并行的,并行度限定为1,超过1则会报错。而RichParallelSourceFunction是可并行的,并行度可以根据需要设定,并没有限制。差异体现在StreamExecutionEnvironment#addSource方法中,其对Function的类型进行了判断,如果是ParallelSourceFunction类型,则是可并行的。如代码清单3-10所示。

代码清单3-10 构造DataStreamSource

SourceFunction有几个比较关键的行为。

1)生命周期管理:在实际中,一般SourceFunction的实现类会同时继承AbstractRichFunction,所以其生命周期包含open、close、cancle三种方法,在生命周期方法中可以包含相应的初始化、清理等。

2)读取数据: 持续地从外部存储读取数据,不同的外部存储有不同的实现,如从Kafka读取数据依赖于Kafka Producer等。

3)向下游发送数据。

4)发送Watermark:生成Watermark并向下游发送,Watermark的生成参见“时间与窗口”章节。

5)空闲标记:如果读取不到数据,则将该Task标记为空闲,向下游发送Status#Idle,阻止Watermark向下游传递。

SourceFunction接口定义如代码清单3-11所示,SourceFunction中内嵌了SourceContext接口。

代码清单3-11 SourceFunction接口

上边SourceFunction定义的数据发送、Watermark发送、空闲标记实际上都定义在SourceContext中。

StreamSourceContexts中提供了生成不同类型SourceContext的实例的方法,从总体上按照带不带时间分为两类SourceContext如图3-33所示。

1. NonTimestampContext

NonTimestampContext为所有的元素赋予-1作为时间戳,也就意味着永远不会向下游发送Watermark。

使用Processing Time时使用此Context,使用Processing Time的时候向下游发送Watermark没有意义,在实际处理中,各个计算节点会根据本地时间定义触发器,触发执行Window类计算,而不是根据Watermark来触发。

图3-33 SourceContext类体系

2. WatermarkContext

WatermarkContext定义了与Watermark相关的行为:

1)负责管理当前的StreamStatus,确保StreamStatus向下游传递。

2)负责空闲检测的逻辑,当超过设定的事件间隔而没有收到数据或者Watermark时,认为Task处于空闲状态。

WatermarkContext有两个实现类。

(1)AutomaticWatermarkContext

使用摄取时间(Ingestion Time)的时候,AutomaticWatermarkContext自动生成Watermark。在该Context中,启动WatermarkEmittingTask向下游发送Watermark,使用了一个定时器,其触发时间=(作业启动的时刻+Watermark周期×n),一旦启动之后,WatermarkEmittingTask会持续地自动注册定时器,向下游发送Watermark。

(2)ManualWatermarkContext

使用事件时间(Event Time)的时候,ManualWatermarkContext不会产生Watermark,而是向下游发送透传上游的Watermark。

3.5.6 输出函数

输出函数在Flink中叫作SinkFunction,负责将计算结果写入外部存储中,是作业终点,一个作业可以有多个Sink,即将数据写入不同的外部存储中。

SinkFunction类体系如图3-34所示。

SinkFunction只是单纯地定义了数据写出到外部存储的行为,并没有Function的生命周期管理行为,函数的生命周期定义在AbstractRichFunction中。在Connector中实际实现Sink的时候,基本都是从RickSinkFunction和TwoPhaseCommitSinkFunction继承。

TowPhaseCommitSinkFunction是Flink中实现端到端Exactly-Once的关键函数,提供框架级别的端到端Exactly-Once的支持,其在实现过程中与Flink检查点机制结合,在第13章有详细介绍。

图3-34 SinkFunction类体系

3.5.7 检查点函数

检查点函数就是在Flink中支持函数级别状态的保存和恢复的函数。为了实现函数级别的State管理,Flink中设计了CheckpointedFunction和ListCheckpointed接口。在检查点函数接口中主要设计了状态快照的备份和恢复两种行为。

CheckpointedFunction虽然已经标记为废弃,但仍然是现在用得最多的接口。当保存状态之后,其snapshotStat()会被调用,用于备份保存状态到外部存储。当恢复状态的时候,其initializeState()方法负责初始化State,执行从上一个检查点恢复状态的逻辑。

CheckpointedFunction接口定义如代码清单3-12所示。

代码清单3-12 CheckpointedFunction接口

ListCheckpointed接口的行为跟Checkpointed行为类似,除了提供状态管理能力之外,修改作业并行度的时候,还提供了状态重分布的支持。ListCheckpointed接口定义如代码清单3-13所示。

代码清单3-13 ListCheckpointed接口