Apache Arrow

Arrow

Arrow C++ 库由多个部分组成,每个部分完成不同的功能。

  • 物理层
  • 一维层
  • 二维层
  • 计算层
  • IO 层
  • IPC 层(Inter-Process Communication)
  • 格式层
  • 设备层
  • 文件系统层

物理层(The physical layer)

  • Memory management 抽象提供了统一的 API 来管理内存

  • Buffer 抽象表示物理数据的连续区域。

一维层(The one-dimensional layer)

  • Data Types 赋予物理数据逻辑含义
  • Arrays 装了一个或多个有类型的缓冲区,允许将他们视为逻辑连续的值序列(可能是嵌套的)
  • Chunked arrays (分块数组)是对数组的泛化,将多个相同类型的数组组合成一个更长的逻辑值序列

二维层(The two-dimensional layer)

  • Schemas 描述了多个数据的逻辑集合,每个数据都有不同的名称和类型,以及可选的元数据
  • Tables 是根据 Schemas 的分块数组的集合。它们是 Arrow 中最有能力提供数据集的抽象
  • Record batches 是由 Schemas 描述的连续数组的集合,允许表的增量构造或序列化

计算层(The compute layer)

  • Datums 是灵活的数据集一样,比如可以保存一个 Array 或 Table 引用
  • Kernels 是在一组给定的 Datums 上循环运行的专用计算函数,这些数据表示函数的输入和输出参数
  • Acero 是一个流执行引擎,允许将计算表示为可以转换数据流的运算符图

IO 层(The IO layer)

  • Streams 允许对各种外部数据(例如压缩或内存映射)进行无类型的顺序或可查找访问

IPC 层(The Inter-Process Communication (IPC) layer)

  • Messaging Format 允许 Arrow 数据在进程间交换,并尽可能少得拷贝

文件格式层(The file formats layer)

读写尽可能多的文件格式,比如 Parquet、CSV、Orc,以及 Arrow 专用的 Feather 格式。

设备层(The devices layer)

  • CUDA 提供基础集成,允许描述由 GPU 分配的内存数据

文件系统层(The filesystem layer)

文件系统抽象允许从不同的存储后端读取和写入数据,例如本地文件系统或 S3 存储桶

Acero

Acero 是一个流查询引擎,用来制定和执行计算。

Acero consumes streams of input data, transforms them, and outputs a stream of data.

Arrow 使用 ExecPlan 来表示计算,ExecPlan 以零个或多个 Stream 作为输入,但输出只有一个 Stream。Plan 描述了数据怎么被转换、传递。

概念

ExecNode

ExecNode 是 Acero 中最基础的概念,ExecNode 有零个或多个输入,零或一个输出。如果没有 Source,称其为源;如果没有输出,称其为 Sink。ExecNode 还有多种类型。

  • Sources Nodes
  • Compute Nodes
  • Arrangement NOdes
  • Sink Nodes

ExecBatch

批量数据(Batches of Data)使用 ExecBatch 类来描述。ExecBatch 是一个二维结构,和 RecordBatch 很相似。可以有零个或多个列,每个列必须有相同的长度。

1
2
exec_batch.length :        数据行度 (row)
exec_batch.values.size() : 列数量 (column)

rb_vs_eb.svg

RecordBatch 和 ExecBatch 对 Arrays 和 Buffers 都有强所有权。把 RecordBatch 转换为 ExecBatch 总是零拷贝的。但是 ExecBatch 转换为 RecordBatch 时只有在无标量(Scalar) 时才是零拷贝。

ExecPlan

ExecPlan 表示由 ExecNode 组成的图。一个合法的 ExecPlan 必须由一个 Source 节点,但技术上不需要有 Sink 节点。ExecPlan 包含所有节点共享的资源,以及工具方法来启动、停止节点的执行。ExecPlan 和 ExecNode 都与单个执行的生命周期相关联。它们有状态,预计不可重新启动。

Declaration

Declaration 是 ExecNode 的蓝图。声明可以组合成一个图表,形成 ExecPlan 的蓝图。

decl_vs_ep.svg

使用

Acero 的基础工作流如下。

  1. 首先,创建 Declaration 图,描述 Plan
  2. 调用一个 DeclarationToXyz 方法来执行 Declaration
    1. 创建 ExecPlan。一个新的 ExecPlan 会从 Declaration Graph 中被创建。每个 Declaration 对应 Plan 中的一个 ExecNode。此外,会根据具体的 DeclarationToXyz 创建 Sink Node
    2. 执行 ExecPlan。通常,这是 DeclarationToXyz 调用的一部分,但在 DeclarationToReader 中,阅读器会在计划完成执行之前返回。
    3. 销毁。Plan 一但结束就会被销毁。

创建 Plan

使用 Substrait

Substrait 是创建 Plan (Graph of Declaration)的推荐机制,原因如下。

  • Substrait 耗费了大量时间和精力创建用户友好的 API,以简单的方式创建负责的 Execution Plans
  • 更容易迁移到其他类似引擎
  • 更多基于 Substrait 的工具

程序化构建 Plan

以编程方式创建执行计划比从 Substra 创建计划更简单。Declaration::Sequence() 可以更方便地创建处理序列。

执行 plan

  • DeclarationToTable,累计所有的结果到一个 arrow::Table
  • DeclarationToReader,允许以迭代的方式使用所有结果,会创建一个 arrow::RecordBatchReader
  • DeclarationToStatus,如果只想执行 Plan 而不消费结果时很有用
  • 直接运行 Plan
    • 创建 ExecPlan
    • 添加 Sink Node 到 Declaration Graph 中
    • 使用 Declaration::AddToPlan 添加 Declaration 到 Plan 中
    • 验证 Plan,ExecPlan::Validate
    • 开始 Plan,EexcPlan::StartProducing
    • 等待 Future,ExecPlan::finished

可用的 ExecNode

Sources Node

用作数据源。

  • source,SourceNodeOptions,通用源节点,包装异步流数据
  • table_source,TableSourceNodeOptions
  • record_batch_source,RecordBatchSourceNodeOptions
  • record_batch_reader_source,RecordBatchReaderSourceNodeOptions
  • exec_batch_source,ExecBatchSourceNodeOptions
  • array_vector_source,ArrayVectorSourceNodeOptions
  • scan,arrow::dataset::ScanNodeOptions

Computer Nodes

对数据进行计算、转换、重塑。

  • filter,FilterNodeOptions,移除不符合过滤表达式的行
  • project,ProjectNodeOptions,通过执行计算表达式创建新的列,也可以移除、排序列
  • aggregate,AggregateNodeOptions,计算整个输入流或数据组的汇总统计信息
  • pivot_longer,PivotLongerNodeOptions,通过将某些列转换为附加行来重塑数据

Arrangement Nodes

重新排序、组合或切片数据流。

  • hash_john,HashJoinNodeOptions,基于相同的列合并两个输入
  • asofjoin,AsofJoinNodeOptions,工具相同的有序列把多个输入合并到第一个输入
  • union,合并两个具有相同 Schema 的输入
  • order_by,OrderByNodeOptions,对流排序
  • fetch,FetchNodeOptions,从流中切片出一系列行

Sink Nodes

Sink 节点会结束一个 Plan。通常不需要创建 Sink Node,会基于 DeclarationToXyz 方法选择。

Conceptions

  • Core
    • Buffer
    • Memory Pool
    • Array 数组
    • Scalar 标量,表示有特定类型的单值
    • Record Batch
    • Table
    • Chunked Array
    • Table
  • Compute
    • Function
    • Function Registry,注册自定义处理函数
    • Datum
    • Expression
    • ExecBatch
    • BufferSpan
    • ArraySpan
    • ExecSpan
  • Acero
    • Declaration
    • ExecPlan
    • ExecNode
  • Other

PushGenerator

JoinType

KernelState

KernelContext

KernelInitArgs

ScalarKernel

使用

自定义 Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Function 帮助类
struct FunctionDoc {
std::string summary;
std::string description;
std::vector<std::string> arg_names;
std::string options_class; // 参数类
bool options_required; // 是否需要参数(Options)
}

// 参数数量标记类
struct Arity {
Arity Nullary(); // 无参数
Arity Unary(); // 一个参数
Arity Binary(); // 两个参数
Arity Ternary(); // 三个参数
Arity VarArgs(); // 动态参数
}

// 函数类型
enum Kind {
SCALAR,
VECTOR,
SCALAR_AGGREGATE,
HASH_AGGREGATE,
META
}

// 函数基类
Function(std::string name, Function::Kind kind, const Arity& arity, FunctionDoc doc,
const FunctionOptions* default_options);
// ScalarFunction 实现了 FunctionImpl<ScalarKernel>
ScalarFunction(std::string name, const Arity& arity, FunctionDoc doc,
const FunctionOptions* default_options = NULLPTR, bool is_pure = true);

// Kernel
using ArrayKernelExec = Status (*)(KernelContext*, const ExecSpan&, ExecResult*);

TODO

  • 需要对 Kernel 做详细了解

附录