Skip to content

介绍

DataFusion 是一个用 Rust 编写的可扩展查询引擎,使用 Apache Arrow 作为其内存格式,是一个查询引擎而非数据库,因此其本身不具备存储数据的能力

使用复杂的查询规划器、列式、多线程、矢量化执行引擎和分区数据源快速运行复杂的 SQLDataFrame 查询

DataFusion 旨在轻松定制,例如支持其他数据源、查询语言、函数、自定义运算符等

原生支持了查询CSV,Parquet,Avro,Json等存储格式,也支持了本地,AWS S3,Azure Blob Storage,Google Cloud Storage等多种数据源

DBMSQuery Engine 的区别

DBMS: DataBase Management System

DBMS是一个包含完整数据库管理特性的系统,主要包含以下几个模块:

  • 存储系统
  • 元数据(Catalog)
  • 查询引擎(Query Engine)
  • 访问控制和权限
  • 资源管理
  • 管理工具
  • 客户端
  • 多节点管理

Query Engine

查询引擎属于数据库管理系统的一部分,是用户与数据库交互的主要接口,主要作用是将面向用户的高阶查询语句翻译成可被具体执行的数据处理单元操作,然后执行操作获取数据

DataFusion查询引擎主要模块

  • 前端

    主要涉及DFParserSqlToRel这两个struct

    • 语法解析

    • 语义分析

    • Planner:语法树转换成逻辑计划

  • 查询中间表示

    主要涉及LogicalPlanExpr这两个枚举类

    • Expression(表达式)/ Type system(类型系统)

    • Query Plan / Relational Operators(关系算子)

    • Rewrites / Optimizations(逻辑计划优化)

  • 查询底层表示

    主要涉及PhysicalPlanner这个trait实现的逻辑计划到物理计划的转换,其中主要的关键点是ExecutionPlanPhysicalExpr

    • Statistics(物理计划算子的统计信息,辅助物理计划优化)

    • Partitions(分块,多线程执行物理计划算子)

    • Sort orders(物理计划算子对数据是否排序)

    • Algorithms(物理计划算子的执行算法,如Hash joinMerge join

    • Rewrites / Optimizations(物理计划优化)

  • 执行运行时(算子)

    主要涉及所有执行算子,如GroupedHashAggregateStream

    • 分配资源

    • 向量化计算

自定义和扩展

DataFusion 被设计为“分解”查询引擎

意味着开发人员可以混合和扩展其用例所需的 DataFusion 部分

例如, ExecutionPlan 运算或 SqlToRel SQL 规划器和优化器

为了实现这一点,DataFusion 在很多方面支持扩展

  • 从任何数据源读取(TableProvider
  • 自定义catalogs, schemas, and table lists (CatalogProvider)
  • 使用 (LogicalPlanBuilder) 构建您自己的查询语言或计划
  • 声明和使用用户定义的函数(ScalarUDFAggregateUDF
  • 添加自定义优化器重写过程(OptimizerRulePhysicalOptimizerRule
  • 扩展规划器以使用用户定义的逻辑和物理节点(QueryPlanner

查询计划和执行

sql方面

                Parsed with            SqlToRel creates
                sqlparser              initial plan
┌───────────────┐           ┌─────────┐             ┌─────────────┐
│   SELECT *    │           │Query {  │             │Project      │
│   FROM ...    │──────────▶│..       │────────────▶│  TableScan  │
│               │           │}        │             │    ...      │
└───────────────┘           └─────────┘             └─────────────┘

  SQL String                 sqlparser               LogicalPlan
                             AST nodes
                Parsed with            SqlToRel creates
                sqlparser              initial plan
┌───────────────┐           ┌─────────┐             ┌─────────────┐
│   SELECT *    │           │Query {  │             │Project      │
│   FROM ...    │──────────▶│..       │────────────▶│  TableScan  │
│               │           │}        │             │    ...      │
└───────────────┘           └─────────┘             └─────────────┘

  SQL String                 sqlparser               LogicalPlan
                             AST nodes
  1. sql查询语句会使用sqloarser库(https://github.com/sqlparser-rs/sqlparser-rs)进行解析,变成ASTAbstract Syntax Tree抽象语法树)
  2. AST之后会通过datafusion::sql::planner::SqlToRel进行初始化,转变生成LogicalPlan(逻辑计划,由各个逻辑算子组成的树状结构)和logical expressions Expr(逻辑表达式),用与计算生成期望的查询结果

DataFrame方面

使用 DataFrame API 执行计划时,该过程与 SQL 相同,只是 DataFrame API 直接使用 LogicalPlanBuilder 构建 LogicalPlan, 拥有自己的自定义查询语言的系统通常也会直接构建 LogicalPlan

计划执行Planning

            AnalyzerRules and      PhysicalPlanner          PhysicalOptimizerRules
            OptimizerRules         creates ExecutionPlan    improve performance
            rewrite plan
┌─────────────┐        ┌─────────────┐      ┌───────────────┐        ┌───────────────┐
│Project      │        │Project(x, y)│      │ProjectExec    │        │ProjectExec    │
│  TableScan  │──...──▶│  TableScan  │─────▶│  ...          │──...──▶│  ...          │
│    ...      │        │    ...      │      │    ParquetExec│        │    ParquetExec│
└─────────────┘        └─────────────┘      └───────────────┘        └───────────────┘

 LogicalPlan            LogicalPlan         ExecutionPlan             ExecutionPlan
            AnalyzerRules and      PhysicalPlanner          PhysicalOptimizerRules
            OptimizerRules         creates ExecutionPlan    improve performance
            rewrite plan
┌─────────────┐        ┌─────────────┐      ┌───────────────┐        ┌───────────────┐
│Project      │        │Project(x, y)│      │ProjectExec    │        │ProjectExec    │
│  TableScan  │──...──▶│  TableScan  │─────▶│  ...          │──...──▶│  ...          │
│    ...      │        │    ...      │      │    ParquetExec│        │    ParquetExec│
└─────────────┘        └─────────────┘      └───────────────┘        └───────────────┘

 LogicalPlan            LogicalPlan         ExecutionPlan             ExecutionPlan

为了尽可能高效地处理具有多行的大型数据集,需要花费大量精力进行规划和优化,具体方式如下:

  1. AnalyzerRules 检查并重写 LogicalPlan 以强制实施检查语义规则,例如强类型检查
  2. LogicalPlan会被OptimizerRules重写进行优化,如projection投影、filter pushdown过滤器下推等,以提高效率
  3. LogicalPlanPhysicalPlanner 转换为 ExecutionPlan
  4. ExecutionPlanPhysicalOptimizerRules重写,例如sort排序和join selection,以提高效率

Data Sources数据源

Planning       │
requests       │            TableProvider::scan
information    │            creates an
such as schema │            ExecutionPlan


  ┌─────────────────────────┐         ┌──────────────┐
  │                         │         │              │
  │impl TableProvider       │────────▶│ParquetExec   │
  │                         │         │              │
  └─────────────────────────┘         └──────────────┘
        TableProvider
        (built in or user provided)    ExecutionPlan
Planning       │
requests       │            TableProvider::scan
information    │            creates an
such as schema │            ExecutionPlan


  ┌─────────────────────────┐         ┌──────────────┐
  │                         │         │              │
  │impl TableProvider       │────────▶│ParquetExec   │
  │                         │         │              │
  └─────────────────────────┘         └──────────────┘
        TableProvider
        (built in or user provided)    ExecutionPlan

DataFusion 包含多个用于常见用例的内置数据源,并且可以通过实现 TableProvider 特征进行扩展, TableProvider 提供用于规划的信息和用于执行的 ExecutionPlans

  1. ListingTable:从Parquet, JSON, CSV, AVRO 文件读取数据,支持具有 HIVE 样式分区、可选压缩、直接从远程对象存储读取等的单个文件或多个文件
  2. MemTable:从内存中的 RecordBatches 读取数据
  3. StreamingTable:从潜在的无限输入中读取数据

执行

           ExecutionPlan::execute             Calling next() on the
           produces a stream                  stream produces the data

┌───────────────┐      ┌─────────────────────────┐         ┌────────────┐
│ProjectExec    │      │impl                     │    ┌───▶│RecordBatch │
│  ...          │─────▶│SendableRecordBatchStream│────┤    └────────────┘
│    ParquetExec│      │                         │    │    ┌────────────┐
└───────────────┘      └─────────────────────────┘    ├───▶│RecordBatch │
              ▲                                       │    └────────────┘
ExecutionPlan │                                       │         ...
              │                                       │
              │                                       │    ┌────────────┐
            PhysicalOptimizerRules                    ├───▶│RecordBatch │
            request information                       │    └────────────┘
            such as partitioning                      │    ┌ ─ ─ ─ ─ ─ ─
                                                      └───▶ None        │
                                                           └ ─ ─ ─ ─ ─ ─
           ExecutionPlan::execute             Calling next() on the
           produces a stream                  stream produces the data

┌───────────────┐      ┌─────────────────────────┐         ┌────────────┐
│ProjectExec    │      │impl                     │    ┌───▶│RecordBatch │
│  ...          │─────▶│SendableRecordBatchStream│────┤    └────────────┘
│    ParquetExec│      │                         │    │    ┌────────────┐
└───────────────┘      └─────────────────────────┘    ├───▶│RecordBatch │
              ▲                                       │    └────────────┘
ExecutionPlan │                                       │         ...
              │                                       │
              │                                       │    ┌────────────┐
            PhysicalOptimizerRules                    ├───▶│RecordBatch │
            request information                       │    └────────────┘
            such as partitioning                      │    ┌ ─ ─ ─ ─ ─ ─
                                                      └───▶ None        │
                                                           └ ─ ─ ─ ─ ─ ─

ExecutionPlans 使用 Apache Arrow 内存格式处理数据,大量使用 arrow crate 中的函数。 调用execute会产生1个或多个数据分区,其中包含一个实现SendableRecordBatchStream的运算符。 值用 ColumnarValue 表示,可以是 ScalarValue(单个常量值)或 ArrayRef(箭头数组)。 平衡并行性是使用 RepartitionExec 实现的,它实现了 Volcano 风格的“交换”

参考阅读

Datafusion Docs.rs Architecture

Apache Arrow DataFusion原理与架构

Last updated:

Released under the MIT License.