Doris实时数仓实战
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

1.3.2 查询引擎

Doris的查询引擎是基于MPP框架的火山模型,是从早期的Apache Impala演化而来的。Doris会基于SQL语句先生成一个逻辑执行计划,然后根据数据的分布,形成一个物理执行计划。物理执行计划涉及多个Fragment,Fragment之间的数据传输则是由Exchange模块完成的。通过Exchange模块,Doris在执行查询任务时就有了数据重分布(Reshuffle)能力,让查询不再局限于数据存储节点,从而更好地利用多节点资源并行进行数据处理。基于MPP框架的查询引擎执行流程示意图如图1-12所示。

图1-12 基于MPP框架的查询引擎执行流程示意图

逻辑执行计划的Agg阶段分为先重分布数据再汇总两个步骤,这个过程和Hadoop类似,都是按照相同的Key进行数据重分布。

除了通过并行设计来提高查询效率外,Doris还对很多具体的查询算子进行了优化,比如图1-13中的聚合算子。

图1-13 聚合算子

在Doris中,聚合算子被拆分成两级聚合:第一级聚合是在数据所在节点,以减少第二级聚合的数据;而第二级聚合将Key相同的数据汇聚到同一个节点,进行最终的聚合。

在此基础上,Doris还实现了自适应聚合。首先我们要知道,聚合算子是一种阻塞型算子,需要等全部数据处理完后,才会将数据发送给上层节点。而自适应聚合是在第一级聚合中,如果发现聚合效果很差,即使聚合后也无法有效减少需要传输的数据,则会自动停止第一级聚合,转换为非阻塞的流式算子,直接将读取的数据发送到上层节点,从而避免不必要的阻塞等待。

针对Join算子,Doris也进行了大量优化,其中Runtime Filter是一种很重要的优化方式。在两个表的Join操作中,我们通常将右表称为BuildTable,将左表称为ProbeTable。在实现上,通常首先读取右表中的数据,在内存中构建一个HashTable,然后开始读取左表中的每一行数据,并在HashTable中进行连接匹配,返回符合连接条件的数据。通常来说,左表的数据量会大于右表的数据量。

Runtime Filter的设计思路是在右表内存中构建HashTable的同时,为连接列生成一个过滤器,之后把这个过滤器推给左表。这样,左表就可以利用过滤器对数据进行过滤,从而减少Probe节点需要传输和比对的数据。这种过滤器被称为Runtime Filter。针对不同的数据,Doris设计了不同类型的过滤器,例如In Predicate、Bloom Filter和Min Max。用户可以根据不同场景选择不同的过滤器。Runtime Filter实现逻辑示意图如图1-14所示。

Runtime Filter适用于大部分Join场景,包括节点的自动穿透,可将过滤器下推到最底层的扫描节点,例如分布式Shuffle Join中,可先将多个节点产生的过滤器进行合并,再下推到数据读取节点。

图1-14 Runtime Filter实现逻辑示意图