gperftools

Installation

$ yum install gperftools gperftools-devel -y

Heap Profile

# dump
$ env LD_PRELOAD="/usr/lib64/libtcmalloc_and_profiler.so" HEAPPROFILE="heap-profile" <binary>
$ env LD_PRELOAD="/usr/lib64/libtcmalloc.so" HEAPPROFILE="heap-profile" <binary>
$ env LD_PRELOAD="/usr/lib64/libtcmalloc.so" HEAPPROFILE="/tmp/profile" HEAP_PROFILE_TIME_INTERVAL=60 <binary>

# analysis
$ pprof --text <binary> /tmp/profile....heap
$ pprof --svg <binary> /tmp/profile....heap > heap.svg

k8s usage

[toc]

Node Management

Delete a Node

$ sudo kubeadm reset phase cleanup-node  # works on a master node as well

Labels

# show node labels
$ kubectl get nodes --show-labels

# label a node
$ kubectl label nodes <your-node-name> disktype=ssd

Set a Pod's node selector (nodeSelector)

apiVersion: v1
kind: Pod
metadata:
  name: nginx
  labels:
    env: test
spec:
  containers:
  - name: nginx
    image: nginx
    imagePullPolicy: IfNotPresent
  nodeSelector:
    disktype: ssd

Set a Deployment's node selector / affinity

spec:
  template:
    spec:
      # select nodes by label
      nodeSelector:
        <label-name>: <label-value>
      # affinity settings
      affinity:
        # node affinity
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: <label-name>
                operator: In
                values:
                - <label-value>
        # pod anti-affinity
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app.kubernetes.io/name
                operator: In
                values:
                - <pod-name>
                - <pod-name>
            topologyKey: kubernetes.io/hostname

Deployment Management

kubectl delete deployments --all -n <ns>

Pod Management

# show pod info
$ kubectl get pods -A  # -A = --all-namespaces

# delete a pod
$ kubectl delete pod <name> --namespace=<ns>
# bulk delete
kubectl delete pod --all -n <ns>

# show pod details (IP, etc.)
$ kubectl describe pod <name> --namespace=<ns>

Service

# list all services
k get services -o wide -A --sort-by=.metadata.name

# get service details (NodePort, etc.)
k describe service kubernetes-dashboard -n kube-system

Token

microk8s kubectl create token -n kube-system default --duration=8544h

Private Registry

# docker login
docker login hub.company.com

# without --namespace, the secret is created in the default namespace
kubectl create secret generic regcred \
    --from-file=.dockerconfigjson=$HOME/.docker/config.json \
    --type=kubernetes.io/dockerconfigjson

# each namespace needs its own secret
kubectl create secret generic regcred \
    --from-file=.dockerconfigjson=$HOME/.docker/config.json \
    --type=kubernetes.io/dockerconfigjson \
    --namespace=ace

# reference the secret in the Pod/Deployment spec
apiVersion: v1
kind: Pod
metadata:
  name: private-reg
spec:
  containers:
  - name: private-reg-container
    image: <your-private-image>
  # add imagePullSecrets
  imagePullSecrets:
  - name: regcred

Secret

# create
kubectl create secret generic regcred \
    --from-file=.dockerconfigjson=$HOME/.docker/config.json \
    --type=kubernetes.io/dockerconfigjson \
    --namespace=default

# list / inspect
kubectl get secrets
kubectl get secret <secret> --output=yaml

# delete
kubectl delete secret <secret>

Namespace

# list all namespaces
$ kubectl get namespaces

# create a namespace
$ kubectl create namespace <space-name>

Exec into a Container

kubectl exec --stdin --tty <pod-instance> -- /bin/bash

matplotlib

Parameters

alpha

  • Opacity
  • float in [0, 1]

bins

  • Bin intervals
  • Can be an integer or a sequence
  • An integer n splits the data into n equal-width bins; a sequence is used as the bin edges (see the sketch below)
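A minimal sketch of how the two parameters are typically used together in plt.hist; the random data and the particular bin choices are made up for illustration:

import numpy as np
import matplotlib.pyplot as plt

data = np.random.randn(1000)

# bins as an integer: 20 equal-width bins; alpha makes the bars translucent
plt.hist(data, bins=20, alpha=0.5, label='bins=20')
# bins as a sequence: explicit bin edges
plt.hist(data, bins=[-3, -1, 0, 1, 3], alpha=0.3, label='custom edges')
plt.legend()
plt.show()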

Configuration

subplot

Draws a subplot.

Signature

subplot(nrows, ncols, index, **kwargs)

Examples

import matplotlib.pyplot as plt

y = [i for i in range(100)]
z = [i * 1.1 for i in range(100)]

plt.subplot(2, 2, 1)
plt.xlabel('x label')
plt.ylabel('y label')
plt.plot(y, label='y')
plt.plot(z, label='z')
plt.legend()

# no legend
plt.subplot(2, 2, 2)
plt.xlabel('x label')
plt.ylabel('y label')
plt.plot(y, label='y')
plt.plot(z, label='z')

plt.subplot(2, 2, 4)
plt.xlabel('x label')
plt.ylabel('y label')
plt.plot(y, label='y')
plt.plot(z, label='z')
plt.legend()

# subplot 3 is intentionally left empty
plt.show()

# scatter plots from the Iris dataset (features and labels are assumed to be loaded already)
columns = ['Id', 'SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm', 'Label']
for i in range(2, 5):
    plt.subplot(1, 3, i - 1)
    plt.scatter(features[:, 1], features[:, i], c=labels, s=50, cmap='viridis')
    plt.title(columns[i])
plt.show()

# custom subplot size
start = 2
end = 5
fg, axs = plt.subplots(end - start, end - start, figsize=(25, 25))
for i in range(start, end):
    for j in range(start, end):
        axs[i - start, j - start].scatter(features[:, 1], features[:, i], c=labels, s=25, cmap='viridis')
        axs[i - start, j - start].set_title(columns[i] + ' - ' + columns[j])

matplotlib usage

Introduction

Matplotlib

  • Data visualization

Installation

sudo pip3 install matplotlib
# or
sudo apt install python3-matplotlib

Examples

Sample 1

import numpy as np
import matplotlib.pyplot as plt


x = np.linspace(0, 10, 1000)  # independent variable for plotting
y = np.sin(x) + 1             # dependent variable y
z = np.cos(x**2) + 1          # dependent variable z

plt.figure(figsize=(8, 4))  # set the figure size
plt.plot(x, y, label=r'$\sin x+1$', color='red', linewidth=2)  # plot with a label, line color and line width
plt.plot(x, z, 'b--', label=r'$\cos x^2+1$')
plt.xlabel('Time(s) ')  # x-axis label
plt.ylabel('Volt')      # y-axis label
plt.ylim(0, 2.2)        # visible y range
plt.legend()            # show the legend
plt.show()              # show the figure

Result

Sample 2 - subplot

#!/bin/python
import matplotlib.pyplot as plt

# add a subplot
ax1=plt.subplot(2, 2, 1)
# add a subplot with no frame
ax2=plt.subplot(2, 2, 2, frameon=False)
# add a polar subplot
plt.subplot(2, 2, 3, projection='polar')
# add a red subplot that shares the x-axis with ax1
plt.subplot(2, 2, 4, sharex=ax1, facecolor='red')

#delete ax2 from the figure
plt.delaxes(ax2)
#add ax2 to the figure again
plt.subplot(ax2)

plt.show()

Sample 3 - draw

from cycler import cycler
import numpy as np
import matplotlib.pyplot as plt


x = np.linspace(0, 2 * np.pi)
offsets = np.linspace(0, 2*np.pi, 4, endpoint=False)
# Create array with shifted-sine curve along each column
yy = np.transpose([np.sin(x + phi) for phi in offsets])

# 1. Setting prop cycle on default rc parameter
plt.rc('lines', linewidth=4)
plt.rc('axes', prop_cycle=(cycler('color', ['r', 'g', 'b', 'y']) +
                           cycler('linestyle', ['-', '--', ':', '-.'])))
fig, (ax0, ax1) = plt.subplots(nrows=2)
ax0.plot(yy)
ax0.set_title('Set default color cycle to rgby')

# 2. Define prop cycle for single set of axes color & width
ax1.set_prop_cycle(cycler('color', ['c', 'm', 'y', 'k']) +
                   cycler('lw', [1, 2, 3, 4]))
ax1.plot(yy)
ax1.set_title('Set axes color cycle to cmyk')

# Tweak spacing between subplots to prevent labels from overlapping
fig.subplots_adjust(hspace=0.3)
plt.show()

Sample 4 - set figure size & save picture

#### way 1
from matplotlib.pyplot import figure
figure(figsize=(20, 10), dpi=80,)

#### way 2
plt.figure(figsize=(15, 10), dpi=80)

pandas

Installation

pip3 install pandas

Import

import pandas as pd

Configuration

# set how many rows/columns are shown when printing
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 100)
pd.set_option('display.width', 1000)

Import / Export

Read a CSV file

df = pd.read_csv('dataset.csv')
df = pd.read_csv('data.csv', delimiter='\t')  # specify the delimiter
df = pd.read_csv("fn", index_col=None, header=0)

Create from an array

>>> import pandas as pd
>>> arr = [[1, 2], [3, 4]]
>>> df = pd.DataFrame(arr, columns=['x', 'y'])
>>> df
   x  y
0  1  2
1  3  4

Convert to a NumPy array

# convert to a NumPy array
df.to_numpy()

Export to a file

df.to_csv('output.csv')

Data Overview

# describe the data
data.describe()
data.info()

# print the first n rows
df.head(n)

# print the columns
data.columns

Data Operations

Column Operations

Selecting

# select a single column
data.<ColumnName>

# select multiple columns
data[[<ColumnName>, <ColumnName>, <...>]]
# filter out a column
data.loc[:, df.columns != '<column_name>']

# select by index
df.iloc[:, 0]  # the first column

# unique values of a column
df[<ColumnName>].unique()

# select columns by dtype
df.select_dtypes(include=['float64', 'bool'])
df.select_dtypes(include='number')  # all numeric columns

Conversion

# convert all columns
df = df.apply(pd.to_numeric)
df = df.apply(pd.to_numeric, errors='ignore')  # ignore conversion errors

# convert specific columns
df[['a', 'b']] = df[['a', 'b']].apply(pd.to_numeric)

# split one column into multiple columns
df[['a', 'b', 'c']] = df['label'].apply(lambda x: pd.Series(x.split(',')))

Drop Columns

# drop columns that contain any NaN
df.dropna(axis=1, how='any')

Row Operations

Sorting

df = df.sort_values(by=[<ColumnName>], ascending=False)

Filtering

# loc
df.loc[df['column_name'] == some_value]
df.loc[df['column_name'].isin(some_values)]
df.loc[(df['column_name'] >= A) & (df['column_name'] <= B)]

# filter by column values
df[df['部门'].isin(['市场部']) & ~df['省份'].isin(['北京'])]  # 部门 == 市场部 and 省份 != 北京
t_df = t_df[t_df.服务名称 == '离线计算平台']

Joins

# usage
merge(left, right, how='inner', on=None, left_on=None, right_on=None,
      left_index=False, right_index=False, sort=True,
      suffixes=('_x', '_y'), copy=True, indicator=False)

# left join
merge(data_x, data_y, how='left', on='uid')

Example

# read a cell
data['age'][6]  # row 6 of the age column

pandas

[toc]

Introduction

  • A powerful tool for data analysis and exploration
  • Provides advanced data structures and well-crafted tools
  • Built on top of NumPy
  • Requires xlrd and xlwt for Excel file read/write support

Data Structures

Series

  • A sequence, similar to a one-dimensional array

Get the values

s.values

DataFrame

  • Equivalent to a two-dimensional table, similar to a 2D array
  • Each column is a Series (see the sketch below)
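A minimal sketch of the relationship between the two structures; the values, index and column names below are made up for illustration:

import pandas as pd

s = pd.Series([1, 2, 3], index=['a', 'b', 'c'])
print(s.values)        # the underlying NumPy array: [1 2 3]

df = pd.DataFrame({'x': [1, 3], 'y': [2, 4]})
print(type(df['x']))   # each column is a Series
print(df['x'].values)  # [1 3]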

Working with CSV Files

Read a file

# read a csv file into a DataFrame
df = pd.read_csv('../data/train.csv')

Write a file

df.to_csv("output.csv", index=False)

DataFrame Methods

Data Exploration

List the columns

print(data.columns)

Select data

  • iloc : select by position
  • DF.column_name : select by column name (see the sketch below)
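A minimal sketch of the two selection styles; the frame and its columns are made up for illustration:

import pandas as pd

df = pd.DataFrame({'age': [21, 35, 42], 'name': ['a', 'b', 'c']})

df.age         # by column name, same as df['age']
df.iloc[0]     # first row, by position
df.iloc[:, 1]  # second column, by position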

Print a column's dtype

df['Column'].dtype

Filter rows

df['Column'][df['Column'].isna()]

Convert types

df['Column'] = df['Column'].astype(str)

Handling NaN Values

Columns containing NaN

for c in df.loc[:, df.isna().any()]:
    pass

Count NaN values per column

df.isna().sum()

Fill NaN values

df['Column'] = df['Column'].fillna(df['Column'].mean())  # fill NaN values in the column with the column mean

Other

iloc

Select data from a DataFrame

df.iloc[:]               # all rows, all columns
df.iloc[:, :4]           # all rows, first four columns
df.iloc[[1, 2], [0, 1]]  # rows 2-3, columns 1-2

Examples

### With Series
bovenson@ThinkCentre:~/Git/notes/Python/Pandas$ python3
Python 3.5.3 (default, Jan 19 2017, 14:11:04)
[GCC 6.3.0 20170118] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pandas as pd
>>> import numpy as np
>>> s1 = pd.Series(np.random.randn(5), index=list(range(0, 10, 2)))
>>> s1
0 0.611873
2 1.709006
4 -0.191763
6 1.393935
8 -0.751302
dtype: float64
>>> s1.iloc[:3]
0 0.611873
2 1.709006
4 -0.191763
dtype: float64
>>> s1.iloc[3]
1.3939352812007684
>>> s1.iloc[:3] = 0
>>> s1
0 0.000000
2 0.000000
4 0.000000
6 1.393935
8 -0.751302
dtype: float64


### With DataFrame
>>> df = pd.DataFrame(np.random.randn(6, 4), index=list(range(0, 12, 2)), columns=list(range(0, 8, 2)))
>>> df
0 2 4 6
0 0.544588 -0.943141 0.656372 -0.468110
2 -2.093147 -0.122080 0.218309 0.172868
4 -0.529067 -0.418202 1.217741 -0.959730
6 -0.388783 -0.493095 -0.222721 1.224777
8 1.110017 0.786962 1.434934 -1.052975
10 1.945774 0.274088 -0.724777 -0.469002
>>> df.iloc[:3]
0 2 4 6
0 0.544588 -0.943141 0.656372 -0.468110
2 -2.093147 -0.122080 0.218309 0.172868
4 -0.529067 -0.418202 1.217741 -0.959730
>>> df.iloc[1:5, 2:4]
4 6
2 0.218309 0.172868
4 1.217741 -0.959730
6 -0.222721 1.224777
8 1.434934 -1.052975
>>> df.iloc[[1, 2], [0, 1]]
0 2
2 -2.093147 -0.122080
4 -0.529067 -0.418202
>>> df.iloc[:, :3]
0 2 4
0 0.544588 -0.943141 0.656372
2 -2.093147 -0.122080 0.218309
4 -0.529067 -0.418202 1.217741
6 -0.388783 -0.493095 -0.222721
8 1.110017 0.786962 1.434934
10 1.945774 0.274088 -0.724777
>>> df.iloc[1, 1]
-0.12207998453906502
>>> df.iloc[1]
0 -2.093147
2 -0.122080
4 0.218309
6 0.172868
Name: 2, dtype: float64

mean

Compute the mean

import pandas as pd

# load the data
train_df = pd.read_csv('./data/train.csv')

# mean of all numeric columns
train_df.mean()

# mean of a single column
train_df['Age'].mean()

dropna

DataFrame.dropna(axis=0, how='any', thresh=None, subset=None, inplace=False)
  • axis
    • 0 : rows
    • 1 : columns
    • tuple/list : handle multiple dimensions
  • how
    • any : drop the label if any value is NA
    • all : drop the label only if all values are NA
  • thresh
    • integer, defaults to None
    • keep the label only if it has at least this many non-NA values
  • subset
    • array-like
    • only consider the given labels
  • inplace
    • True : operate on the original dataset in place and return None
    • False : return a new dataset and leave the original unchanged
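A minimal sketch of how these parameters behave; the small frame below is made up for illustration:

import numpy as np
import pandas as pd

df = pd.DataFrame({'a': [1.0, np.nan, 3.0],
                   'b': [np.nan, np.nan, 6.0],
                   'c': [7.0, 8.0, 9.0]})

df.dropna()                   # drop rows containing any NA
df.dropna(axis=1, how='all')  # drop columns whose values are all NA
df.dropna(thresh=2)           # keep rows with at least 2 non-NA values
df.dropna(subset=['a'])       # only consider column 'a' when deciding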

Drop a column

df.drop('column-name', axis=1)

Examples

import pandas as pd


s = pd.Series([1, 2, 3], index=['a', 'b', 'c'])  # create a Series
d = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=['a', 'b', 'c'])  # create a DataFrame
d2 = pd.DataFrame(s)

d.head()      # preview the first 5 rows
d.describe()  # basic descriptive statistics

# read files
pd.read_excel('data.xls')
pd.read_csv('data.csv', encoding='utf-8')

pyspark

Common Operations

Save one DataFrame column as text

# dataframe
data = df.rdd.map(lambda x: x.{column}).collect()
data_rdd = spark.sparkContext.parallelize(data)
data_rdd.coalesce(1).saveAsTextFile('hdfs://path')

Handle a WrappedArray

row.scores[0]

Keep a single copy of the join column when joining

df_a.join(df_b, on=[{column}])  # pass on= instead of a join condition
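A minimal sketch of the difference; df_a, df_b and the id column are made up for illustration. Joining with on= produces a single id column, while an explicit condition keeps the column from both sides:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df_a = spark.createDataFrame([(1, "wii")], ["id", "name"])
df_b = spark.createDataFrame([(1, 100)], ["id", "score"])

df_a.join(df_b, on=["id"]).show()           # columns: id, name, score
df_a.join(df_b, df_a.id == df_b.id).show()  # columns: id, name, id, score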

Filtering

Multiple conditions

df.filter((condition1) & (condition2))

Contains a string

df.filter(col('name').contains('sun'))

Value comparison

df.filter(col('name') == "wii")

Null checks

df.filter(col('name').isNull())
df.filter(col('name').isNotNull())

IN check

df.filter(col('name').isin(["wii", "bovenson"]))

Creating Columns

Rename

df_renamed = df.withColumnRenamed('name1', 'name2')

Create a column from other columns

Arithmetic

new_df = df.withColumn('After_discount', df.Course_Fees - df.Discount) 

Using a UDF

import pyspark.sql.functions as F
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

# define the sum_col
def Total(Course_Fees, Discount):
    res = Course_Fees - Discount
    return res

new_f = F.udf(Total, IntegerType())
new_df = df.withColumn("Total_price", new_f("Course_Fees", "Discount"))

# using the udf decorator
@udf(IntegerType())
def Total(Course_Fees, Discount):
    res = Course_Fees - Discount
    return res

# udf + lambda
function = udf(lambda col1, col2: col1 - col2, IntegerType())
new_df = old_df.withColumn('col_n', function(col('col_1'), col('col_2')))

Aggregation

Max

from pyspark.sql import functions as F
df.agg(F.max("age")).show()

Conversion

row to json string

df.toJSON()

Write Data

df.write.format('orc').save('/path/to/destination')
df.coalesce(1).write.format('json').save('/path/to/destination')  # write a single file

Text file

df.coalesce(1).write.format("text").option("header", "false").mode("overwrite").save('/path/to/destination')

Errors

# code
from pyspark.sql.functions import *
@F.udf(IntegerType())
def TimeDiff(a, b):
    return abs(a - b)

# error
TypeError: Invalid argument, not a string or column: 1 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

The error occurs because the star import from pyspark.sql.functions shadows the builtin abs, so abs inside the UDF resolves to pyspark.sql.functions.abs.
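A minimal sketch of one way to avoid the shadowing, keeping the same TimeDiff UDF: import the module under an alias so abs stays the Python builtin (or call builtins.abs explicitly if the star import must stay).

import builtins

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

@F.udf(IntegerType())
def TimeDiff(a, b):
    # plain Python abs; nothing shadows it without the star import
    return abs(a - b)

# with the star import still in place, use the builtin explicitly:
# return builtins.abs(a - b)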

TypeError: Can not infer schema for type: <class 'str'>

This error occurs when calling rdd.toDF() on an RDD of plain values.

from pyspark.sql import Row

row = Row("val") # Or some other column name
rdd.map(row).toDF()

Or

rdd.map(lambda x: (x, )).toDF()

spark

Read JSON

var srcData = spark.read.json(srcPath)

// get a String vector cell
row.getAs[Seq[String]](idx)

Set case sensitivity

spark.conf.set("spark.sql.caseSensitive", "true")

Queries

group by

df.groupBy("column-name").count().show(false)
df.groupBy("column-name").agg(count("*").alias("count")).show(false)

Read Data from S3

pyspark

Installation

pip3 install pyspark

Read S3 data

# create a spark session
from pyspark.sql import SparkSession
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.12.153,org.apache.hadoop:hadoop-aws:3.2.2,org.apache.hadoop:hadoop-client:3.2.2 pyspark-shell'
spark = SparkSession.builder.master("local[*]").config("spark.executor.memory", "24g").config("spark.driver.memory", "24g").appName("sample").getOrCreate()
# "local[1]": a single number limits execution to that many cores (here one)

# read aws credentials from the local config
import configparser
from pathlib import Path
config = configparser.ConfigParser()
config.read(os.path.join(Path.home(), '.aws/credentials'))
access_key_id = config.get('default', 'aws_access_key_id')
secret_access_key = config.get('default', 'aws_secret_access_key')

# configure the spark context
spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', access_key_id)
spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', secret_access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

# read the file
sample_file = "s3a://<bucket>/path/to/file"
data = spark.read.text(sample_file)

# parse records (protobuf)
import base64
from google.protobuf.json_format import MessageToJson
import sample_pb2

def decode_pb(row):
    decoded_data = base64.b64decode(row.value)
    tr = sample_pb2.Example()
    tr.ParseFromString(decoded_data)
    return [MessageToJson(tr)]

# spark job
result = data.rdd.map(decode_pb).toDF(["value"])
result.show(1, False)

scala

// create a Spark Session
val spark: SparkSession = SparkSession.builder()
  .master("local[1]")
  .appName("queue_pb_log")
  .getOrCreate()

// read s3 credentials
val cf = Paths.get(System.getProperty("user.home"), ".aws/credentials")
val c = new Ini(new File(cf.toUri))
val prefs = new IniPreferences(c)
val awsAccessKeyId = prefs.node("default").get("aws_access_key_id", "no")
val awsSecretAccessKey = prefs.node("default").get("aws_secret_access_key", "no")

// configure the spark context
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", awsAccessKeyId)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", awsSecretAccessKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider",
  "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

// read the logs
val sampleFile = "s3a://<bucket>/path/to/file"
val src = spark.sparkContext.textFile(sampleFile)
val result = src.map(record => {
  // parse a pb log record
  val parsed = Example.parseFrom(Base64.getDecoder.decode(record))
  JsonFormat.printer().print(parsed)
})

println(result.first())

pom

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.7.3</version>
</dependency>

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>2.7.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.4.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.4.8</version>
  <!-- local run -->
  <!-- <scope>compile</scope> -->
  <!-- remote submit -->
  <!-- <scope>provided</scope> -->
</dependency>
<!-- https://mvnrepository.com/artifact/org.ini4j/ini4j -->
<dependency>
  <groupId>org.ini4j</groupId>
  <artifactId>ini4j</artifactId>
  <version>0.5.4</version>
</dependency>

<!-- others: try these if there are version conflicts -->
<properties>
  <scala.version>2.11.6</scala.version>
  <jackson.version>2.11.4</jackson.version>
</properties>

<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-scala_2.11</artifactId>
  <version>${jackson.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-scala -->
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-core</artifactId>
  <version>${jackson.version}</version>
</dependency>

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-annotations</artifactId>
  <version>${jackson.version}</version>
</dependency>

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>${jackson.version}</version>
</dependency>

<dependency>
  <groupId>org.apache.httpcomponents</groupId>
  <artifactId>httpclient</artifactId>
  <version>4.3.3</version>
</dependency>

<!-- protobuf related -->
<properties>
  <protobuf.version>3.7.1</protobuf.version>
</properties>

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>${protobuf.version}</version>
</dependency>
<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java-util</artifactId>
  <version>${protobuf.version}</version>
</dependency>

<build>
  <extensions>
    <extension>
      <groupId>kr.motd.maven</groupId>
      <artifactId>os-maven-plugin</artifactId>
      <version>1.6.1</version>
    </extension>
  </extensions>
  <plugins>
    <plugin>
      <groupId>org.xolstice.maven.plugins</groupId>
      <artifactId>protobuf-maven-plugin</artifactId>
      <version>0.6.1</version>
      <extensions>true</extensions>
      <configuration>
        <!-- <protocExecutable>protoc</protocExecutable> -->
        <protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
      </configuration>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>test-compile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

DataFrame Operations

Merge DataFrames

# concatenate rows (column set unchanged)
df1.union(df2)

# join on a condition
df1.join(df2, ...)

Join row by row

%pyspark # zeppelin
from pyspark.sql.functions import col,monotonically_increasing_id

online_data_renamed = online_data.withColumnRenamed('value', 'online')
offline_data_renamed = offline_data.withColumnRenamed('value', 'offline')

online_data_renamed = online_data_renamed.withColumn("id",monotonically_increasing_id())
offline_data_renamed = offline_data_renamed.withColumn("id",monotonically_increasing_id())

merged = online_data_renamed.join(offline_data_renamed, online_data_renamed.id == offline_data_renamed.id, how='inner')

Column Operations

Compute a value distribution

ndf = df.groupBy('age').count()

Create a DataFrame

# schema 1
dept = [("Finance", 10), ("Marketing", 20), ("Sales", 30), ("IT", 40)]
deptColumns = ["dept_name", "dept_id"]
deptDF = spark.createDataFrame(data=dept, schema=deptColumns)

# schema 2
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

my_list = [("John", 25), ("Alice", 30), ("Bob", 35)]
schema = StructType([
    StructField("name", StringType(), nullable=False),
    StructField("age", IntegerType(), nullable=False)
])
df = spark.createDataFrame(my_list, schema)