简介

官方文档。Hiddify app 是一款基于 Sing-box 通用代理工具的跨平台代理客户端。Hiddify 提供了较全面的代理功能,例如自动选择节点、TUN 模式、使用远程配置文件等。Hiddify 无广告,并且代码开源。它为大家自由访问互联网提供了一个支持多种协议的、安全且私密的工具。

官网

Apache Arrow

Arrow

Arrow C++ 库由多个部分组成,每个部分完成不同的功能。

  • 物理层
  • 一维层
  • 二维层
  • 计算层
  • IO 层
  • IPC 层(Inter-Process Communication)
  • 格式层
  • 设备层
  • 文件系统层

物理层(The physical layer)

  • Memory management 抽象提供了统一的 API 来管理内存

  • Buffer 抽象表示物理数据的连续区域。

一维层(The one-dimensional layer)

  • Data Types 赋予物理数据逻辑含义
  • Arrays 装了一个或多个有类型的缓冲区,允许将他们视为逻辑连续的值序列(可能是嵌套的)
  • Chunked arrays (分块数组)是对数组的泛化,将多个相同类型的数组组合成一个更长的逻辑值序列

二维层(The two-dimensional layer)

  • Schemas 描述了多个数据的逻辑集合,每个数据都有不同的名称和类型,以及可选的元数据
  • Tables 是根据 Schemas 的分块数组的集合。它们是 Arrow 中最有能力提供数据集的抽象
  • Record batches 是由 Schemas 描述的连续数组的集合,允许表的增量构造或序列化

计算层(The compute layer)

  • Datums 是灵活的数据集一样,比如可以保存一个 Array 或 Table 引用
  • Kernels 是在一组给定的 Datums 上循环运行的专用计算函数,这些数据表示函数的输入和输出参数
  • Acero 是一个流执行引擎,允许将计算表示为可以转换数据流的运算符图

IO 层(The IO layer)

  • Streams 允许对各种外部数据(例如压缩或内存映射)进行无类型的顺序或可查找访问

IPC 层(The Inter-Process Communication (IPC) layer)

  • Messaging Format 允许 Arrow 数据在进程间交换,并尽可能少得拷贝

文件格式层(The file formats layer)

读写尽可能多的文件格式,比如 Parquet、CSV、Orc,以及 Arrow 专用的 Feather 格式。

设备层(The devices layer)

  • CUDA 提供基础集成,允许描述由 GPU 分配的内存数据

文件系统层(The filesystem layer)

文件系统抽象允许从不同的存储后端读取和写入数据,例如本地文件系统或 S3 存储桶

Acero

Acero 是一个流查询引擎,用来制定和执行计算。

Acero consumes streams of input data, transforms them, and outputs a stream of data.

Arrow 使用 ExecPlan 来表示计算,ExecPlan 以零个或多个 Stream 作为输入,但输出只有一个 Stream。Plan 描述了数据怎么被转换、传递。

概念

ExecNode

ExecNode 是 Acero 中最基础的概念,ExecNode 有零个或多个输入,零或一个输出。如果没有 Source,称其为源;如果没有输出,称其为 Sink。ExecNode 还有多种类型。

  • Sources Nodes
  • Compute Nodes
  • Arrangement NOdes
  • Sink Nodes

ExecBatch

批量数据(Batches of Data)使用 ExecBatch 类来描述。ExecBatch 是一个二维结构,和 RecordBatch 很相似。可以有零个或多个列,每个列必须有相同的长度。

1
2
exec_batch.length :        数据行度 (row)
exec_batch.values.size() : 列数量 (column)

rb_vs_eb.svg

RecordBatch 和 ExecBatch 对 Arrays 和 Buffers 都有强所有权。把 RecordBatch 转换为 ExecBatch 总是零拷贝的。但是 ExecBatch 转换为 RecordBatch 时只有在无标量(Scalar) 时才是零拷贝。

ExecPlan

ExecPlan 表示由 ExecNode 组成的图。一个合法的 ExecPlan 必须由一个 Source 节点,但技术上不需要有 Sink 节点。ExecPlan 包含所有节点共享的资源,以及工具方法来启动、停止节点的执行。ExecPlan 和 ExecNode 都与单个执行的生命周期相关联。它们有状态,预计不可重新启动。

Declaration

Declaration 是 ExecNode 的蓝图。声明可以组合成一个图表,形成 ExecPlan 的蓝图。

decl_vs_ep.svg

使用

Acero 的基础工作流如下。

  1. 首先,创建 Declaration 图,描述 Plan
  2. 调用一个 DeclarationToXyz 方法来执行 Declaration
    1. 创建 ExecPlan。一个新的 ExecPlan 会从 Declaration Graph 中被创建。每个 Declaration 对应 Plan 中的一个 ExecNode。此外,会根据具体的 DeclarationToXyz 创建 Sink Node
    2. 执行 ExecPlan。通常,这是 DeclarationToXyz 调用的一部分,但在 DeclarationToReader 中,阅读器会在计划完成执行之前返回。
    3. 销毁。Plan 一但结束就会被销毁。

创建 Plan

使用 Substrait

Substrait 是创建 Plan (Graph of Declaration)的推荐机制,原因如下。

  • Substrait 耗费了大量时间和精力创建用户友好的 API,以简单的方式创建负责的 Execution Plans
  • 更容易迁移到其他类似引擎
  • 更多基于 Substrait 的工具

程序化构建 Plan

以编程方式创建执行计划比从 Substra 创建计划更简单。Declaration::Sequence() 可以更方便地创建处理序列。

执行 plan

  • DeclarationToTable,累计所有的结果到一个 arrow::Table
  • DeclarationToReader,允许以迭代的方式使用所有结果,会创建一个 arrow::RecordBatchReader
  • DeclarationToStatus,如果只想执行 Plan 而不消费结果时很有用
  • 直接运行 Plan
    • 创建 ExecPlan
    • 添加 Sink Node 到 Declaration Graph 中
    • 使用 Declaration::AddToPlan 添加 Declaration 到 Plan 中
    • 验证 Plan,ExecPlan::Validate
    • 开始 Plan,EexcPlan::StartProducing
    • 等待 Future,ExecPlan::finished

可用的 ExecNode

Sources Node

用作数据源。

  • source,SourceNodeOptions,通用源节点,包装异步流数据
  • table_source,TableSourceNodeOptions
  • record_batch_source,RecordBatchSourceNodeOptions
  • record_batch_reader_source,RecordBatchReaderSourceNodeOptions
  • exec_batch_source,ExecBatchSourceNodeOptions
  • array_vector_source,ArrayVectorSourceNodeOptions
  • scan,arrow::dataset::ScanNodeOptions

Computer Nodes

对数据进行计算、转换、重塑。

  • filter,FilterNodeOptions,移除不符合过滤表达式的行
  • project,ProjectNodeOptions,通过执行计算表达式创建新的列,也可以移除、排序列
  • aggregate,AggregateNodeOptions,计算整个输入流或数据组的汇总统计信息
  • pivot_longer,PivotLongerNodeOptions,通过将某些列转换为附加行来重塑数据

Arrangement Nodes

重新排序、组合或切片数据流。

  • hash_john,HashJoinNodeOptions,基于相同的列合并两个输入
  • asofjoin,AsofJoinNodeOptions,工具相同的有序列把多个输入合并到第一个输入
  • union,合并两个具有相同 Schema 的输入
  • order_by,OrderByNodeOptions,对流排序
  • fetch,FetchNodeOptions,从流中切片出一系列行

Sink Nodes

Sink 节点会结束一个 Plan。通常不需要创建 Sink Node,会基于 DeclarationToXyz 方法选择。

Conceptions

  • Core
    • Buffer
    • Memory Pool
    • Array 数组
    • Scalar 标量,表示有特定类型的单值
    • Record Batch
    • Table
    • Chunked Array
    • Table
  • Compute
    • Function
    • Function Registry,注册自定义处理函数
    • Datum
    • Expression
    • ExecBatch
    • BufferSpan
    • ArraySpan
    • ExecSpan
  • Acero
    • Declaration
    • ExecPlan
    • ExecNode
  • Other

PushGenerator

JoinType

KernelState

KernelContext

KernelInitArgs

ScalarKernel

使用

自定义 Function

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// Function 帮助类
struct FunctionDoc {
std::string summary;
std::string description;
std::vector<std::string> arg_names;
std::string options_class; // 参数类
bool options_required; // 是否需要参数(Options)
}

// 参数数量标记类
struct Arity {
Arity Nullary(); // 无参数
Arity Unary(); // 一个参数
Arity Binary(); // 两个参数
Arity Ternary(); // 三个参数
Arity VarArgs(); // 动态参数
}

// 函数类型
enum Kind {
SCALAR,
VECTOR,
SCALAR_AGGREGATE,
HASH_AGGREGATE,
META
}

// 函数基类
Function(std::string name, Function::Kind kind, const Arity& arity, FunctionDoc doc,
const FunctionOptions* default_options);
// ScalarFunction 实现了 FunctionImpl<ScalarKernel>
ScalarFunction(std::string name, const Arity& arity, FunctionDoc doc,
const FunctionOptions* default_options = NULLPTR, bool is_pure = true);

// Kernel
using ArrayKernelExec = Status (*)(KernelContext*, const ExecSpan&, ExecResult*);

TODO

  • 需要对 Kernel 做详细了解

附录

PyTorch - Startup

nn

Module

所有神经网络模块的基类,自定义的模型也应把该类作为基类。

1
forward(*input) : 定义每次调用时的计算逻辑,所有子类都应该重写

Sequential

提供了一种简单的方式来按顺序堆叠神经网络的层,用于快速构建一个顺序的神经网络模型。在模型进行前向传播时,nn.Sequential会按照层的顺序依次调用每个层的forward方法,将前一层的输出作为下一层的输入,直到最后一层输出结果。

Linear

ReLU

DDP

  • rank,全局进程序号
  • local_rank,本机进程序号,可以使用 local_rank = torch.distributed.get_rank() % torch.cuda.device_count() 计算(在每个机器进程数一致时)
  • world size,全局并行数

附录

MetaSpore - Startup

C++

common

特征计算相关

FeatureComputeContext 和 FeatureComputeExecContext 都是在 cpp 文件中进行定义。

FeatureComputeContext

FeatureComputeExecContext

FeatureComputeExec

metaspore

Python

embedding

EmbeddingBagModule

1
EmbeddingBagModule -> torch.nn.Module

EmbeddingSumConcat

1
EmbeddingSumConcat -> EmbeddingOperator -> torch.nn.Module

nn

Normalization

others

FTRLTensorUpdater

NormalTensorInitializer

附录

boost

MetaSpore - Startup

打包

  1. 先编译 C++ 库,生成 metaspore.so
  2. 再使用 setuptools 和 wheel 工具,打包 python 库

MetaSpore C++

MetaSpore C++ 包含几个模块。

  • common
  • serving
  • metaspore (shared)

common

  • globals

    • 定义 gflags 变量
  • hashmap

  • arrow

  • features

metaspore

  1. 提供离线训练、在线 serving 的共用代码
  2. 离线使用
    1. 使用 pybind11 库定义并绑定 C++ 代码接口
    2. python 代码加载共享库,像调用 python 代码一样调用使用 pybind11 定义的 C++ 接口

Getting Started

文档链接

定义模型

1
2
3
4
5
6
7
8
embedding_size      : 每个特征组的 embedding size
# MetaSpore 相关
sparse : ms.EmbeddingSumConcat
sparse.updater : ms.FTRLTensorUpdater
sparse.initializer : ms.NormalTensorInitializer
dense.normalization : ms.nn.Normalization
# Torch 相关
dense : torch.nn.Sequential

初始化内容。

  • EmbeddingSumConcat
    • SparseFeatureExtractor
      • 解析原始特征列配置文件
      • 向计算图中添加计算 Hash 特征的 Node
    • EmbeddingBagModule
  • TensorUpdater,Sparse & Dense 数据更新类
    • FTRLTensorUpdater
  • TensorInitializer,张量初始化器
    • NormalTensorInitializer,归一化张量初始化器
  • Normalization,归一化

训练模型

1
PyTorchEstimator
  • 定义 PyTorchEstimator
    • module
    • worker / server 数量
    • 模型输出路径
    • Label 列索引
1
2
3
4
5
PyTorchAgent
PyTorchLauncher
PyTorchHelperMixin
PyTorchModel
PyTorchEstimator

核心概念

  • JobRunner
  • PyTorchEstimator
    • pyspark.ml.base.Estimator
  • Launcher
    • PSLauncher
  • Agent
  • Module
    • EmbeddingOperator
    • TensorUpdater
    • TensorInitializer
    • Normalization
  • PyTorchModel
    • pyspark.ml.base.Model
  • Metric

Python 机器学习 基于 Pytorch 和 Scikit-Learn - 第二章

神经网络与感知机学习规则

基于神经元模型,提出了感知机学习规则。感知机规则提出了一个可以自动学习的权重优化算法。

感知机算法步骤如下。

  1. 初始化权重和偏置项为 0 或很小的随机数
  2. 遍历每个训练样本
    1. 计算感知机输出值
    2. 更新权重和偏置项

需要注意的是。

  1. 只有当训练数据线性可分时,才能保证感知机具有收敛性
    1. 此时需要设置训练数据集的最大循环次数,或容错次数的阈值,来结束训练
  2. 权重、偏置项使用很小的初始化值替代 0 ,如果全是 0 则学习率会失去对决策边界的影响
    1. 学习率只影响权重向量的大小,不影响其方向

CUDA - Coding

错误处理

运行时 API 错误码

调用 CUDA 运行时 API 时,接口返回错误码。

1
__host__ __device__ cudaError_t cudaGetDeviceCount ( int* count ); // 获取设备数量, 返回错误码

错误检查

1
2
__host__ __device__ const char* cudaGetErrorName ( cudaError_t error );     // 获取错误码的枚举名称
__host__ __device__ const char* cudaGetErrorString ( cudaError_t error ); // 获取错误码的解释描述

定义错误检查函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
__host__ void error_check_entry() {
int device_id_in_use;
error_check(cudaGetDevice(&device_id_in_use), __FILE__, __LINE__);
error_check(cudaSetDevice(999), __FILE__, __LINE__);
// char *p_c;
// error_check(cudaMalloc(&p_c, 100), __FILE__, __LINE__);

cudaDeviceSynchronize();
} /** output
error_check, ok
CUDA error:
code=101, name=cudaErrorInvalidDevice, description=invalid device ordinal,
file=/data/code/cook-cuda/src/sample/hello_world.cu, line=51
*/

核函数中的异常

核函数的返回值必须是 void。

1
__host__ __device__ cudaError_t cudaGetLastError ( void ); // 返回最后一次错误码
1
2
3
4
5
6
7
8
9
__global__ void kernel_error_entry() {
dim3 block(2048);
print_build_in_vars<<<2, block>>>(); // block size 最大 1024
error_check(cudaGetLastError(), __FILE__, __LINE__);
} /** output
CUDA error:
code=9, name=cudaErrorInvalidConfiguration, description=invalid configuration argument,
file=/data/code/cook-cuda/src/sample/hello_world.cu, line=67
*/

性能评估

事件计时

1
2
3
4
5
__host__ cudaError_t cudaEventCreate ( cudaEvent_t* event );
__host__ __device__ cudaError_t cudaEventRecord ( cudaEvent_t event, cudaStream_t stream = 0 );
__host__ cudaError_t cudaEventSynchronize ( cudaEvent_t event );
__host__ cudaError_t cudaEventElapsedTime ( float* ms, cudaEvent_t start, cudaEvent_t end );
__host__ __device__ cudaError_t cudaEventDestroy ( cudaEvent_t event );

示例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
cudaEvent_t start, end;
error_check(cudaEventCreate(&start), __FILE__, __LINE__);
error_check(cudaEventCreate(&end), __FILE__, __LINE__);
error_check(cudaEventRecord(start), __FILE__, __LINE__);
cudaEventQuery(start);

// run GPU Task

error_check(cudaEventRecord(end), __FILE__, __LINE__);
error_check(cudaEventSynchronize(end), __FILE__, __LINE__);
float elapsed_time_ms;
ERROR_CHECK(cudaEventElapsedTime(&elapsed_time_ms, start, end));

printf("elapsed time: %f ms\n", elapsed_time_ms);
ERROR_CHECK(cudaEventDestroy(start));
ERROR_CHECK(cudaEventDestroy(end));

error_check。

1
2
3
4
5
6
7
8
__host__ __device__ cudaError_t error_check(cudaError_t err, const char *fn, int line) {
if (err != cudaSuccess) {
printf("CUDA error:\n\tcode=%d, name=%s, description=%s, \n\tfile=%s, line=%d\n", err, cudaGetErrorName(err),
cudaGetErrorString(err), fn, line);
}
return err;
}
#define ERROR_CHECK(exp) error_check(exp, __FILE__, __LINE__)

nvprof

nvprof 是评估 cuda 程序性能的工具。不过目前已经是过时的工具,不适用 compute capability >= 8.0 的设备。新设备适用 nsys 替代。

1
$ nvprof {cuda-program}

nsys

1
2
$ nsys profile {cuda-program} # 运行并记录程序的 profile 到 nsys-rep 文件
$ nsys analyze {nsys-rep} # 分析 profile 文件

获取 GPU 信息

运行时 API

1
__host__ cudaError_t cudaGetDeviceProperties ( cudaDeviceProp* prop, int  device )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
__host__ void PrintDeviceInfo() {
int deviceCount;
cudaGetDeviceCount(&deviceCount);
std::cout << "GPU device count: " << deviceCount << std::endl;

for (int i = 0; i < deviceCount; ++i) {
// sm: 流式多处理器, Streaming Multiprocessor
cudaDeviceProp dp{};
cudaGetDeviceProperties(&dp, i);
std::cout << "device.0 " << std::endl;
std::cout << " sm count: \t\t\t\t" << dp.multiProcessorCount << std::endl;
std::cout << " shared memory per block: \t\t" << dp.sharedMemPerBlock / 1024 << "KB" << std::endl;
std::cout << " max threads per block:\t\t" << dp.maxThreadsPerBlock << std::endl;
std::cout << " max threads per multi processor:\t" << dp.maxThreadsPerMultiProcessor << std::endl;
std::cout << " max threads per sm:\t\t\t" << dp.maxThreadsPerMultiProcessor / 32 << std::endl;
std::cout << " max blocks per multi processor:\t" << dp.maxBlocksPerMultiProcessor << std::endl;
}
}

helm - usage

local git repo

目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
$ tree
.
├── ace
│   ├── nginx
│   │   ├── Chart.yaml
│   │   ├── configmap
│   │   │   └── sources.list
│   │   ├── templates
│   │   │   ├── configmap.yaml
│   │   │   ├── deployment.yaml
│   │   │   ├── _helpers.tpl
│   │   │   └── service.yaml
│   │   └── values.yaml
│   └── ...
├── Makefile
└── README.md

安装 Chart

1
2
3
4
5
6
7
$ helm install ace ace/nginx
NAME: ace
LAST DEPLOYED: Mon Aug 5 13:42:17 2024
NAMESPACE: default
STATUS: deployed
REVISION: 1
TEST SUITE: None

更新 Chart

1
2
3
4
5
6
7
8
$ helm upgrade ace ace/nginx
Release "ace" has been upgraded. Happy Helming!
NAME: ace
LAST DEPLOYED: Mon Aug 5 13:47:34 2024
NAMESPACE: default
STATUS: deployed
REVISION: 2
TEST SUITE: None

卸载 Chart

1
2
$ helm uninstall ace
release "ace" uninstalled

示例

1
2
3
4
5
6
$ helm upgrade --install {release-name} {chart-path} \
--create-namespace -n ${kubeNamespace}
-f {path-to-helm-values-file} \
--set app.tag={tag-value} \
--kube-context {kube-context} \
--kubeconfig {path-to-kube-config-file}

解释

  • --install :更新 release,如果不存在则安装
  • --create-namespace:如果 namespace 不存在,则创建,和 --install 配合使用
  • --set app.tag :指定 Value app.tag 的值

问题排查 - CPU

原因分析

计算任务

  • 计算量过大的任务占用过多 CPU
  • 死循环

上下文切换

  • 死锁
  • 频繁加锁
  • 过多的并发
  • 内存不足
  • 频繁 GC(Java、GO 等语言)

问题排查

借助 TOP 命令

1
top -Hp {pid}  # 查看指定进程内各线程占用 CPU 的情况

查看线程数量

1
ps -p {pid} -L | wc -l

排查进程的上下文切换情况

pidstat

1
pidstat -w -p {pid}

其中,<PID> 是目标进程的进程 ID。上述命令将显示指定进程的 CPU 上下文切换统计信息,包括自愿切换(voluntary switches)和非自愿切换(non-voluntary switches)。

1
2
3
4
Linux 4.14.301-224.520.amzn2.x86_64 (...) 	2024年07月04日 	_x86_64_	(32 CPU)

10时23分19秒 UID PID cswch/s nvcswch/s Command
10时23分19秒 0 3637168 0.17 0.00 ...
1
2
# 安装
yum install sysstat -y

perf

1
2
3
4
5
6
7
8
9
10
11
12
perf stat -e cs,<event> -p <PID>
# event: cs (所有模式切换) , cs:u (用户模式切换), cs:k (内核模式切换)
$ perf stat -e cs,cs:u,cs:k -p 3637168 # Ctrl-C 结束收集

^C
Performance counter stats for process id '3637168':

44,981 cs
0 cs:u
44,981 cs:k

27.447834538 seconds time elapsed

perf stat 和 perf record 区别

  • perf stat

  • 快速查看程序基本性能指标

  • 采集 CPU 指令、缓存命中率、上下文切换等

  • perf record

  • 可采集系统或特定进程的性能事件

  • 采集指令、缓存、分支等事件

  • 可导出文件,用于后续的分析

1
yum install perf -y

其他问题

  • 如何区分是计算任务占用 CPU 还是过多上下文切换占用任务
  • 区分 IO 线程和 Work 线程的必要性