介绍
DataFusion 是一个用 Rust 编写的可扩展查询引擎,使用 Apache Arrow 作为其内存格式,是一个查询引擎而非数据库,因此其本身不具备存储数据的能力
使用复杂的查询规划器、列式、多线程、矢量化执行引擎和分区数据源快速运行复杂的 SQL 和 DataFrame 查询
DataFusion 旨在轻松定制,例如支持其他数据源、查询语言、函数、自定义运算符等
原生支持了查询CSV,Parquet,Avro,Json等存储格式,也支持了本地,AWS S3,Azure Blob Storage,Google Cloud Storage等多种数据源
DBMS 与 Query Engine 的区别
DBMS: DataBase Management System
DBMS是一个包含完整数据库管理特性的系统,主要包含以下几个模块:
- 存储系统
- 元数据(
Catalog) - 查询引擎(
Query Engine) - 访问控制和权限
- 资源管理
- 管理工具
- 客户端
- 多节点管理
Query Engine
查询引擎属于数据库管理系统的一部分,是用户与数据库交互的主要接口,主要作用是将面向用户的高阶查询语句翻译成可被具体执行的数据处理单元操作,然后执行操作获取数据
DataFusion查询引擎主要模块
前端
主要涉及
DFParser和SqlToRel这两个struct语法解析
语义分析
Planner:语法树转换成逻辑计划
查询中间表示
主要涉及
LogicalPlan和Expr这两个枚举类Expression(表达式)/Type system(类型系统)Query Plan/Relational Operators(关系算子)Rewrites/Optimizations(逻辑计划优化)
查询底层表示
主要涉及
PhysicalPlanner这个trait实现的逻辑计划到物理计划的转换,其中主要的关键点是ExecutionPlan和PhysicalExprStatistics(物理计划算子的统计信息,辅助物理计划优化)Partitions(分块,多线程执行物理计划算子)Sort orders(物理计划算子对数据是否排序)Algorithms(物理计划算子的执行算法,如Hash join和Merge join)Rewrites / Optimizations(物理计划优化)
执行运行时(算子)
主要涉及所有执行算子,如
GroupedHashAggregateStream分配资源
向量化计算
自定义和扩展
DataFusion 被设计为“分解”查询引擎
意味着开发人员可以混合和扩展其用例所需的 DataFusion 部分
例如, ExecutionPlan 运算或 SqlToRel SQL 规划器和优化器
为了实现这一点,DataFusion 在很多方面支持扩展
- 从任何数据源读取(
TableProvider) - 自定义
catalogs, schemas, and table lists(CatalogProvider) - 使用 (
LogicalPlanBuilder) 构建您自己的查询语言或计划 - 声明和使用用户定义的函数(
ScalarUDF和AggregateUDF) - 添加自定义优化器重写过程(
OptimizerRule和PhysicalOptimizerRule) - 扩展规划器以使用用户定义的逻辑和物理节点(
QueryPlanner)
查询计划和执行
sql方面
Parsed with SqlToRel creates
sqlparser initial plan
┌───────────────┐ ┌─────────┐ ┌─────────────┐
│ SELECT * │ │Query { │ │Project │
│ FROM ... │──────────▶│.. │────────────▶│ TableScan │
│ │ │} │ │ ... │
└───────────────┘ └─────────┘ └─────────────┘
SQL String sqlparser LogicalPlan
AST nodes Parsed with SqlToRel creates
sqlparser initial plan
┌───────────────┐ ┌─────────┐ ┌─────────────┐
│ SELECT * │ │Query { │ │Project │
│ FROM ... │──────────▶│.. │────────────▶│ TableScan │
│ │ │} │ │ ... │
└───────────────┘ └─────────┘ └─────────────┘
SQL String sqlparser LogicalPlan
AST nodessql查询语句会使用sqloarser库(https://github.com/sqlparser-rs/sqlparser-rs)进行解析,变成AST(Abstract Syntax Tree抽象语法树)AST之后会通过datafusion::sql::planner::SqlToRel进行初始化,转变生成LogicalPlan(逻辑计划,由各个逻辑算子组成的树状结构)和logical expressions Expr(逻辑表达式),用与计算生成期望的查询结果
DataFrame方面
使用 DataFrame API 执行计划时,该过程与 SQL 相同,只是 DataFrame API 直接使用 LogicalPlanBuilder 构建 LogicalPlan, 拥有自己的自定义查询语言的系统通常也会直接构建 LogicalPlan
计划执行Planning
AnalyzerRules and PhysicalPlanner PhysicalOptimizerRules
OptimizerRules creates ExecutionPlan improve performance
rewrite plan
┌─────────────┐ ┌─────────────┐ ┌───────────────┐ ┌───────────────┐
│Project │ │Project(x, y)│ │ProjectExec │ │ProjectExec │
│ TableScan │──...──▶│ TableScan │─────▶│ ... │──...──▶│ ... │
│ ... │ │ ... │ │ ParquetExec│ │ ParquetExec│
└─────────────┘ └─────────────┘ └───────────────┘ └───────────────┘
LogicalPlan LogicalPlan ExecutionPlan ExecutionPlan AnalyzerRules and PhysicalPlanner PhysicalOptimizerRules
OptimizerRules creates ExecutionPlan improve performance
rewrite plan
┌─────────────┐ ┌─────────────┐ ┌───────────────┐ ┌───────────────┐
│Project │ │Project(x, y)│ │ProjectExec │ │ProjectExec │
│ TableScan │──...──▶│ TableScan │─────▶│ ... │──...──▶│ ... │
│ ... │ │ ... │ │ ParquetExec│ │ ParquetExec│
└─────────────┘ └─────────────┘ └───────────────┘ └───────────────┘
LogicalPlan LogicalPlan ExecutionPlan ExecutionPlan为了尽可能高效地处理具有多行的大型数据集,需要花费大量精力进行规划和优化,具体方式如下:
- 由
AnalyzerRules检查并重写LogicalPlan以强制实施检查语义规则,例如强类型检查 LogicalPlan会被OptimizerRules重写进行优化,如projection投影、filter pushdown过滤器下推等,以提高效率LogicalPlan由PhysicalPlanner转换为ExecutionPlanExecutionPlan由PhysicalOptimizerRules重写,例如sort排序和join selection,以提高效率
Data Sources数据源
Planning │
requests │ TableProvider::scan
information │ creates an
such as schema │ ExecutionPlan
│
▼
┌─────────────────────────┐ ┌──────────────┐
│ │ │ │
│impl TableProvider │────────▶│ParquetExec │
│ │ │ │
└─────────────────────────┘ └──────────────┘
TableProvider
(built in or user provided) ExecutionPlanPlanning │
requests │ TableProvider::scan
information │ creates an
such as schema │ ExecutionPlan
│
▼
┌─────────────────────────┐ ┌──────────────┐
│ │ │ │
│impl TableProvider │────────▶│ParquetExec │
│ │ │ │
└─────────────────────────┘ └──────────────┘
TableProvider
(built in or user provided) ExecutionPlanDataFusion 包含多个用于常见用例的内置数据源,并且可以通过实现 TableProvider 特征进行扩展, TableProvider 提供用于规划的信息和用于执行的 ExecutionPlans
ListingTable:从Parquet, JSON, CSV, AVRO文件读取数据,支持具有HIVE样式分区、可选压缩、直接从远程对象存储读取等的单个文件或多个文件MemTable:从内存中的RecordBatches读取数据StreamingTable:从潜在的无限输入中读取数据
执行
ExecutionPlan::execute Calling next() on the
produces a stream stream produces the data
┌───────────────┐ ┌─────────────────────────┐ ┌────────────┐
│ProjectExec │ │impl │ ┌───▶│RecordBatch │
│ ... │─────▶│SendableRecordBatchStream│────┤ └────────────┘
│ ParquetExec│ │ │ │ ┌────────────┐
└───────────────┘ └─────────────────────────┘ ├───▶│RecordBatch │
▲ │ └────────────┘
ExecutionPlan │ │ ...
│ │
│ │ ┌────────────┐
PhysicalOptimizerRules ├───▶│RecordBatch │
request information │ └────────────┘
such as partitioning │ ┌ ─ ─ ─ ─ ─ ─
└───▶ None │
└ ─ ─ ─ ─ ─ ─ ExecutionPlan::execute Calling next() on the
produces a stream stream produces the data
┌───────────────┐ ┌─────────────────────────┐ ┌────────────┐
│ProjectExec │ │impl │ ┌───▶│RecordBatch │
│ ... │─────▶│SendableRecordBatchStream│────┤ └────────────┘
│ ParquetExec│ │ │ │ ┌────────────┐
└───────────────┘ └─────────────────────────┘ ├───▶│RecordBatch │
▲ │ └────────────┘
ExecutionPlan │ │ ...
│ │
│ │ ┌────────────┐
PhysicalOptimizerRules ├───▶│RecordBatch │
request information │ └────────────┘
such as partitioning │ ┌ ─ ─ ─ ─ ─ ─
└───▶ None │
└ ─ ─ ─ ─ ─ ─ExecutionPlans 使用 Apache Arrow 内存格式处理数据,大量使用 arrow crate 中的函数。 调用execute会产生1个或多个数据分区,其中包含一个实现SendableRecordBatchStream的运算符。 值用 ColumnarValue 表示,可以是 ScalarValue(单个常量值)或 ArrayRef(箭头数组)。 平衡并行性是使用 RepartitionExec 实现的,它实现了 Volcano 风格的“交换”