pyspark
常用操作
以文本形式保存 DataFrame 一列
1 | # dateframe |
处理 WrappedArray
1 | row.scores[0] |
Join 时重复行保留一个
1 | df_a.join(df_b, on=[{column}]) # 使用 on |
过滤
多条件过滤
1 | df.filter((condition1) & (condition2)) |
包含字符串
1 | df.filter(col('name').contains('sun')) |
值比较
1 | df.filter(col('name') == "wii") |
Null 判断
1 | df.filter(col('name').isNull()) |
In 判断
1 | df.filter(col('name').isin(["wii", "bovenson"])) |
创建列
重命名
1 | df_renamed = df.withColumnRenamed('name1', 'name2') |
从其他列新建列
数值计算
1 | new_df = df.withColumn('After_discount', df.Course_Fees - df.Discount) |
使用 UDF
1 | import pyspark.sql.functions as F |
计算
最大值
1 | df.agg(max("age")).show() |
转换
row to json string
1 | df.toJson() |
写数据
1 | df.write.format('orc').save('/path/to/destination') |
文本文件
1 | df.coalesce(1).write.format("text").option("header", "false").mode("overwrite").save('/path/to/destination') |
报错
1 | 代码 |
1 | 因为使用 from pyspark.sql.functions import * 导入,导致 abs 使用 from pyspark.sql.functions 内的函数 |
TypeError: Can not infer schema for type: <class ‘str’>
rdd.toDF()
时报错。
1 | from pyspark.sql import Row |
或者
1 | rdd.map(lambda x: (x, )).toDF() |
spark
读 Json
1 | var srcData = spark.read.json(srcPath) |
设置大小写敏感
1 | spark.conf.set("spark.sql.caseSensitive", "true") |
查询
group by
1 | df.groupBy("column-name").count().show(false) |
读 S3 数据
pyspark
安装
1 | pip3 install pyspark |
读取 s3 数据
1 | # 创建 spark session |
scala
1 | // 创建 Spark Session |
pom
1 | <dependency> |
DataFrame 操作
合并 DataFrame
1 | # 按行拼接 (列数不变) |
按行拼接
1 | %pyspark # zeppelin |
列操作
计算分布
1 | ndf = df.groupBy('age').count() |
创建 DataFrame
1 | # schema 1 |
hadoop startup
[toc]
搭建
节点
1 | 这里有三个节点, dp1 dp2 dp3 |
将节点信息在各节点的 /etc/hosts
内配置。
安装 jdk
1 | centos 8 |
配置免密登录
1 | ssh-keygen -t rsa -c "email" # 或者直接 ssh-keygen |
下载 hadoop
1 | wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.3/hadoop-3.3.3.tar.gz |
添加环境变量
1 | sudo vim /etc/profile |
修改 hadoop 配置
1 | cd ${HADOOP_HOME}/etc/hadoop |
1 | export JAVA_HOME=/usr/lib/jvm/java-11-openjdk # openjdk |
core-site.xml
1 | <configuration> |
hdfs-site.xml
1 | <property> |
yarn-site.xml
1 | <configuration> |
mapred-site.xml
1 | <configuration> |
workers
1 | dp1 |
分发 hadoop
1 | 需要在 dp2、dp3 修改 /usr/local/ 目录的写入权限 |
初始化
1 | 在 dp1 上执行 |
启动集群
1 | ${HADOOP_HOME}/sbin 目录下 |
查看集群
1 | jps |
访问
端口 | 说明 |
---|---|
8088 | 资源管理界面 |
8020 | name node rpc 端口 |
9870 | name node http 端口 |
9864 | data node http 端口 |
停掉集群
1 | sbin 下 |
配置自启动
1 | crontab -e |
Client
如果需要从本地机器向远程的 hadoop 集群传输文件,可以再从线上下载一份 hadoop 程序(保持配置),然后做下面的更改。
- 配置好 PATH
- 修改
hadoop-env.sh
里面的 JAVA_HOME /etc/hosts
添加对应的主机记录
这里需要注意的是,hadoop 集群可能是用的内网 ip,比如我搭建的 hadoop 集群在 openstack 上面,ip 用的是 10.1.0.x
。如果是连接在外部网络的机器(192.168.6.x),是不能直接访问的。
1 | 本地机器 ip |
如果直接使用下面命令。
1 | hdfs dfs -copyFromLocal /local/path/to/file /remote/path/to/save |
会报下面的错误。
1 | ➜ ~ hdfs dfs -copyFromLocal /Users/wii/heap /tmp/ |
可以看到,会直接访问 10.1.0.191 ,显然是有问题的,因为外部网络没办法直接访问 openstack 的 internal network,解决方案是设置一下路由,下面是 openstack 的网络拓扑。

内部网络是通过一个路由(192.168.6.129)和外部网络连接,下面是在本地添加静态路由的代码。
1 | sudo route add -net 10.1.0.0 -netmask 255.255.0.0 192.168.6.129 |
也可以在路由器配置。

再去 ping 就可以了。
1 | ➜ ~ ping 10.1.0.191 |
还有一个问题就是,hadoop 集群不要使用外部网络 ip,这样在传输数据的时候都要走一遍外部网络,效率极低。
权限
关闭权限控制
在内网搭建的集群,为了方便把权限关掉了。
1 | <!-- hdfs-site.xml --> |
使用 docker compose 安装
docker hub 文档参考这里,docker compose 配置参考这里 。
配置 Client
1 | 从容器拷贝 hadoop 程序 |
注意:提前配置 JAVA_HOME
测试
1 | hdfs dfs -ls / |
关闭权限控制
环境变量修改。
1 | HDFS-SITE.XML_dfs.permissions.enabled=false |
配置文件修改。
1 | <!-- hdfs-site.xml --> |
MR 作业
配置
作业提交 / 环境变量配置集群属性等。
1 | 代码中获取配置 |
Job
1 | setJarByClass |
文件操作
1 | 获取 FileSystem |
HDFS
1 | 创建目录 |
instance init
挂载卷
1 | 创建文件系统 |
c++ 复杂对象
说明
这是一个小实验。
定义一个复杂类
1 | struct Complex { |
性能对比
作为实验,我们把负载类的对象插入一个 Map。在开始性能对比之前,定义计算时间的工具函数。
1 | std::chrono::time_point<std::chrono::system_clock> now() { |
std::map
1 | template<typename K, typename V> |
输出
1 | elapsed: 7750 |
std::unordered_map
1 | template<typename K, typename V> |
输出
1 | elapsed: 4474 |
std::shared_ptr
如果我们改为使用共享指针。
1 | template<typename K, typename V> |
输出
1 | elapsed: 2002 |
简单对象
再来对比下指针和简单对象。
智能指针
1 | template<typename K, typename V> |
输出
1 | elapsed: 2012 |
简单对象
1 | template<typename K, typename V> |
输出
1 | elapsed: 1763 |
std::move
std::move 实际会调用 move constructor(A(A &&another)),所以,对于基础类型和没有实现 move constructor 的类不起效果。
定义 A。
1 | class A { |
看下插入 map。
1 | std::map<int, A> ma; |
用如下方式。
1 | std::map<int, A> ma; |
实现移动构造。
1 | class A { |
申请内存
先来看下空转。
1 | int loop = 1000 * 10000; |
再来看下申请内存。
1 | int loop = 1000 * 10000; |
再来看下插入 vector。
1 | int loop = 1000 * 10000; |
再看下 reserve size。
1 | int loop = 1000 * 10000; |
再看下通过构造函数初始化列表。
1 | int loop = 1000 * 10000; |
再来看下 resize。
1 | int loop = 1000 * 10000; |
小结
在深度理解程序的过程中,很多操作的实际成本和我们想象中的不太一样,用一句话直白的说明就是,“这个操作竟然这么耗时?”,或者,“这个操作这么快?”。
操作
- 计算
- 内存申请
- 容器
- map,插入、查找
protobuf - go
Tips
optional 字段生成基本变量指针
对于 proto2 语法,如果基础字段添加了 optional 修饰,那么生成的 go 文件对应的字段是指针。
1 | // proto 定义 |
go 语法不能对常量、右值取地址,无法直接赋值给对应变量。proto 提供了对应的 wrapper 工具,传入右值,返回指针。
1 | foo := pb.Foo{} |
s3
递归删除
1 | aws s3 rm --recursive s3://bucket/ --exclude="*" --include="/folder_path/*" |
查看空间占用
1 | aws s3 ls s3://bucket/patth --recursive --summarize --human-readable | grep Total |