常用操作
以文本形式保存 DataFrame 一列
1 | # dateframe |
处理 WrappedArray
1 | row.scores[0] |
Join 时重复行保留一个
1 | df_a.join(df_b, on=[{column}]) # 使用 on |
过滤
多条件过滤
1 | df.filter((condition1) & (condition2)) |
包含字符串
1 | df.filter(col('name').contains('sun')) |
值比较
1 | df.filter(col('name') == "wii") |
Null 判断
1 | df.filter(col('name').isNull()) |
In 判断
1 | df.filter(col('name').isin(["wii", "bovenson"])) |
创建列
重命名
1 | df_renamed = df.withColumnRenamed('name1', 'name2') |
从其他列新建列
数值计算
1 | new_df = df.withColumn('After_discount', df.Course_Fees - df.Discount) |
使用 UDF
1 | import pyspark.sql.functions as F |
计算
最大值
1 | df.agg(max("age")).show() |
转换
row to json string
1 | df.toJson() |
写数据
1 | df.write.format('orc').save('/path/to/destination') |
文本文件
1 | df.coalesce(1).write.format("text").option("header", "false").mode("overwrite").save('/path/to/destination') |
报错
1 | 代码 |
1 | 因为使用 from pyspark.sql.functions import * 导入,导致 abs 使用 from pyspark.sql.functions 内的函数 |
TypeError: Can not infer schema for type: <class ‘str’>
rdd.toDF()
时报错。
1 | from pyspark.sql import Row |
或者
1 | rdd.map(lambda x: (x, )).toDF() |