matplotlib usage

简介

Matplotlib

  • 数据可视化

安装

1
2
3
sudo pip3 install matplotlib
# or
sudo apt install python3-matplotlib

示例程序

sample 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import numpy as np
import matplotlib.pyplot as plt


x = np.linspace(0, 10, 1000) # 作图的变量自变量
y = np.sin(x) + 1 # 因变量y
z = np.cos(x**2) + 1 # 因变量z

plt.figure(figsize=(8, 4)) # 设置图像大小
plt.plot(x, y, label='$\sin x+1$', color='red', linewidth=2) # 作图, 设置标签, 线条颜色, 线条大小
plt.plot(x, z, 'b--', label='$\cos x^2+1$')
plt.xlabel('Time(s) ') # x轴名称
plt.ylabel('Volt') # y轴名称
plt.ylim(0, 2.2) # 显示的y轴范围
plt.legend() # 显示图例
plt.show() # 显示作图结果

运行结果

Sample 2 - subplot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/bin/python
import matplotlib.pyplot as plt

# add a subplot
ax1=plt.subplot(2, 2, 1)
# add a subplot with no frame
ax2=plt.subplot(2, 2, 2, frameon=False)
# add a polar subplot
plt.subplot(2, 2, 3, projection='polar')
# add a red subplot that shares the x-axis with ax1
plt.subplot(2, 2, 4, sharex=ax1, facecolor='red')

#delete ax2 from the figure
plt.delaxes(ax2)
#add ax2 to the figure again
plt.subplot(ax2)

plt.show()

Sample 3 - drow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from cycler import cycler
import numpy as np
import matplotlib.pyplot as plt


x = np.linspace(0, 2 * np.pi)
offsets = np.linspace(0, 2*np.pi, 4, endpoint=False)
# Create array with shifted-sine curve along each column
yy = np.transpose([np.sin(x + phi) for phi in offsets])

# 1. Setting prop cycle on default rc parameter
plt.rc('lines', linewidth=4)
plt.rc('axes', prop_cycle=(cycler('color', ['r', 'g', 'b', 'y']) +
cycler('linestyle', ['-', '--', ':', '-.'])))
fig, (ax0, ax1) = plt.subplots(nrows=2)
ax0.plot(yy)
ax0.set_title('Set default color cycle to rgby')

# 2. Define prop cycle for single set of axes color & width
ax1.set_prop_cycle(cycler('color', ['c', 'm', 'y', 'k']) +
cycler('lw', [1, 2, 3, 4]))
ax1.plot(yy)
ax1.set_title('Set axes color cycle to cmyk')

# Tweak spacing between subplots to prevent labels from overlapping
fig.subplots_adjust(hspace=0.3)
plt.show()

Sample 4 - set size && save picture

1
2
3
4
5
6
#### way 1
from matplotlib.pyplot import figure
figure(figsize=(20, 10), dpi=80,)

#### way 2
plt.figure(figsize=(15, 10), dpi=80)

pandas

[toc]

简介

  • 强大的数据分析和探索工具
  • 包含高级的数据结构和精巧的工具
  • 构建在Numpy之上
  • 需要安装xlrdxlwt支持excel文件读写

数据结构

Series

  • 序列, 类似于一维数组

获取值

1
s.values

DataFrame

  • 相当于一张二维的表格, 类似二维数组
  • 每一列都是一个Series

处理CSV文件

读取文件

1
2
# 读取csv文件,并转化为DataFrame
df = pd.read_csv('../data/train.csv')

导出文件

1
df.to_csv("output.csv", index=False)

DataFrame方法

数据探索

读取列

1
print(data.columns)

选取数据

  • iloc
  • DF.clomun_name : 通过列名选择

打印列的类型

1
df['Column'].dtype

过滤行

1
df['Column'][df['Column'].isna()]

转换类型

1
df['Column'] = df['Column'].astype(str)

NaN 值处理

包含 NaN 的列

1
2
for c in df.loc[:, df.isna().any()]:
pass

打印每列包含 NaN 的列数

1
df.isna().sum()

填充 NaN 值

1
df['Column'] = df['Column'].fillna(df['Column'].mean())  # 是用 Column 的平均值填充该列的 NaN 值

其他

iloc

从DataFrame中选择数据

1
2
3
df.iloc[:]		# 选择所有行,所有列
df.iloc[:, :4] # 选择所有行,前四列
df.iloc[[1, 2], [0, 1]] # 选择第2,3 行, 第1,2列

示例程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
### With Series
bovenson@ThinkCentre:~/Git/notes/Python/Pandas$ python3
Python 3.5.3 (default, Jan 19 2017, 14:11:04)
[GCC 6.3.0 20170118] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import pandas as pd
>>> import numpy as np
>>> s1 = pd.Series(np.random.randn(5), index=list(range(0, 10, 2)))
>>> s1
0 0.611873
2 1.709006
4 -0.191763
6 1.393935
8 -0.751302
dtype: float64
>>> s1.iloc[:3]
0 0.611873
2 1.709006
4 -0.191763
dtype: float64
>>> s1.iloc[3]
1.3939352812007684
>>> s1.iloc[:3] = 0
>>> s1
0 0.000000
2 0.000000
4 0.000000
6 1.393935
8 -0.751302
dtype: float64


#### With DataFrame
>>> df = pd.DataFrame(np.random.randn(6, 4), index=list(range(0, 12, 2)), columns=list(range(0, 8, 2)))
>>> df
0 2 4 6
0 0.544588 -0.943141 0.656372 -0.468110
2 -2.093147 -0.122080 0.218309 0.172868
4 -0.529067 -0.418202 1.217741 -0.959730
6 -0.388783 -0.493095 -0.222721 1.224777
8 1.110017 0.786962 1.434934 -1.052975
10 1.945774 0.274088 -0.724777 -0.469002
>>> df.iloc[:3]
0 2 4 6
0 0.544588 -0.943141 0.656372 -0.468110
2 -2.093147 -0.122080 0.218309 0.172868
4 -0.529067 -0.418202 1.217741 -0.959730
>>> df.iloc[1:5, 2:4]
4 6
2 0.218309 0.172868
4 1.217741 -0.959730
6 -0.222721 1.224777
8 1.434934 -1.052975
>>> df.iloc[[1, 2], [0, 1]]
0 2
2 -2.093147 -0.122080
4 -0.529067 -0.418202
>>> df.iloc[:, :3]
0 2 4
0 0.544588 -0.943141 0.656372
2 -2.093147 -0.122080 0.218309
4 -0.529067 -0.418202 1.217741
6 -0.388783 -0.493095 -0.222721
8 1.110017 0.786962 1.434934
10 1.945774 0.274088 -0.724777
>>> df.iloc[1, 1]
-0.12207998453906502
>>> df.iloc[1]
0 -2.093147
2 -0.122080
4 0.218309
6 0.172868
Name: 2, dtype: float64

mean

计算均值

1
2
3
4
5
6
7
8
9
10
import pandas as pd

# 加载数据
train_df = pd.read_csv('./data/train.csv')

# 计算所有数值均值
train_df.mean()

# 计算某一列均值
train_df['Age'].mean()

dropna

1
DataFrame.dropna(axis=0, how='any', thresh=None, subset=None, inplace=False)
  • axis
    • 0 : 行
    • 1 : 列
    • 元组/列表 : 处理多维数据
  • how
    • any : 只要有一个值为NA,则删除该标签
    • all : 所有值均为NA,则删除该标签
  • thresh
    • 整数值,默认为空
    • 如果非NA值数量大于该值,则保留该标签
  • subset
    • 序列格式参数(array-like)
    • 只考虑给定标签的值
  • inplace
    • True : 在原始数据集上进行操作,返回None
    • False : 返回新的数据集,保持原始数据集不变

删除列

1
df.dorp('column-name', axis=1)

示例程序

1
2
3
4
5
6
7
8
9
10
11
12
13
import pandas as pd


s = pd.Series([1, 2, 3], index=['a', 'b', 'c']) # 创建一个序列s
d = pd.DataFrame([[1, 2, 3], [4, 5, 6]], columns=['a', 'b', 'c']) # 创建一个表
d2 = pd.DataFrame(s)

d.head() # 预览前5行数据
d.describe() # 数据基本统计量

# 读取文件
pd.read_excel('data.xls')
pd.read_csv('data.csv', encoding='utf-8')

pandas

安装

1
pip3 install pandas

引入

1
import pandas as pd

配置

1
2
3
4
# 修改print时显示列行列数
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', 100)
pd.set_option('display.width', 1000)

导入导出

读取 csv 文件

1
2
3
df = pd.read_csv('dataset.csv')
df = pandas.read_csv('data.csv', delimiter='\t') # 指定 delimiter
df = pd.read_csv("fn", index_col=None, header=0)

从数组创建

1
2
3
4
5
6
7
>>> import pandas as pd
>>> arr = [[1, 2], [3, 4]]
>>> df = pd.DataFrame(arr, columns=['x', 'y'])
>>> df
x y
0 1 2
1 3 4

转换为 numpy 对象

1
2
# 转换为 np array
df.to_numpy()

导出到文件

1
df.to_csv('output.cvs')

数据透视

1
2
3
4
5
6
7
8
9
# 描述数据
data.describe()
data.info()

# 打印前 n 行
df.head(n)

# 打印列
data.columns

数据操作

列操作

读操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 选取一列
data.<ColumnName>

# 选取多列
data[[<ColumnName>, <ColumnName>, <...>]]
# 过滤列
data.loc[:, df.columns != '<column_name>']

# 通过索引选取
df.iloc[:, 0] # 第1列

# 对列取 unique
df[<ColumnName>].unique()

# 按类型选取列
df.select_dtypes(include=['float64', 'bool'])
df.select_dtypes(include='number') # 选取所有数字类型的列

转换操作

1
2
3
4
5
6
7
8
9
# 转换所有列
df = df.apply(pd.to_numeric)
df = df.apply(pd.to_numeric, errors='ignore') # 忽略异常

# 转换特定列
df[['a', 'b']] = df[['a', 'b']].apply(pd.to_numeric)

# 一列转换为多列
df[['a', 'b', 'c']] = df['label'].apply(lambda x: pd.Series(x.split(',')))

Drop 列

1
2
# drop 有 nan 的列
df.dropna(axis=1, how='any')

行操作

排序

1
df = df.sort_values(by=[<ClomunName>], ascending=False)

过滤

1
2
3
4
5
6
7
8
# loc
df.loc[df['column_name'] == some_value]
df.loc[df['column_name'].isin(some_values)]
df.loc[(df['column_name'] >= A) & (df['column_name'] <= B)]

# 按列值过滤
df[df['部门'].isin(['市场部']) & ~df['省份'].isin(['北京'])] # 部门=市场部 & 省份!=北京
t_df = t_df[t_df.服务名称 == '离线计算平台']

连接

1
2
3
4
5
6
7
# 用法
merge(left, right, how='inner', on=None, left_on=None, right_on=None,
left_index=False, right_index=False, sort=True,
suffixes=('_x', '_y'), copy=True, indicator=False)

# 左连接
merge(data_x, data_y, how='left', on='uid')

示例

1
2
# 读取 Cell
data['age'][6] # 读取 data 的 age 列第 6 行数据

pyspark

常用操作

以文本形式保存 DataFrame 一列

1
2
3
4
# dateframe
data = df.rdd.map(lambda x: x.{column}).collect()
data_rdd = spark.sparkContext.parallelize(data)
data_rdd.coalesce(1).saveAsTextFile('hdfs://path')

处理 WrappedArray

1
row.scores[0]

Join 时重复行保留一个

1
df_a.join(df_b, on=[{column}]) # 使用 on

过滤

多条件过滤

1
df.filter((condition1) & (condition2))

包含字符串

1
df.filter(col('name').contains('sun'))

值比较

1
df.filter(col('name') == "wii")

Null 判断

1
2
df.filter(col('name').isNull())
df.filter(col('name').isNotNull())

In 判断

1
df.filter(col('name').isin(["wii", "bovenson"]))

创建列

重命名

1
df_renamed = df.withColumnRenamed('name1', 'name2')

从其他列新建列

数值计算

1
new_df = df.withColumn('After_discount', df.Course_Fees - df.Discount) 

使用 UDF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import pyspark.sql.functions as F 
from pyspark.sql.types import IntegerType

# define the sum_col
def Total(Course_Fees, Discount):
res = Course_Fees - Discount
return res

new_f = F.udf(Total, IntegerType())
new_df = df.withColumn("Total_price", new_f("Course_Fees", "Discount"))

# 使用 udf
@udf(IntegerType())
def Total(Course_Fees, Discount):
res = Course_Fees - Discount
return res

# 使用 udf + lambda
function = udf(lambda col1, col2 : col1-col2, IntegerType())
new_df = old_df.withColumn('col_n',function(col('col_1'), col('col_2')))

计算

最大值

1
df.agg(max("age")).show()

转换

row to json string

1
df.toJson()

写数据

1
2
df.write.format('orc').save('/path/to/destination')
df.coalesce(1).write.format('json').save('/path/to/destination') # 写单个文件

文本文件

1
df.coalesce(1).write.format("text").option("header", "false").mode("overwrite").save('/path/to/destination')

报错

1
2
3
4
5
6
7
8
# 代码
from pyspark.sql.functions import *
@F.udf(IntegerType())
def TimeDiff(a, b):
return abs(a - b)

# 报错
TypeError: Invalid argument, not a string or column: 1 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
1
因为使用 from pyspark.sql.functions import * 导入,导致 abs 使用 from pyspark.sql.functions 内的函数

TypeError: Can not infer schema for type: <class ‘str’>

rdd.toDF() 时报错。

1
2
3
4
from pyspark.sql import Row

row = Row("val") # Or some other column name
rdd.map(row).toDF()

或者

1
rdd.map(lambda x: (x, )).toDF()

spark

读 Json

1
2
3
4
var srcData = spark.read.json(srcPath)

// 获取 String vector Cell
row.getAs[Seq[String]](idx)

设置大小写敏感

1
spark.conf.set("spark.sql.caseSensitive", "true")

查询

group by

1
2
df.groupBy("column-name").count().show(false)
df.groupBy("column-name").agg(count("*").alias("count")).show(false)

读 S3 数据

pyspark

安装

1
pip3 install pyspark

读取 s3 数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
# 创建 spark session
from pyspark.sql import SparkSession
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.12.153,org.apache.hadoop:hadoop-aws:3.2.2,org.apache.hadoop:hadoop-client:3.2.2 pyspark-shell'
spark = SparkSession.builder.master("local[*]").config("spark.executor.memory", "24g").config("spark.driver.memory", "24g").appName("sample").getOrCreate()
# "local[1]" 使用单个数字,最多使用一个 core

# 从本地读取 aws 认证信息
import configparser
from pathlib import Path
import os
config = configparser.ConfigParser()
config.read(os.path.join(Path.home(), '.aws/credentials'))
access_key_id = config.get('default', 'aws_access_key_id')
secret_access_key = config.get('default', 'aws_secret_access_key')

# 设置 spark context
spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', access_key_id)
spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', secret_access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

# 读取文件
sample_file = "s3a://<bucket>/path/to/file"
data = spark.read.text(sample_file)

# 解析记录 (pb)
import base64
from google.protobuf.json_format import MessageToJson
import sample_pb2

def decode_pb(row):
decoded_data = base64.b64decode(row.value)
tr = sample_pb2.Example()
tr.ParseFromString(decoded_data)
return [MessageToJson(tr)]

# spark 任务
result = data.rdd.map(decode_pb).toDF(["value"])
result.show(1, False)

scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 创建 Spark Session
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.appName("queue_pb_log")
.getOrCreate()

// 读取 s3 认证信息
val cf = Paths.get(System.getProperty("user.home"), ".aws/credentials")
val c = new Ini(new File(cf.toUri))
val prefs = new IniPreferences(c)
val awsAccessKeyId = prefs.node("default").get("aws_access_key_id", "no")
val awsSecretAccessKey = prefs.node("default").get("aws_secret_access_key", "no")

// 设置 spark context
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", awsAccessKeyId)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", awsSecretAccessKey)
spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.sparkContext.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.InstanceProfileCredentialsProvider,com.amazonaws.auth.DefaultAWSCredentialsProviderChain")

// 读取日志
val sampleFile = "s3a://<bucket>/path/to/file"
val src = spark.sparkContext.textFile(sampleFile)
val result = src.map(record => {
// 解析 pb 日志
val parsed = Example.parseFrom(Base64.getDecoder.decode(record))
JsonFormat.printer().print(parsed)
})

println(result.first())

pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.7.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.8</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.8</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.8</version>
<!-- 本地运行 -->
<!-- <scope>compile</scope> -->
<!-- 远程提交 -->
<!-- <scope>provided</scope> -->
</dependency>
<!-- https://mvnrepository.com/artifact/org.ini4j/ini4j -->
<dependency>
<groupId>org.ini4j</groupId>
<artifactId>ini4j</artifactId>
<version>0.5.4</version>
</dependency>

<!-- 其他 如果版本有冲突可以尝试 -->
<properties>
<scala.version>2.11.6</scala.version>
<jackson.version>2.11.4</jackson.version>
</properties>

<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>${jackson.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.module/jackson-module-scala -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.3</version>
</dependency>

<!-- protobuf 相关 -->
<properties>
<protobuf.version>3.7.1</protobuf.version>
</properties>

<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>${protobuf.version}</version>
</dependency>

<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.1</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<extensions>true</extensions>
<configuration>
<!-- <protocExecutable>protoc</protocExecutable>-->
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>test-compile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

DataFrame 操作

合并 DataFrame

1
2
3
4
5
# 按行拼接 (列数不变)
df1.union(df2)

# 按条件拼接
df1.join(df2, ...)

按行拼接

1
2
3
4
5
6
7
8
9
10
%pyspark # zeppelin
from pyspark.sql.functions import col,monotonically_increasing_id

online_data_renamed = online_data.withColumnRenamed('value', 'online')
offline_data_renamed = offline_data.withColumnRenamed('value', 'offline')

online_data_renamed = online_data_renamed.withColumn("id",monotonically_increasing_id())
offline_data_renamed = offline_data_renamed.withColumn("id",monotonically_increasing_id())

merged = online_data_renamed.join(offline_data_renamed, online_data_renamed.id == offline_data_renamed.id, how='inner')

列操作

计算分布

1
ndf = df.groupBy('age').count()

创建 DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
# schema 1
dept = [("Finance",10), ("Marketing",20), ("Sales",30), ("IT",40)]
deptColumns = ["dept_name","dept_id"]
deptDF = spark.createDataFrame(data=dept, schema = deptColumns)

# schem 2
my_list = [("John", 25), ("Alice", 30), ("Bob", 35)]
schema = StructType([
StructField("name", StringType(), nullable=False),
StructField("age", StringType(), nullable=False)
])
df = spark.createDataFrame(my_list, schema)

hadoop startup

[toc]

搭建

节点

1
2
3
4
5
6
# 这里有三个节点, dp1 dp2 dp3
~$ cat /etc/hosts
...
10.1.0.191 dp1
10.1.0.229 dp2
10.1.0.110 dp3

将节点信息在各节点的 /etc/hosts 内配置。

安装 jdk

1
2
# centos 8
sudo dnf install java-11-openjdk-devel

配置免密登录

1
2
ssh-keygen -t rsa -c "email" # 或者直接 ssh-keygen
ssh-copy-id -i ~/.ssh/id_rsa.pub user@host

下载 hadoop

1
wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.3/hadoop-3.3.3.tar.gz

添加环境变量

1
2
3
4
sudo vim /etc/profile
# 添加如下内容
export HADOOP_HOME=/usr/local/hadoop-3.3.3
export PATH=${HADOOP_HOME}/bin:$PATH

修改 hadoop 配置

1
cd ${HADOOP_HOME}/etc/hadoop

hadoop-env.sh

1
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk  # openjdk

core-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
<configuration>
<property>
<!--指定 namenode 的 hdfs 协议文件系统的通信地址-->
<name>fs.defaultFS</name>
<value>hdfs://dp1:8020</value>
</property>
<property>
<!--指定 hadoop 集群存储临时文件的目录-->
<name>hadoop.tmp.dir</name>
<value>/data/hadoop/tmp</value>
</property>
</configuration>

hdfs-site.xml

1
2
3
4
5
6
7
8
9
10
<property>
<!--namenode 节点数据(即元数据)的存放位置,可以指定多个目录实现容错,多个目录用逗号分隔-->
<name>dfs.namenode.name.dir</name>
<value>/data/hadoop/namenode/data</value>
</property>
<property>
<!--datanode 节点数据(即数据块)的存放位置-->
<name>dfs.datanode.data.dir</name>
<value>/data/hadoop/datanode/data</value>
</property>

yarn-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
<configuration>
<property>
<!--配置 NodeManager 上运行的附属服务。需要配置成 mapreduce_shuffle 后才可以在 Yarn 上运行 MapReduce 程序。-->
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<!--resourcemanager 的主机名-->
<name>yarn.resourcemanager.hostname</name>
<value>dp1</value>
</property>
</configuration>

mapred-site.xml

1
2
3
4
5
6
7
<configuration>
<property>
<!--指定 mapreduce 作业运行在 yarn 上-->
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>

workers

1
2
3
dp1
dp2
dp3

分发 hadoop

1
2
3
# 需要在 dp2、dp3 修改 /usr/local/ 目录的写入权限
scp -r /usr/local/hadoop-3.3.3 dp2:/usr/local/
scp -r /usr/local/hadoop-3.3.3 dp3:/usr/local/

初始化

1
2
# 在 dp1 上执行
hdfs namenode -format

启动集群

1
2
3
4
5
# ${HADOOP_HOME}/sbin 目录下
# 启动dfs服务
./start-dfs.sh
# 启动yarn服务
./start-yarn.sh

查看集群

1
2
3
4
5
6
7
$ jps
71938 NodeManager
71285 DataNode
70519 ResourceManager
72124 Jps
71135 NameNode
71550 SecondaryNameNode

访问

端口 说明
8088 资源管理界面
8020 name node rpc 端口
9870 name node http 端口
9864 data node http 端口

停掉集群

1
2
# sbin 下
./stop-all.sh

配置自启动

1
2
3
crontab -e
# 添加下面一行
@reboot bash /usr/local/hadoop-3.3.3/sbin/start-dfs.sh && bash /usr/local/hadoop-3.3.3/sbin/start-yarn.sh

Client

如果需要从本地机器向远程的 hadoop 集群传输文件,可以再从线上下载一份 hadoop 程序(保持配置),然后做下面的更改。

  • 配置好 PATH
  • 修改 hadoop-env.sh 里面的 JAVA_HOME
  • /etc/hosts 添加对应的主机记录

这里需要注意的是,hadoop 集群可能是用的内网 ip,比如我搭建的 hadoop 集群在 openstack 上面,ip 用的是 10.1.0.x。如果是连接在外部网络的机器(192.168.6.x),是不能直接访问的。

1
2
3
4
5
6
7
# 本地机器 ip
192.168.6.13

# hadoop 集群
10.1.0.191 dp1
10.1.0.229 dp2
10.1.0.110 dp3

如果直接使用下面命令。

1
2
hdfs dfs -copyFromLocal /local/path/to/file /remote/path/to/save
hdfs dfs -copyToLocal /remote/path/to/file /local/path/to/save

会报下面的错误。

1
2
3
4
5
6
7
8
9
10
➜  ~ hdfs dfs -copyFromLocal /Users/wii/heap /tmp/
2022-06-04 23:20:22,272 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-06-04 23:21:23,169 WARN hdfs.DataStreamer: Exception in createBlockOutputStream blk_1073741843_1019
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.1.0.191:9866]
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:589)
at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:253)
at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1774)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1728)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:713)
...

可以看到,会直接访问 10.1.0.191 ,显然是有问题的,因为外部网络没办法直接访问 openstack 的 internal network,解决方案是设置一下路由,下面是 openstack 的网络拓扑。

image-20220604233121569

内部网络是通过一个路由(192.168.6.129)和外部网络连接,下面是在本地添加静态路由的代码。

1
2
3
sudo route add -net 10.1.0.0 -netmask 255.255.0.0 192.168.6.129
# 下面是删除路由的代码
sudo route -v delete -net 10.1.0.0 -gateway 192.168.6.129

也可以在路由器配置。

image-20220604233802672

再去 ping 就可以了。

1
2
3
4
5
6
7
8
9
➜  ~ ping 10.1.0.191
PING 10.1.0.191 (10.1.0.191): 56 data bytes
64 bytes from 10.1.0.191: icmp_seq=0 ttl=63 time=4.142 ms
64 bytes from 10.1.0.191: icmp_seq=1 ttl=63 time=3.630 ms
64 bytes from 10.1.0.191: icmp_seq=2 ttl=63 time=2.651 ms
^C
--- 10.1.0.191 ping statistics ---
3 packets transmitted, 3 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 2.651/3.474/4.142/0.619 ms

还有一个问题就是,hadoop 集群不要使用外部网络 ip,这样在传输数据的时候都要走一遍外部网络,效率极低。

权限

关闭权限控制

在内网搭建的集群,为了方便把权限关掉了。

1
2
3
4
5
<!-- hdfs-site.xml -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>

使用 docker compose 安装

docker hub 文档参考这里,docker compose 配置参考这里

配置 Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 从容器拷贝 hadoop 程序
docker cp datascience-namenode-1:/opt/hadoop /opt/

# 添加到 path
export PATH=/usr/local/hadoop/bin:$PATH

# 获取 nodename 节点 ip
docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' datascience-namenode-1
## 172.18.0.4

# 修改 hosts
## sudo vim /etc/hosts
## 添加如下内容
172.18.0.4 namenode

注意:提前配置 JAVA_HOME

测试

1
2
3
4
$ hdfs dfs -ls /
$ hdfs dfs -mkdir /data
$ hdfs dfs -ls /
drwxr-xr-x - hadoop supergroup 0 2023-08-02 10:25 /data

关闭权限控制

环境变量修改。

1
HDFS-SITE.XML_dfs.permissions.enabled=false

配置文件修改。

1
2
3
4
5
<!-- hdfs-site.xml -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>

MR 作业

配置

作业提交 / 环境变量配置集群属性等。

1
2
3
4
5
# 代码中获取配置
Configuration conf = getConf();

# 添加配置
conf.set(key, value)

Job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# setJarByClass
通过指定类名,获取需要分发到其他节点的Jar包,来执行map、reduce操作。

# setMapperClass
mapper 操作类

# setMapOutputKeyClass
mapper 输出的key类型

# setMapOutputValueClass
mapper 输出的value类型

# setReducerClass
reduce 操作类

# setOutputKeyClass
reduce 输出key类型

# setOutputValueClass
reduce 输出value类型

# setInputFormatClass
输入文件格式

# setOutputFormatClass
输出文件格式

# setNumReduceTasks
设置 reduce 任务数量

# setJobSetupCleanupNeeded
每个task, 是否执行 setup / cleanup 操作

# setSortComparatorClass
设置排序阶段比较器

# setReduceSpeculativeExecution
设置是否开启reduce阶段的推测执行

# setCombinerClass
设置map阶段combine的类;combine运行在reduce之前,也被称为semi-reducer;输入来自map class,输出发送给实际的reduce作为输入。
# 参考: https://www.tutorialspoint.com/map_reduce/map_reduce_combiners.htm

# setCombinerKeyGroupingComparatorClass
设置combine阶段比较器

# setGroupingComparatorClass
对发往 reduce 的键值对进行分组操作

文件操作

1
2
3
4
5
6
7
8
# 获取 FileSystem
FileSystem fs = FileSystem.get(conf);

# 判断是否存在
fs.exists(path)

# 删除
fs.delete(path, true); # (path, recursive)

HDFS

1
2
3
4
5
# 创建目录
hdfs dfs -mkdir -p /path/to/dir/

# 上传文件
hdfs dfs -put /path/to/file /path