V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
sadhen
V2EX  ›  程序员

Enzyme SQL 引擎的实现与优化

  •  
  •   sadhen ·
    darcy-shen · 2019-01-05 01:00:21 +08:00 · 2417 次点击
    这是一个创建于 2141 天前的主题,其中的信息可能已经有所发展或是发生改变。

    开了掘金的专栏: https://juejin.im/post/5c2dd18af265da61285a3e08

    因为掘金的 Markdown 编辑器还是很好用的。后面团队里面别的小伙伴和老司机们的文章也会在上面陆续发布。欢迎关注。知乎的专栏得公司申请认证,还在走流程。

    《 Scala 实用指南》快要第二次印刷了,勘误还是太少,希望 V 友们如果买了,多多反馈。另外,我在知乎的 Scala 专栏已经写了很久了,作为图书的补充,欢迎大家阅读。Enzyme SQL 就是我翻译完《 Scala 实用指南》用 Scala 编写的,在我的序(序和前面的章节在异步社区和微信阅读都是免费的)里面,大家可以看到相关信息。


    Enzyme 是挖财数据团队自研的 SQL 执行引擎,适用于小规模或者中型数据集的快速计算。基于 Spark Catalyst 实现,Enzyme SQL 在查询层面 和 Spark SQL 完全兼容。至于 Dataframe,在 Enzyme 中有对应的 Protein。在 API 的层次上,Protein 和 Spark Dataframe 几乎完全一致。

    应用

    Enzyme SQL 目前应用于信贷风控体系中的变量中心。变量,也就是指标或者特征,是描述一个用户的一个值。最初,变量的加工逻辑由负责风控的数据分析师提供,需要通过数据团队的工程师用 Java 代码实现。这种方式比较原始,研发的链路和周期也相对冗长。故而,我们使用 SQL 作为一种加工变量的 DSL,提供在离线和实时两个平台上的一致语义。

    为什么要使用 SQL 呢?首先,自研 DSL 需要做很多设计,包括易用性、实现层面的性能等等;其次,自研的 DSL 最终被接受被高效使用,不可避免会有一个相对较长的磨合周期;最后,SQL 作为数据分析师的看家本领,没有使用的障碍和语义上的歧义,其实现也已经有大量现有的代码可供参考。

    Enzyme SQL 引擎极致的性能表现和非常低的 CPU 占用与内存消耗,有效地支撑了变量中心庞大的计算量(一个用户就会触发数以千计的变量计算)。

    实践

    Enzyme 设计之初就是以兼容 Spark SQL 为目标的,故而在使用上,和 Spark SQL 的 API 大体是一致的。EnzymeSession 即 SparkSession,Protein 即 Dataframe。

    我们从构建一个 Protein 数据集开始:

    // a session for computing
    val conf = new EnzymeConf
    val session = new EnzymeSession(conf)
    
    // construct a protein from rows and schemas
    val schema = StructType(Seq(
      StructField("x", LongType),
      StructField("y", StringType),
      StructField("z", DoubleType),
      StructField("in", IntegerType)
    ))
    val rows = Seq(Row(1L, "234L", 1.1, 12),
      Row(2L, "23L", 23.4, 4245),
      Row(2L, "65L", 5244.2, 234),
      Row(null, "7L", 245.234, 5245),
      Row(4L, "7L", 245.234, 5245))
    val df = new Protein(session, rows, schema)
    

    这样的一个数据集可以直接展示:

    > df.show()
    
    +----+----+-------+----+
    |   x|   y|      z|  in|
    +----+----+-------+----+
    |   1|234L|    1.1|  12|
    |   2| 23L|   23.4|4245|
    |   2| 65L| 5244.2| 234|
    |null|  7L|245.234|5245|
    |   4|  7L|245.234|5245|
    +----+----+-------+----+
    

    如果要使用 SQL,首先我们要把这个数据集和一个表名关联起来:

    > session.register(tableName = "a", df)
    > session.sql("select * from a").show()
    +----+----+-------+----+
    |   x|   y|      z|  in|
    +----+----+-------+----+
    |   1|234L|    1.1|  12|
    |   2| 23L|   23.4|4245|
    |   2| 65L| 5244.2| 234|
    |null|  7L|245.234|5245|
    |   4|  7L|245.234|5245|
    +----+----+-------+----+
    

    上面的代码中session.sql()的结果还是一个 Protein。除了使用 SQL,我们还可以使用 Protein 里面丰富的 API:

    > session.sql("select * from a order by x asc").show()
    +----+----+-------+----+
    |   x|   y|      z|  in|
    +----+----+-------+----+
    |null|  7L|245.234|5245|
    |   1|234L|    1.1|  12|
    |   2| 23L|   23.4|4245|
    |   2| 65L| 5244.2| 234|
    |   4|  7L|245.234|5245|
    +----+----+-------+----+
    
    > df.sort("x").show()
    +----+----+-------+----+
    |   x|   y|      z|  in|
    +----+----+-------+----+
    |null|  7L|245.234|5245|
    |   1|234L|    1.1|  12|
    |   2| 23L|   23.4|4245|
    |   2| 65L| 5244.2| 234|
    |   4|  7L|245.234|5245|
    +----+----+-------+----+
    

    更多用法的细节可以查看 Spark SQL 的文档,也可以查看 Enzyme 的文档。

    实现

    Enzyme 基于 Spark Catalyst 实现,而 Catalyst 对标的开源项目是 Apache Calcite。Apache Phoenix 和 Apache Hive 等众多项目都在使用 Calcite。因为我们的目标是兼容 Spark SQL,自然而然选择了 Catalyst,作为 SQL 的解析器、逻辑计划的执行器和优化器。

    Spark Catalyst 概览

    • SQL Text
      • (parse): Unresolved Logical Plan
        • (analyze): Resolved Logical Plan
          • (optimize): Optimized Logical Plan(s) ----- RBO
            • (planning): Physical Planning ------ CBO

    上面的层次结构简明地概括了一个 SQL 从最原始的 SQL 文本,到最后执行的各个阶段。其中加粗的部分是 Enzyme 中所实现的,未加粗的部分是 Catalyst 所提供的功能。

    解析,就是用 Antlr4 将 SQL 文本变成一棵 AST 树,这个 AST 树经过转换,变成了最原始的逻辑计划。在这样的逻辑计划中,我们是不知道*所表示的字段究竟是哪些。

    分析,就是结合 Catalog 中的元数据信息,将原始的逻辑计划中各个未确定的部分(比如*)和元数据匹配确定下来。如果发现类型无法满足或者所引用的字段根本不存在,就直接抛出 AnalysisException。

    优化,即通过逻辑计划的等价变换,转换得到最优的逻辑计划。Catalyst 中内置了一系列既有的优化规则,比如谓词下推和列剪裁。我们也可以通过 Catalyst 提供的接口,将自己研发的优化规则加入其中。这里的优化就是 RBO,基于规则的优化。

    最后是物理计划的生成,一个优化过后的逻辑计划其实可以生成多种等效的物理计划,数据最终决定了其中一个物理计划是最优的。在没有时光机的当下,我们无法将所有物理计划都运行一遍,再选择最优的那个。所以通常的做法是,收集一些关于底层表的统计信息,依据这些信息,预判出执行效率最高的物理计划。这就是所谓的 CBO,基于代价的优化。

    一个 SQL 的一生

    SELECT *
    FROM employee
    INNER JOIN department
    ON employee.DepartmentID = department.DepartmentID
    

    我们用上面这个 SQL 来详细了解一下上述各个阶段。

    分析阶段

    Project [*]
    +- 'Join Inner, ('employee.DepartmentID = 'department.DepartmentID)
       :- 'UnresolvedRelation `employee`
       +- 'UnresolvedRelation `department`
    
    
    Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
    +- Join Inner, (DepartmentID#7L = DepartmentID#0L)
       :- SubqueryAlias employee
       :  +- LocalRelation [LastName#6, DepartmentID#7L]
       +- SubqueryAlias department
          +- LocalRelation [DepartmentID#0L, DepartmentName#1]
    

    我们看到*已经被展开成了四个明确的字段,而且每个字段都有明确的 ID 标志,从而可以明确判定这个字段来自于哪一个表。当我们需要对 Spark SQL 做精确到字段级别的权限控制的时候,我们所需要的其实就是经过分析的逻辑计划。

    优化

    Project [LastName#6, DepartmentID#7L, DepartmentID#0L, DepartmentName#1]
    +- Join Inner, (DepartmentID#7L = DepartmentID#0L)
       :- SubqueryAlias employee
       :  +- LocalRelation [LastName#6, DepartmentID#7L]
       +- SubqueryAlias department
          +- LocalRelation [DepartmentID#0L, DepartmentName#1]
    
    Join Inner, (DepartmentID#7L = DepartmentID#0L)
    :- Filter isnotnull(DepartmentID#7L)
    :  +- LocalRelation [LastName#6, DepartmentID#7L]
    +- Filter isnotnull(DepartmentID#0L)
       +- LocalRelation [DepartmentID#0L, DepartmentName#1]
    

    因为这是一个 inner join,所以这里的一个优化点其实是在做 join 之前,把 join key 为 null 的行过滤掉。

    物理计划的生成

    我们模仿 Spark SQL 中 SparkPlan 的实现,提供了简化的 EnzymePlan:

    abstract class EnzymePlan extends QueryPlan[EnzymePlan] {
      def iterator: Iterator[InternalRow]
      override def output: Seq[Attribute]
      ...
    }
    
    trait LeafExecNode extends EnzymePlan {
      override final def children: Seq[EnzymePlan] = Nil
    }
    
    trait UnaryExecNode extends EnzymePlan {
      def child: EnzymePlan
    
      override final def children: Seq[EnzymePlan] = child :: Nil
    }
    
    trait BinaryExecNode extends EnzymePlan {
      def left: EnzymePlan
    
      def right: EnzymePlan
    
      override final def children: Seq[EnzymePlan] = Seq(left, right)
    }
    

    在这个代码片段中,EnzymePlan 是核心,其中 output 表示一个物理计划的节点上结果集的元数据信息,而 iterator 则是调用这个物理计划节点的入口。我们看到有三类物理计划:

    • LeafExecNode: LocalTableScanExec, LazyLocalTableScanExec
    • UnaryExecNode: ProjectExec, LimitExec, FilterExec
    • BinaryExecNode: HashJoinExec, NestedLoopExec

    Enzyme 中的部分物理计划实现分类之后,如上所示。物理计划整体上是一棵树,数据实际上是从叶节点(Leaf)开始,经过过滤或者转换(Unary)或者合流(Binary),最终汇聚到根节点,得到计算结果。叶节点就是我们的数据源。有两个输入源的是 Union 或者 Join,而只有一个输入源的就是 Projection,Filter,Sort 等算子。

    上一节中优化之后的逻辑计划可以生成这样的物理计划:

    HashJoinExec [DepartmentID#11L], [DepartmentID#4L]
             , BuildRight, Inner
    :- FilterExec isnotnull(DepartmentID#11L)
    :  +- LazyLocalTableScan [LastName#10, DepartmentID#11L],
                             employee, catalog@60dcf9ec
     +- FilterExec isnotnull(DepartmentID#4L)
       +- LazyLocalTableScan [DepartmentID#4L, DepartmentName#5],
                             department, catalog@60dcf9ec
    

    计算通过在根节点调用 iterator 方法,层层回溯:

    HashJoinExec.iterator
     + FilterExec.iterator
       + LazyLocalTableScan(employee).iterator
     + FilterExec.iterator
       + LazyLocalTableScan(department).iterator
    

    性能调优

    首先,我们需要定位性能瓶颈。JVM 生态中有很多做 Profiling 的工具。Enzyme 在优化过程中,使用的是 JDK 中自带的 jmc 命令和 FlightRecord。通过 jmc 的分析,可以定位到热点的方法,耗时的方法等有帮助的信息。我们有两种优化的策略。

    • 其一,直接替换掉慢的部分
    • 其二,对无法优化的部分做必要的缓存
    • 其三,逻辑计划优化

    优化点一:动态代码生成调优

    Spark 的钨丝计划引入了动态代码生成的技术,比较有效地解决了三方面的问题(详见参考资料 2 ):

    • 大量虚函数调用,生成的实际代码不再需要执行表达式系统中统一定义的虚函数
    • 判断数据类型和操作算子等内容的大型分支选择语句
    • 常数传播限制,生成的代码能够确定性地折叠常量

    对于 Enzyme 的使用场景,动态代码生成并不一定有性能优化的效果,我们使用 JMH 做基准测试,将一部分使得性能变差的代码生成关闭掉。

    数以千计的 SQL 会生成大量 Java 类,在引擎中编译并缓存,会带来一些内存上的占用和 CPU 的消耗,也是我们做取舍的其中一个原因。

    优化点二:缓存

    我们做的最主要的缓存就是从 Unresolve Logical Plan 到 Physical Plan 的生成。为什么不是直接从 SQL 到 Physical Plan 呢?因为 SQL 解析的开销实际上很小,而且略有差异的 SQL 所生成的 Unresolved Logical Plan 可能是一模一样的。

    在物理计划的缓存中,还有两点需要注意:

    • 其一,物理计划必须和数据隔离
    • 其二,物理计划的计算不能有副作用

    只有这样,我们的缓存才是有效的、正确无误的。另外,在表的 schema 发生改变的时候,我们还需要让所缓存的相关物理计划失效。

    优化点三:新增逻辑计划优化规则

    Catalyst 中的优化器提供了可扩展的接口,使得我们可以自定义逻辑计划优化的规则。Databricks 在 Spark Summit 上做过一个题为 A Deep Dive into Spark SQL's Catalyst Optimizer 的讲座,其中有细节的介绍。

    具体的接口如下:

    spark.experimental.extraStrategies = CustomizedExtraStrategy :: Nil
    

    我们利用这个接口,针对我们的业务数据,专门定制了一系列额外的优化规则,极大地提升了引擎的性能。

    Enzyme 的未来

    1. 开源
    2. 做更多针对小数据集的优化,进一步改善性能
    3. 基于 Enzyme,做一些上层生态的扩展

    对于第三点,我们想做的实际上是让 Enzyme 和其他生态更好地结合。比如如何将 Enzyme 运用到 Spark Streaming 或者 Flink Streaming 中,如何在 Spring Boot 中更加方便地使用 Enzyme,如何在机器学习中使用 Enzyme。

    参考资料

    1. Spear: A playground for experimenting ideas that may apply to Spark SQL/Catalyst
    2. Scala Benchmark Starter
    3. 《 Spark SQL 内核剖析》
    4. 《高性能 Scala 》

    作者简介

    忍冬,挖财数据研发工程师,负责 Spark SQL 在挖财的落地,自研了兼容 Spark SQL 适用于单机小数据集的 Enzyme SQL 引擎。译有《 Scala 实用指南》,业余时间是 GNU TeXmacs 项目的维护者之一。

    5 条回复    2019-01-07 10:29:52 +08:00
    sadhen
        1
    sadhen  
    OP
       2019-01-05 01:16:19 +08:00
    忘记贴我的知乎专栏了: https://zhuanlan.zhihu.com/p/50189343

    这篇是索引,主要是学习 R 大: https://zhuanlan.zhihu.com/p/25042028

    我的索引都是 Scala 相关的,大部分都比较浅显易懂,只学 R 大的形式,学不了内涵,欢迎正在或者希望从事大数据研发的朋友们阅读,巩固好扎实的 Scala 基础。

    对于 Scala 有任何问题,也欢迎在评论去提问。
    miaoever
        2
    miaoever  
       2019-01-05 05:47:59 +08:00
    有点不太明白的是,在 Enzyme 生成 Physical plan 后的 runtime,是直接在本地 code gen 然后运行生成的代码?
    sadhen
        3
    sadhen  
    OP
       2019-01-05 09:54:08 +08:00
    @miaoever 大佬,你司的 Presto 在 Codegen 这块做得更加成熟。原理都是一样的,可以看这里 https://zhuanlan.zhihu.com/p/53469238
    miaoever
        4
    miaoever  
       2019-01-05 23:57:29 +08:00
    @sadhen 不不不,我只是好奇 Enzyme 本身是否是最终的 runtime 来执行查询计划还是会分发给另外的 runtime 来负责执行。
    sadhen
        5
    sadhen  
    OP
       2019-01-07 10:29:52 +08:00
    @miaoever 简单的说, Enzyme SQL 可以认为是 Spark SQL 的单机版,为了低延时和高吞吐而实现。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1760 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 26ms · UTC 16:52 · PVG 00:52 · LAX 08:52 · JFK 11:52
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.