Enzyme SQL 引擎的实现与优化

2019-01-05 01:00:21 +08:00
 sadhen

开了掘金的专栏: https://juejin.im/post/5c2dd18af265da61285a3e08

因为掘金的 Markdown 编辑器还是很好用的。后面团队里面别的小伙伴和老司机们的文章也会在上面陆续发布。欢迎关注。知乎的专栏得公司申请认证,还在走流程。

《 Scala 实用指南》快要第二次印刷了,勘误还是太少,希望 V 友们如果买了,多多反馈。另外,我在知乎的 Scala 专栏已经写了很久了,作为图书的补充,欢迎大家阅读。Enzyme SQL 就是我翻译完《 Scala 实用指南》用 Scala 编写的,在我的序(序和前面的章节在异步社区和微信阅读都是免费的)里面,大家可以看到相关信息。


Enzyme 是挖财数据团队自研的 SQL 执行引擎,适用于小规模或者中型数据集的快速计算。基于 Spark Catalyst 实现,Enzyme SQL 在查询层面 和 Spark SQL 完全兼容。至于 Dataframe,在 Enzyme 中有对应的 Protein。在 API 的层次上,Protein 和 Spark Dataframe 几乎完全一致。

应用

Enzyme SQL 目前应用于信贷风控体系中的变量中心。变量,也就是指标或者特征,是描述一个用户的一个值。最初,变量的加工逻辑由负责风控的数据分析师提供,需要通过数据团队的工程师用 Java 代码实现。这种方式比较原始,研发的链路和周期也相对冗长。故而,我们使用 SQL 作为一种加工变量的 DSL,提供在离线和实时两个平台上的一致语义。

为什么要使用 SQL 呢?首先,自研 DSL 需要做很多设计,包括易用性、实现层面的性能等等;其次,自研的 DSL 最终被接受被高效使用,不可避免会有一个相对较长的磨合周期;最后,SQL 作为数据分析师的看家本领,没有使用的障碍和语义上的歧义,其实现也已经有大量现有的代码可供参考。

Enzyme SQL 引擎极致的性能表现和非常低的 CPU 占用与内存消耗,有效地支撑了变量中心庞大的计算量(一个用户就会触发数以千计的变量计算)。

实践

Enzyme 设计之初就是以兼容 Spark SQL 为目标的,故而在使用上,和 Spark SQL 的 API 大体是一致的。EnzymeSession 即 SparkSession,Protein 即 Dataframe。

我们从构建一个 Protein 数据集开始:

// a session for computing
val conf = new EnzymeConf
val session = new EnzymeSession(conf)

// construct a protein from rows and schemas
val schema = StructType(Seq(
  StructField("x", LongType),
  StructField("y", StringType),
  StructField("z", DoubleType),
  StructField("in", IntegerType)
))
val rows = Seq(Row(1L, "234L", 1.1, 12),
  Row(2L, "23L", 23.4, 4245),
  Row(2L, "65L", 5244.2, 234),
  Row(null, "7L", 245.234, 5245),
  Row(4L, "7L", 245.234, 5245))
val df = new Protein(session, rows, schema)

这样的一个数据集可以直接展示:

> df.show()

+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|null|  7L|245.234|5245|
|   4|  7L|245.234|5245|
+----+----+-------+----+

如果要使用 SQL,首先我们要把这个数据集和一个表名关联起来:

> session.register(tableName = "a", df)
> session.sql("select * from a").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|null|  7L|245.234|5245|
|   4|  7L|245.234|5245|
+----+----+-------+----+

上面的代码中session.sql()的结果还是一个 Protein。除了使用 SQL,我们还可以使用 Protein 里面丰富的 API:

> session.sql("select * from a order by x asc").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|null|  7L|245.234|5245|
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|   4|  7L|245.234|5245|
+----+----+-------+----+

> df.sort("x").show()
+----+----+-------+----+
|   x|   y|      z|  in|
+----+----+-------+----+
|null|  7L|245.234|5245|
|   1|234L|    1.1|  12|
|   2| 23L|   23.4|4245|
|   2| 65L| 5244.2| 234|
|   4|  7L|245.234|5245|
+----+----+-------+----+

更多用法的细节可以查看 Spark SQL 的文档,也可以查看 Enzyme 的文档。

实现

Enzyme 基于 Spark Catalyst 实现,而 Catalyst 对标的开源项目是 Apache Calcite。Apache Phoenix 和 Apache Hive 等众多项目都在使用 Calcite。因为我们的目标是兼容 Spark SQL,自然而然选择了 Catalyst,作为 SQL 的解析器、逻辑计划的执行器和优化器。

Spark Catalyst 概览

上面的层次结构简明地概括了一个 SQL 从最原始的 SQL 文本,到最后执行的各个阶段。其中加粗的部分是 Enzyme 中所实现的,未加粗的部分是 Catalyst 所提供的功能。

解析,就是用 Antlr4 将 SQL 文本变成一棵 AST 树,这个 AST 树经过转换,变成了最原始的逻辑计划。在这样的逻辑计划中,我们是不知道*所表示的字段究竟是哪些。

分析,就是结合 Catalog 中的元数据信息,将原始的逻辑计划中各个未确定的部分(比如*)和元数据匹配确定下来。如果发现类型无法满足或者所引用的字段根本不存在,就直接抛出 AnalysisException。

优化,即通过逻辑计划的等价变换,转换得到最优的逻辑计划。Catalyst 中内置了一系列既有的优化规则,比如谓词下推和列剪裁。我们也可以通过 Catalyst 提供的接口,将自己研发的优化规则加入其中。这里的优化就是 RBO,基于规则的优化。

最后是物理计划的生成,一个优化过后的逻辑计划其实可以生成多种等效的物理计划,数据最终决定了其中一个物理计划是最优的。在没有时光机的当下,我们无法将所有物理计划都运行一遍,再选择最优的那个。所以通常的做法是,收集一些关于底层表的统计信息,依据这些信息,预判出执行效率最高的物理计划。这就是所谓的 CBO,基于代价的优化。

一个 SQL 的一生

SELECT *
FROM employee
INNER JOIN department
ON employee.DepartmentID = department.DepartmentID

我们用上面这个 SQL 来详细了解一下上述各个阶段。

分析阶段

Project [*]
+- 'Join Inner, ('employee.DepartmentID = 'department.DepartmentID)
   :- 'UnresolvedRelation `employee`
   +- 'UnresolvedRelation `department`


Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
   :- SubqueryAlias employee
   :  +- LocalRelation [LastName#6, DepartmentID#7L]
   +- SubqueryAlias department
      +- LocalRelation [DepartmentID#0L, DepartmentName#1]

我们看到*已经被展开成了四个明确的字段,而且每个字段都有明确的 ID 标志,从而可以明确判定这个字段来自于哪一个表。当我们需要对 Spark SQL 做精确到字段级别的权限控制的时候,我们所需要的其实就是经过分析的逻辑计划。

优化

Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
+- Join Inner, (DepartmentID#7L = DepartmentID#0L)
   :- SubqueryAlias employee
   :  +- LocalRelation [LastName#6, DepartmentID#7L]
   +- SubqueryAlias department
      +- LocalRelation [DepartmentID#0L, DepartmentName#1]

Join Inner, (DepartmentID#7L = DepartmentID#0L)
:- Filter isnotnull(DepartmentID#7L)
:  +- LocalRelation [LastName#6, DepartmentID#7L]
+- Filter isnotnull(DepartmentID#0L)
   +- LocalRelation [DepartmentID#0L, DepartmentName#1]

因为这是一个 inner join,所以这里的一个优化点其实是在做 join 之前,把 join key 为 null 的行过滤掉。

物理计划的生成

我们模仿 Spark SQL 中 SparkPlan 的实现,提供了简化的 EnzymePlan:

abstract class EnzymePlan extends QueryPlan[EnzymePlan] {
  def iterator: Iterator[InternalRow]
  override def output: Seq[Attribute]
  ...
}

trait LeafExecNode extends EnzymePlan {
  override final def children: Seq[EnzymePlan] = Nil
}

trait UnaryExecNode extends EnzymePlan {
  def child: EnzymePlan

  override final def children: Seq[EnzymePlan] = child :: Nil
}

trait BinaryExecNode extends EnzymePlan {
  def left: EnzymePlan

  def right: EnzymePlan

  override final def children: Seq[EnzymePlan] = Seq(left, right)
}

在这个代码片段中,EnzymePlan 是核心,其中 output 表示一个物理计划的节点上结果集的元数据信息,而 iterator 则是调用这个物理计划节点的入口。我们看到有三类物理计划:

Enzyme 中的部分物理计划实现分类之后,如上所示。物理计划整体上是一棵树,数据实际上是从叶节点(Leaf)开始,经过过滤或者转换(Unary)或者合流(Binary),最终汇聚到根节点,得到计算结果。叶节点就是我们的数据源。有两个输入源的是 Union 或者 Join,而只有一个输入源的就是 Projection,Filter,Sort 等算子。

上一节中优化之后的逻辑计划可以生成这样的物理计划:

HashJoinExec [DepartmentID#11L], [DepartmentID#4L]
         , BuildRight, Inner
:- FilterExec isnotnull(DepartmentID#11L)
:  +- LazyLocalTableScan [LastName#10, DepartmentID#11L],
                         employee, catalog@60dcf9ec
 +- FilterExec isnotnull(DepartmentID#4L)
   +- LazyLocalTableScan [DepartmentID#4L, DepartmentName#5],
                         department, catalog@60dcf9ec

计算通过在根节点调用 iterator 方法,层层回溯:

HashJoinExec.iterator
 + FilterExec.iterator
   + LazyLocalTableScan(employee).iterator
 + FilterExec.iterator
   + LazyLocalTableScan(department).iterator

性能调优

首先,我们需要定位性能瓶颈。JVM 生态中有很多做 Profiling 的工具。Enzyme 在优化过程中,使用的是 JDK 中自带的 jmc 命令和 FlightRecord。通过 jmc 的分析,可以定位到热点的方法,耗时的方法等有帮助的信息。我们有两种优化的策略。

优化点一:动态代码生成调优

Spark 的钨丝计划引入了动态代码生成的技术,比较有效地解决了三方面的问题(详见参考资料 2 ):

对于 Enzyme 的使用场景,动态代码生成并不一定有性能优化的效果,我们使用 JMH 做基准测试,将一部分使得性能变差的代码生成关闭掉。

数以千计的 SQL 会生成大量 Java 类,在引擎中编译并缓存,会带来一些内存上的占用和 CPU 的消耗,也是我们做取舍的其中一个原因。

优化点二:缓存

我们做的最主要的缓存就是从 Unresolve Logical Plan 到 Physical Plan 的生成。为什么不是直接从 SQL 到 Physical Plan 呢?因为 SQL 解析的开销实际上很小,而且略有差异的 SQL 所生成的 Unresolved Logical Plan 可能是一模一样的。

在物理计划的缓存中,还有两点需要注意:

只有这样,我们的缓存才是有效的、正确无误的。另外,在表的 schema 发生改变的时候,我们还需要让所缓存的相关物理计划失效。

优化点三:新增逻辑计划优化规则

Catalyst 中的优化器提供了可扩展的接口,使得我们可以自定义逻辑计划优化的规则。Databricks 在 Spark Summit 上做过一个题为 A Deep Dive into Spark SQL's Catalyst Optimizer 的讲座,其中有细节的介绍。

具体的接口如下:

spark.experimental.extraStrategies = CustomizedExtraStrategy :: Nil

我们利用这个接口,针对我们的业务数据,专门定制了一系列额外的优化规则,极大地提升了引擎的性能。

Enzyme 的未来

  1. 开源
  2. 做更多针对小数据集的优化,进一步改善性能
  3. 基于 Enzyme,做一些上层生态的扩展

对于第三点,我们想做的实际上是让 Enzyme 和其他生态更好地结合。比如如何将 Enzyme 运用到 Spark Streaming 或者 Flink Streaming 中,如何在 Spring Boot 中更加方便地使用 Enzyme,如何在机器学习中使用 Enzyme。

参考资料

  1. Spear: A playground for experimenting ideas that may apply to Spark SQL/Catalyst
  2. Scala Benchmark Starter
  3. 《 Spark SQL 内核剖析》
  4. 《高性能 Scala 》

作者简介

忍冬,挖财数据研发工程师,负责 Spark SQL 在挖财的落地,自研了兼容 Spark SQL 适用于单机小数据集的 Enzyme SQL 引擎。译有《 Scala 实用指南》,业余时间是 GNU TeXmacs 项目的维护者之一。

2433 次点击
所在节点    程序员
5 条回复
sadhen
2019-01-05 01:16:19 +08:00
忘记贴我的知乎专栏了: https://zhuanlan.zhihu.com/p/50189343

这篇是索引,主要是学习 R 大: https://zhuanlan.zhihu.com/p/25042028

我的索引都是 Scala 相关的,大部分都比较浅显易懂,只学 R 大的形式,学不了内涵,欢迎正在或者希望从事大数据研发的朋友们阅读,巩固好扎实的 Scala 基础。

对于 Scala 有任何问题,也欢迎在评论去提问。
miaoever
2019-01-05 05:47:59 +08:00
有点不太明白的是,在 Enzyme 生成 Physical plan 后的 runtime,是直接在本地 code gen 然后运行生成的代码?
sadhen
2019-01-05 09:54:08 +08:00
@miaoever 大佬,你司的 Presto 在 Codegen 这块做得更加成熟。原理都是一样的,可以看这里 https://zhuanlan.zhihu.com/p/53469238
miaoever
2019-01-05 23:57:29 +08:00
@sadhen 不不不,我只是好奇 Enzyme 本身是否是最终的 runtime 来执行查询计划还是会分发给另外的 runtime 来负责执行。
sadhen
2019-01-07 10:29:52 +08:00
@miaoever 简单的说, Enzyme SQL 可以认为是 Spark SQL 的单机版,为了低延时和高吞吐而实现。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/524040

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX