介绍
DataFusion
是一个用 Rust
编写的可扩展查询引擎,使用 Apache Arrow
作为其内存格式,是一个查询引擎而非数据库,因此其本身不具备存储数据的能力
使用复杂的查询规划器、列式、多线程、矢量化执行引擎和分区数据源快速运行复杂的 SQL
和 DataFrame
查询
DataFusion
旨在轻松定制,例如支持其他数据源、查询语言、函数、自定义运算符等
原生支持了查询CSV,Parquet,Avro,Json
等存储格式,也支持了本地,AWS S3,Azure Blob Storage,Google Cloud Storage
等多种数据源
DBMS
与 Query Engine
的区别
DBMS: DataBase Management System
DBMS
是一个包含完整数据库管理特性的系统,主要包含以下几个模块:
- 存储系统
- 元数据(
Catalog
) - 查询引擎(
Query Engine
) - 访问控制和权限
- 资源管理
- 管理工具
- 客户端
- 多节点管理
Query Engine
查询引擎属于数据库管理系统的一部分,是用户与数据库交互的主要接口,主要作用是将面向用户的高阶查询语句翻译成可被具体执行的数据处理单元操作,然后执行操作获取数据
DataFusion
查询引擎主要模块
前端
主要涉及
DFParser
和SqlToRel
这两个struct
语法解析
语义分析
Planner
:语法树转换成逻辑计划
查询中间表示
主要涉及
LogicalPlan
和Expr
这两个枚举类Expression
(表达式)/Type system
(类型系统)Query Plan
/Relational Operators
(关系算子)Rewrites
/Optimizations
(逻辑计划优化)
查询底层表示
主要涉及
PhysicalPlanner
这个trait
实现的逻辑计划到物理计划的转换,其中主要的关键点是ExecutionPlan
和PhysicalExpr
Statistics
(物理计划算子的统计信息,辅助物理计划优化)Partitions
(分块,多线程执行物理计划算子)Sort orders
(物理计划算子对数据是否排序)Algorithms
(物理计划算子的执行算法,如Hash join
和Merge join
)Rewrites / Optimizations
(物理计划优化)
执行运行时(算子)
主要涉及所有执行算子,如
GroupedHashAggregateStream
分配资源
向量化计算
自定义和扩展
DataFusion
被设计为“分解”查询引擎
意味着开发人员可以混合和扩展其用例所需的 DataFusion
部分
例如, ExecutionPlan
运算或 SqlToRel SQL
规划器和优化器
为了实现这一点,DataFusion
在很多方面支持扩展
- 从任何数据源读取(
TableProvider
) - 自定义
catalogs, schemas, and table lists
(CatalogProvider
) - 使用 (
LogicalPlanBuilder
) 构建您自己的查询语言或计划 - 声明和使用用户定义的函数(
ScalarUDF
和AggregateUDF
) - 添加自定义优化器重写过程(
OptimizerRule
和PhysicalOptimizerRule
) - 扩展规划器以使用用户定义的逻辑和物理节点(
QueryPlanner
)
查询计划和执行
sql
方面
Parsed with SqlToRel creates
sqlparser initial plan
┌───────────────┐ ┌─────────┐ ┌─────────────┐
│ SELECT * │ │Query { │ │Project │
│ FROM ... │──────────▶│.. │────────────▶│ TableScan │
│ │ │} │ │ ... │
└───────────────┘ └─────────┘ └─────────────┘
SQL String sqlparser LogicalPlan
AST nodes
Parsed with SqlToRel creates
sqlparser initial plan
┌───────────────┐ ┌─────────┐ ┌─────────────┐
│ SELECT * │ │Query { │ │Project │
│ FROM ... │──────────▶│.. │────────────▶│ TableScan │
│ │ │} │ │ ... │
└───────────────┘ └─────────┘ └─────────────┘
SQL String sqlparser LogicalPlan
AST nodes
sql
查询语句会使用sqloarser
库(https://github.com/sqlparser-rs/sqlparser-rs
)进行解析,变成AST
(Abstract Syntax Tree
抽象语法树)AST
之后会通过datafusion::sql::planner::SqlToRel
进行初始化,转变生成LogicalPlan
(逻辑计划,由各个逻辑算子组成的树状结构)和logical expressions Expr
(逻辑表达式),用与计算生成期望的查询结果
DataFrame
方面
使用 DataFrame API
执行计划时,该过程与 SQL
相同,只是 DataFrame API
直接使用 LogicalPlanBuilder
构建 LogicalPlan
, 拥有自己的自定义查询语言的系统通常也会直接构建 LogicalPlan
计划执行Planning
AnalyzerRules and PhysicalPlanner PhysicalOptimizerRules
OptimizerRules creates ExecutionPlan improve performance
rewrite plan
┌─────────────┐ ┌─────────────┐ ┌───────────────┐ ┌───────────────┐
│Project │ │Project(x, y)│ │ProjectExec │ │ProjectExec │
│ TableScan │──...──▶│ TableScan │─────▶│ ... │──...──▶│ ... │
│ ... │ │ ... │ │ ParquetExec│ │ ParquetExec│
└─────────────┘ └─────────────┘ └───────────────┘ └───────────────┘
LogicalPlan LogicalPlan ExecutionPlan ExecutionPlan
AnalyzerRules and PhysicalPlanner PhysicalOptimizerRules
OptimizerRules creates ExecutionPlan improve performance
rewrite plan
┌─────────────┐ ┌─────────────┐ ┌───────────────┐ ┌───────────────┐
│Project │ │Project(x, y)│ │ProjectExec │ │ProjectExec │
│ TableScan │──...──▶│ TableScan │─────▶│ ... │──...──▶│ ... │
│ ... │ │ ... │ │ ParquetExec│ │ ParquetExec│
└─────────────┘ └─────────────┘ └───────────────┘ └───────────────┘
LogicalPlan LogicalPlan ExecutionPlan ExecutionPlan
为了尽可能高效地处理具有多行的大型数据集,需要花费大量精力进行规划和优化,具体方式如下:
- 由
AnalyzerRules
检查并重写LogicalPlan
以强制实施检查语义规则,例如强类型检查 LogicalPlan
会被OptimizerRules
重写进行优化,如projection
投影、filter pushdown
过滤器下推等,以提高效率LogicalPlan
由PhysicalPlanner
转换为ExecutionPlan
ExecutionPlan
由PhysicalOptimizerRules
重写,例如sort
排序和join selection
,以提高效率
Data Sources
数据源
Planning │
requests │ TableProvider::scan
information │ creates an
such as schema │ ExecutionPlan
│
▼
┌─────────────────────────┐ ┌──────────────┐
│ │ │ │
│impl TableProvider │────────▶│ParquetExec │
│ │ │ │
└─────────────────────────┘ └──────────────┘
TableProvider
(built in or user provided) ExecutionPlan
Planning │
requests │ TableProvider::scan
information │ creates an
such as schema │ ExecutionPlan
│
▼
┌─────────────────────────┐ ┌──────────────┐
│ │ │ │
│impl TableProvider │────────▶│ParquetExec │
│ │ │ │
└─────────────────────────┘ └──────────────┘
TableProvider
(built in or user provided) ExecutionPlan
DataFusion
包含多个用于常见用例的内置数据源,并且可以通过实现 TableProvider
特征进行扩展, TableProvider
提供用于规划的信息和用于执行的 ExecutionPlans
ListingTable
:从Parquet, JSON, CSV, AVRO
文件读取数据,支持具有HIVE
样式分区、可选压缩、直接从远程对象存储读取等的单个文件或多个文件MemTable
:从内存中的RecordBatches
读取数据StreamingTable
:从潜在的无限输入中读取数据
执行
ExecutionPlan::execute Calling next() on the
produces a stream stream produces the data
┌───────────────┐ ┌─────────────────────────┐ ┌────────────┐
│ProjectExec │ │impl │ ┌───▶│RecordBatch │
│ ... │─────▶│SendableRecordBatchStream│────┤ └────────────┘
│ ParquetExec│ │ │ │ ┌────────────┐
└───────────────┘ └─────────────────────────┘ ├───▶│RecordBatch │
▲ │ └────────────┘
ExecutionPlan │ │ ...
│ │
│ │ ┌────────────┐
PhysicalOptimizerRules ├───▶│RecordBatch │
request information │ └────────────┘
such as partitioning │ ┌ ─ ─ ─ ─ ─ ─
└───▶ None │
└ ─ ─ ─ ─ ─ ─
ExecutionPlan::execute Calling next() on the
produces a stream stream produces the data
┌───────────────┐ ┌─────────────────────────┐ ┌────────────┐
│ProjectExec │ │impl │ ┌───▶│RecordBatch │
│ ... │─────▶│SendableRecordBatchStream│────┤ └────────────┘
│ ParquetExec│ │ │ │ ┌────────────┐
└───────────────┘ └─────────────────────────┘ ├───▶│RecordBatch │
▲ │ └────────────┘
ExecutionPlan │ │ ...
│ │
│ │ ┌────────────┐
PhysicalOptimizerRules ├───▶│RecordBatch │
request information │ └────────────┘
such as partitioning │ ┌ ─ ─ ─ ─ ─ ─
└───▶ None │
└ ─ ─ ─ ─ ─ ─
ExecutionPlans
使用 Apache Arrow
内存格式处理数据,大量使用 arrow crate
中的函数。 调用execute
会产生1个或多个数据分区,其中包含一个实现SendableRecordBatchStream
的运算符。 值用 ColumnarValue
表示,可以是 ScalarValue
(单个常量值)或 ArrayRef
(箭头数组)。 平衡并行性是使用 RepartitionExec
实现的,它实现了 Volcano
风格的“交换”