hadoop startup

[toc]

搭建

节点

1
2
3
4
5
6
# 这里有三个节点, dp1 dp2 dp3
~$ cat /etc/hosts
...
10.1.0.191 dp1
10.1.0.229 dp2
10.1.0.110 dp3

将节点信息在各节点的 /etc/hosts 内配置。

安装 jdk

1
2
# centos 8
sudo dnf install java-11-openjdk-devel

配置免密登录

1
2
ssh-keygen -t rsa -c "email" # 或者直接 ssh-keygen
ssh-copy-id -i ~/.ssh/id_rsa.pub user@host

下载 hadoop

1
wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.3/hadoop-3.3.3.tar.gz

添加环境变量

1
2
3
4
sudo vim /etc/profile
# 添加如下内容
export HADOOP_HOME=/usr/local/hadoop-3.3.3
export PATH=${HADOOP_HOME}/bin:$PATH

修改 hadoop 配置

1
cd ${HADOOP_HOME}/etc/hadoop

hadoop-env.sh

1
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk  # openjdk

core-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
<configuration>
<property>
<!--指定 namenode 的 hdfs 协议文件系统的通信地址-->
<name>fs.defaultFS</name>
<value>hdfs://dp1:8020</value>
</property>
<property>
<!--指定 hadoop 集群存储临时文件的目录-->
<name>hadoop.tmp.dir</name>
<value>/data/hadoop/tmp</value>
</property>
</configuration>

hdfs-site.xml

1
2
3
4
5
6
7
8
9
10
<property>
<!--namenode 节点数据(即元数据)的存放位置,可以指定多个目录实现容错,多个目录用逗号分隔-->
<name>dfs.namenode.name.dir</name>
<value>/data/hadoop/namenode/data</value>
</property>
<property>
<!--datanode 节点数据(即数据块)的存放位置-->
<name>dfs.datanode.data.dir</name>
<value>/data/hadoop/datanode/data</value>
</property>

yarn-site.xml

1
2
3
4
5
6
7
8
9
10
11
12
<configuration>
<property>
<!--配置 NodeManager 上运行的附属服务。需要配置成 mapreduce_shuffle 后才可以在 Yarn 上运行 MapReduce 程序。-->
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<!--resourcemanager 的主机名-->
<name>yarn.resourcemanager.hostname</name>
<value>dp1</value>
</property>
</configuration>

mapred-site.xml

1
2
3
4
5
6
7
<configuration>
<property>
<!--指定 mapreduce 作业运行在 yarn 上-->
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>

workers

1
2
3
dp1
dp2
dp3

分发 hadoop

1
2
3
# 需要在 dp2、dp3 修改 /usr/local/ 目录的写入权限
scp -r /usr/local/hadoop-3.3.3 dp2:/usr/local/
scp -r /usr/local/hadoop-3.3.3 dp3:/usr/local/

初始化

1
2
# 在 dp1 上执行
hdfs namenode -format

启动集群

1
2
3
4
5
# ${HADOOP_HOME}/sbin 目录下
# 启动dfs服务
./start-dfs.sh
# 启动yarn服务
./start-yarn.sh

查看集群

1
2
3
4
5
6
7
$ jps
71938 NodeManager
71285 DataNode
70519 ResourceManager
72124 Jps
71135 NameNode
71550 SecondaryNameNode

访问

端口 说明
8088 资源管理界面
8020 name node rpc 端口
9870 name node http 端口
9864 data node http 端口

停掉集群

1
2
# sbin 下
./stop-all.sh

配置自启动

1
2
3
crontab -e
# 添加下面一行
@reboot bash /usr/local/hadoop-3.3.3/sbin/start-dfs.sh && bash /usr/local/hadoop-3.3.3/sbin/start-yarn.sh

Client

如果需要从本地机器向远程的 hadoop 集群传输文件,可以再从线上下载一份 hadoop 程序(保持配置),然后做下面的更改。

  • 配置好 PATH
  • 修改 hadoop-env.sh 里面的 JAVA_HOME
  • /etc/hosts 添加对应的主机记录

这里需要注意的是,hadoop 集群可能是用的内网 ip,比如我搭建的 hadoop 集群在 openstack 上面,ip 用的是 10.1.0.x。如果是连接在外部网络的机器(192.168.6.x),是不能直接访问的。

1
2
3
4
5
6
7
# 本地机器 ip
192.168.6.13

# hadoop 集群
10.1.0.191 dp1
10.1.0.229 dp2
10.1.0.110 dp3

如果直接使用下面命令。

1
2
hdfs dfs -copyFromLocal /local/path/to/file /remote/path/to/save
hdfs dfs -copyToLocal /remote/path/to/file /local/path/to/save

会报下面的错误。

1
2
3
4
5
6
7
8
9
10
➜  ~ hdfs dfs -copyFromLocal /Users/wii/heap /tmp/
2022-06-04 23:20:22,272 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-06-04 23:21:23,169 WARN hdfs.DataStreamer: Exception in createBlockOutputStream blk_1073741843_1019
org.apache.hadoop.net.ConnectTimeoutException: 60000 millis timeout while waiting for channel to be ready for connect. ch : java.nio.channels.SocketChannel[connection-pending remote=/10.1.0.191:9866]
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:589)
at org.apache.hadoop.hdfs.DataStreamer.createSocketForPipeline(DataStreamer.java:253)
at org.apache.hadoop.hdfs.DataStreamer.createBlockOutputStream(DataStreamer.java:1774)
at org.apache.hadoop.hdfs.DataStreamer.nextBlockOutputStream(DataStreamer.java:1728)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:713)
...

可以看到,会直接访问 10.1.0.191 ,显然是有问题的,因为外部网络没办法直接访问 openstack 的 internal network,解决方案是设置一下路由,下面是 openstack 的网络拓扑。

image-20220604233121569

内部网络是通过一个路由(192.168.6.129)和外部网络连接,下面是在本地添加静态路由的代码。

1
2
3
sudo route add -net 10.1.0.0 -netmask 255.255.0.0 192.168.6.129
# 下面是删除路由的代码
sudo route -v delete -net 10.1.0.0 -gateway 192.168.6.129

也可以在路由器配置。

image-20220604233802672

再去 ping 就可以了。

1
2
3
4
5
6
7
8
9
➜  ~ ping 10.1.0.191
PING 10.1.0.191 (10.1.0.191): 56 data bytes
64 bytes from 10.1.0.191: icmp_seq=0 ttl=63 time=4.142 ms
64 bytes from 10.1.0.191: icmp_seq=1 ttl=63 time=3.630 ms
64 bytes from 10.1.0.191: icmp_seq=2 ttl=63 time=2.651 ms
^C
--- 10.1.0.191 ping statistics ---
3 packets transmitted, 3 packets received, 0.0% packet loss
round-trip min/avg/max/stddev = 2.651/3.474/4.142/0.619 ms

还有一个问题就是,hadoop 集群不要使用外部网络 ip,这样在传输数据的时候都要走一遍外部网络,效率极低。

权限

关闭权限控制

在内网搭建的集群,为了方便把权限关掉了。

1
2
3
4
5
<!-- hdfs-site.xml -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>

使用 docker compose 安装

docker hub 文档参考这里,docker compose 配置参考这里

配置 Client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 从容器拷贝 hadoop 程序
docker cp hadoop_namenode_1:/opt/hadoop /usr/local/

# 添加到 path
export PATH=/usr/local/hadoop/bin:$PATH

# 获取 nodename 节点 ip
docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' hadoop_namenode_1
## 172.18.0.4

# 修改 hosts
## sudo vim /etc/hosts
## 添加如下内容
172.18.0.4 namenode

注意:提前配置 JAVA_HOME

测试

1
2
3
4
$ hdfs dfs -ls /
$ hdfs dfs -mkdir /data
$ hdfs dfs -ls /
drwxr-xr-x - hadoop supergroup 0 2023-08-02 10:25 /data

关闭权限控制

1
2
3
4
5
<!-- hdfs-site.xml -->
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>

MR 作业

配置

作业提交 / 环境变量配置集群属性等。

1
2
3
4
5
# 代码中获取配置
Configuration conf = getConf();

# 添加配置
conf.set(key, value)

Job

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# setJarByClass
通过指定类名,获取需要分发到其他节点的Jar包,来执行map、reduce操作。

# setMapperClass
mapper 操作类

# setMapOutputKeyClass
mapper 输出的key类型

# setMapOutputValueClass
mapper 输出的value类型

# setReducerClass
reduce 操作类

# setOutputKeyClass
reduce 输出key类型

# setOutputValueClass
reduce 输出value类型

# setInputFormatClass
输入文件格式

# setOutputFormatClass
输出文件格式

# setNumReduceTasks
设置 reduce 任务数量

# setJobSetupCleanupNeeded
每个task, 是否执行 setup / cleanup 操作

# setSortComparatorClass
设置排序阶段比较器

# setReduceSpeculativeExecution
设置是否开启reduce阶段的推测执行

# setCombinerClass
设置map阶段combine的类;combine运行在reduce之前,也被称为semi-reducer;输入来自map class,输出发送给实际的reduce作为输入。
# 参考: https://www.tutorialspoint.com/map_reduce/map_reduce_combiners.htm

# setCombinerKeyGroupingComparatorClass
设置combine阶段比较器

# setGroupingComparatorClass
对发往 reduce 的键值对进行分组操作

文件操作

1
2
3
4
5
6
7
8
# 获取 FileSystem
FileSystem fs = FileSystem.get(conf);

# 判断是否存在
fs.exists(path)

# 删除
fs.delete(path, true); # (path, recursive)

命令

1
2
3
4
5
# 创建目录
hdfs dfs -mkdir -p /path/to/dir/

# 上传文件
hdfs dfs -put /path/to/file /path

c++ 复杂对象

说明

这是一个小实验。

定义一个复杂类

1
2
3
4
5
6
struct Complex {
int64_t a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z,
a1, b1, c1, d1, e1, f1, g1, h1, i1, j1, k1, l1, m1, n1, o1, p1, q1, r1, s1, t1, u1, v1, w1, x1, y1, z1,
a2, b2, c2, d2, e2, f2, g2, h2, i2, j2, k2, l2, m2, n2, o2, p2, q2, r2, s2, t2, u2, v2, w2, x2, y2, z2,
a3, b3, c3, d3, e3, f3, g3, h3, i3, j3, k3, l3, m3, n3, o3, p3, q3, r3, s3, t3, u3, v3, w3, x3, y3, z3;
};

性能对比

作为实验,我们把负载类的对象插入一个 Map。在开始性能对比之前,定义计算时间的工具函数。

1
2
3
4
5
6
7
8
std::chrono::time_point<std::chrono::system_clock> now() {
return std::chrono::system_clock::now();
}

long elapsed(std::chrono::time_point<std::chrono::system_clock> &start) {
auto e = std::chrono::system_clock::now() - start;
return std::chrono::duration_cast<std::chrono::milliseconds>(e).count();
}

std::map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
template<typename K, typename V>
using Map = std::map<K, V>;

int main() {
typedef Complex MapV;
int loop = 500 * 10000;
MapV complex;
Map<int, MapV> m;
auto start = now();
for (int i = 0; i < loop; ++i) {
m[i] = complex;
}
std::cout << "elapsed: " << elapsed(start) << std::endl;
return 0;
}

输出

1
elapsed: 7750

std::unordered_map

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
template<typename K, typename V>
using Map = std::unordered_map<K, V>;

int main() {
typedef Complex MapV;
int loop = 500 * 10000;
MapV complex;
Map<int, MapV> m;
auto start = now();
for (int i = 0; i < loop; ++i) {
m[i] = complex;
}
std::cout << "elapsed: " << elapsed(start) << std::endl;
return 0;
}

输出

1
elapsed: 4474

std::shared_ptr

如果我们改为使用共享指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
template<typename K, typename V>
using Map = std::unordered_map<K, V>;

int main() {
typedef std::shared_ptr<Complex> MapV;
int loop = 1000 * 10000;
MapV complex = std::make_shared<Complex>();
Map<int, MapV> m;
auto start = now();
for (int i = 0; i < loop; ++i) {
m[i] = complex;
}
std::cout << "elapsed: " << elapsed(start) << std::endl;
return 0;
}

输出

1
elapsed: 2002

简单对象

再来对比下指针和简单对象。

智能指针

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template<typename K, typename V>
using Map = std::unordered_map<K, V>;

int main() {
typedef int64_t BaseType;
typedef std::shared_ptr<BaseType> MapV;
int loop = 500 * 10000;
MapV complex = std::make_shared<BaseType>();
Map<int, MapV> m;
auto start = now();
for (int i = 0; i < loop; ++i) {
m[i] = complex;
}
std::cout << "elapsed: " << elapsed(start) << std::endl;
return 0;
}

输出

1
elapsed: 2012

简单对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
template<typename K, typename V>
using Map = std::unordered_map<K, V>;

int main() {
typedef int64_t BaseType;
typedef BaseType MapV;
int loop = 500 * 10000;
MapV complex{};
Map<int, MapV> m;
auto start = now();
for (int i = 0; i < loop; ++i) {
m[i] = complex;
}
std::cout << "elapsed: " << elapsed(start) << std::endl;
return 0;
}

输出

1
elapsed: 1763

std::move

std::move 实际会调用 move constructor(A(A &&another)),所以,对于基础类型和没有实现 move constructor 的类不起效果。

定义 A。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class A {
public:
A() {
std::cout << "construct A" << std::endl;
}

A(const A& a) {
std::cout << "copy construct A" << std::endl;
}

~A() {
std::cout << "destruct A" << std::endl;
}
};

看下插入 map。

1
2
3
4
5
6
7
8
9
10
11
std::map<int, A> ma;
{
A a;
ma.emplace(1, std::move(a));
}

// 输出
construct A
copy construct A
destruct A
destruct A

用如下方式。

1
2
3
4
5
6
7
8
std::map<int, A> ma;
{
ma[1];
}

// 输出
construct A
destruct A

实现移动构造。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class A {
public:
A() {
std::cout << "construct A" << std::endl;
}

A(const A& a) {
std::cout << "copy construct A" << std::endl;
}

A(A &&a) {
std::cout << "move construct A" << std::endl;
}

~A() {
std::cout << "destruct A" << std::endl;
}
};

申请内存

先来看下空转。

1
2
3
4
5
6
7
8
int loop = 1000 * 10000;
auto start = now();
for (int i = 0; i < loop; ++i) {
}
std::cout << "elapsed: " << elapsed(start) << std::endl;

// output
elapsed: 24

再来看下申请内存。

1
2
3
4
5
6
7
8
9
int loop = 1000 * 10000;
auto start = now();
for (int i = 0; i < loop; ++i) {
Complex complex{};
}
std::cout << "elapsed: " << elapsed(start) << std::endl;

// output
elapsed: 144

再来看下插入 vector。

1
2
3
4
5
6
7
8
9
10
11
int loop = 1000 * 10000;
std::vector<Complex> complexes;
auto start = now();
for (int i = 0; i < loop; ++i) {
Complex complex{};
complexes.emplace_back(complex);
}
std::cout << "elapsed: " << elapsed(start) << std::endl;

// output
elapsed: 9663

再看下 reserve size。

1
2
3
4
5
6
7
8
9
10
11
12
int loop = 1000 * 10000;
std::vector<Complex> complexes;
complexes.reserve(loop);
auto start = now();
for (int i = 0; i < loop; ++i) {
Complex complex{};
complexes.emplace_back(complex);
}
std::cout << "elapsed: " << elapsed(start) << std::endl;

// output
elapsed: 2525

再看下通过构造函数初始化列表。

1
2
3
4
5
6
7
int loop = 1000 * 10000;
auto start = now();
std::vector<Complex> complexes(loop, Complex{});
std::cout << "elapsed: " << elapsed(start) << std::endl;

// output
elapsed: 2069

再来看下 resize。

1
2
3
4
5
6
7
8
int loop = 1000 * 10000;
std::vector<Complex> complexes;
auto start = now();
complexes.resize(loop);
std::cout << "elapsed: " << elapsed(start) << std::endl;

// output
elapsed: 1996

小结

在深度理解程序的过程中,很多操作的实际成本和我们想象中的不太一样,用一句话直白的说明就是,“这个操作竟然这么耗时?”,或者,“这个操作这么快?”。

操作

  • 计算
  • 内存申请
  • 容器
    • map,插入、查找

protobuf - go

Tips

optional 字段生成基本变量指针

对于 proto2 语法,如果基础字段添加了 optional 修饰,那么生成的 go 文件对应的字段是指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// proto 定义
syntax = "proto2";

message Foo {
optional int32 id = 1;
optional string bar = 2;
}

// go 文件
type Foo struct {
...

Id *int32 `protobuf:"varint,1,opt,name=id" json:"id,omitempty"`
Bar *string `protobuf:"bytes,2,opt,name=bar" json:"bar,omitempty"`
}

go 语法不能对常量、右值取地址,无法直接赋值给对应变量。proto 提供了对应的 wrapper 工具,传入右值,返回指针。

1
2
foo := pb.Foo{}
foo.Id = proto.Int32(23)

s3

递归删除

1
2
3
4
aws s3 rm --recursive s3://bucket/ --exclude="*" --include="/folder_path/*" 

# 递归删除 s3://bucket/offline/data/2022
aws s3 rm --recursive s3://bucket/offline/data --include="2022"

查看空间占用

1
2
3
$ aws s3 ls s3://bucket/patth --recursive --summarize --human-readable | grep Total
Total Objects: 8382
Total Size: 984.4 GiB

brew - ubuntu

安装

1
2
3
4
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"

# 配置
echo 'eval "$(/home/linuxbrew/.linuxbrew/bin/brew shellenv)"' >> /home/ubuntu/.profile

服务

consul

1
2
3
4
5
6
7
8
9
10
# 安装
brew install consul

# 配置
vim /home/linuxbrew/.linuxbrew/Cellar/consul/1.12.0/homebrew.consul.service
## 修改 ExecStart
ExecStart=/home/linuxbrew/.linuxbrew/opt/consul/bin/consul agent -dev -bind 0.0.0.0 -advertise 127.0.0.1 -client 0.0.0.0

# 手动启动
/home/linuxbrew/.linuxbrew/opt/consul/bin/consul agent -dev -bind 0.0.0.0 -advertise 127.0.0.1 -client 0.0.0.0

scala basic

[toc]

组织

文件

数据类型

类型

数字

字符串

布尔

限定

常量

变量

数据结构

数组

列表

集合

映射

语法

程序结构

注释

运算符

条件控制

循环

1
2
// 遍历次数
(0 until 10).map(...)

判断

函数

1
2
3
4
class Point(xc: Int, yc: Int) {
var x: Int = xc
var y: Int = yc
}

特性

语法糖

空指针处理

函数式编程

泛型编程

flatbuffer

定义

1
2
3
4
table Weapon {
name:string;
damage:short;
}

转换

1
2
3
4
flatbuffers::FlatBufferBuilder fbb;
...

const Weapon * weapon = GetWeapon(fbb.GetBufferPointer());

打印

1
2
3
4
5
6
7
// 打印结构
#include "flatbuffers/minireflect.h"

RequestT req;
flatbuffers::FlatBufferBuilder fbb;
fbb.Finish(Request::Pack(fbb, &req));
std::string debug_info = flatbuffers::FlatBufferToString(fbb.GetBufferPointer(), RequestTypeTable());

protobuf - c++

Descriptor & Reflection

message

1
2
3
4
5
6
7
8
9
10
syntax = "proto3";

message Sea {
string name = 1;
}

message World {
int64 age = 1;
repeated string tag = 2;
}

Descriptor & Reflection

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
World wd{};

// protobuf 3
auto age = World::GetDescriptor()->FindFieldByName("age");
// protobuf 2
auto age = wd.GetDescriptor()->FindFieldByName("age");

// 设置单值
World::GetReflection->SetInt64(&wd, age, 10);
// 设置 repeated 字段
World::GetReflection()->AddString(&wd, tag, "a");
World::GetReflection()->AddString(&wd, tag, "b");
World::GetReflection()->SetRepeatedString(&wd, tag, 1, "c");

// 设置 submessage
wd.mutable_sea()->set_name("pacific");

Json

1
2
3
4
5
6
7
8
9
10
#include <google/protobuf/util/json_util.h>

// message -> json
std::string output;
google::protobuf::util::MessageToJsonString(message, &output);

// json -> message
SomeMessage msg;
std::string input;
google::protobuf::util::MessageToJsonString(input, &msg);

cmake