Spark DataFrame Operations

Author: JerryHouse | Categories: python, data analysis | Published: 2017-08-01 18:07

Spark DataFrame

Creating a DataFrame in Spark

data = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], 
                                  ["Name", "askdaosdka"])
data.show()
data.printSchema()
 
# Output
#+-------+----------+
#|   Name|askdaosdka|
#+-------+----------+
#|Alberto|         2|
#| Dakota|         2|
#+-------+----------+
 
#root
# |-- Name: string (nullable = true)
# |-- askdaosdka: long (nullable = true)
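
In the example above the schema is inferred from the data. When you want explicit control over the column types, a schema can be passed in as well; the following is a minimal sketch using StructType (the field names and types here are chosen for illustration):

from pyspark.sql.types import StructType, StructField, StringType, LongType

schema = StructType([
    StructField("Name", StringType(), True),  # nullable string column
    StructField("age", LongType(), True),     # nullable long column
])
typed_df = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], schema)
typed_df.printSchema()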

Adding a New Column to a Spark DataFrame

There are several ways to add a new column to a Spark DataFrame; the example below uses pyspark to show how to append a Python list as a column. A Python list cannot be added to a DataFrame directly: it first has to be converted into a new DataFrame, which is then joined to the original one. The example below creates a DataFrame, converts the list into a second DataFrame, and joins the two.

from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql import Row

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
# add an auto-generated id column to join against
df = df.withColumn("id", monotonically_increasing_id())
df.show()

# Output
#+---+---+-----+---+
#| x1| x2|   x3| id|
#+---+---+-----+---+
#|  1|  a| 23.0|  0|
#|  3|  B|-23.0|  1|
#+---+---+-----+---+

# turn the Python list into a DataFrame with its own index column
l = ['jerry', 'tom']
row = Row("pid", "name")
new_df = sc.parallelize([row(i, l[i]) for i in range(len(l))]).toDF()
new_df.show()

# Output
#+---+-----+
#|pid| name|
#+---+-----+
#|  0|jerry|
#|  1|  tom|
#+---+-----+

# join the two DataFrames on the generated indexes
join_df = df.join(new_df, df.id == new_df.pid)
join_df.show()

# Output
#+---+---+-----+---+---+-----+
#| x1| x2|   x3| id|pid| name|
#+---+---+-----+---+---+-----+
#|  1|  a| 23.0|  0|  0|jerry|
#|  3|  B|-23.0|  1|  1|  tom|
#+---+---+-----+---+---+-----+

The contents of the name list are now part of the original DataFrame.
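
One caveat: monotonically_increasing_id only guarantees unique, increasing ids, not consecutive ones, so on a DataFrame with more than one partition the generated id may not match the list positions. A more robust sketch (the add_index helper below is hypothetical, not part of the original example) builds a consecutive index with RDD.zipWithIndex and drops the helper columns after the join:

from pyspark.sql import Row

def add_index(pair):
    # pair is (Row, index) as produced by zipWithIndex;
    # the consecutive index replaces any existing id column
    d = pair[0].asDict()
    d["id"] = pair[1]
    return Row(**d)

indexed_df = df.rdd.zipWithIndex().map(add_index).toDF()
result = indexed_df.join(new_df, indexed_df.id == new_df.pid).drop("id").drop("pid")
result.show()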

Renaming DataFrame Columns in Spark

There are four ways to rename DataFrame columns in Spark; the examples below use pyspark.

1. Using the selectExpr function

data = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], 
                                  ["Name", "askdaosdka"])
data.show()
data.printSchema()

# Output
#+-------+----------+
#|   Name|askdaosdka|
#+-------+----------+
#|Alberto|         2|
#| Dakota|         2|
#+-------+----------+

#root
# |-- Name: string (nullable = true)
# |-- askdaosdka: long (nullable = true)

df = data.selectExpr("Name as name", "askdaosdka as age")
df.show()
df.printSchema()

# Output
#+-------+---+
#|   name|age|
#+-------+---+
#|Alberto|  2|
#| Dakota|  2|
#+-------+---+

#root
# |-- name: string (nullable = true)
# |-- age: long (nullable = true)
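
selectExpr is not limited to renaming; any SQL expression can appear in the projection. A small illustrative sketch (the derived column below is made up for this example):

df3 = data.selectExpr("Name as name", "askdaosdka * 10 as age_times_ten")
df3.show()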

2. Using the withColumnRenamed function

from functools import reduce  # reduce is not a builtin on Python 3

oldColumns = data.schema.names
newColumns = ["name", "age"]

# rename each column in turn by folding over the column indexes
df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]),
            range(len(oldColumns)), data)
df.printSchema()
df.show()
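
When every column is renamed at once, toDF offers a shorter equivalent; a minimal sketch:

# toDF takes the complete list of new column names, in order
df = data.toDF("name", "age")
df.printSchema()
df.show()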

3. Using the alias function

from pyspark.sql.functions import col

data = data.select(col("Name").alias("name"), col("askdaosdka").alias("age"))
data.show()

# Output
#+-------+---+
#|   name|age|
#+-------+---+
#|Alberto|  2|
#| Dakota|  2|
#+-------+---+
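
The same idea scales to many columns with a list comprehension; a sketch assuming a hypothetical mapping dict of old to new names (columns not in the dict keep their names):

mapping = {"Name": "name", "askdaosdka": "age"}
renamed = data.select([col(c).alias(mapping.get(c, c)) for c in data.columns])
renamed.show()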

4. Using the sqlContext.sql function

sqlContext.registerDataFrameAsTable(data, "myTable")
df2 = sqlContext.sql("SELECT Name AS name, askdaosdka as age from myTable")

df2.show()

# Output
#+-------+---+
#|   name|age|
#+-------+---+
#|Alberto|  2|
#| Dakota|  2|
#+-------+---+
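
In Spark 2.x the same query is usually issued through a SparkSession instead of an SQLContext; a minimal sketch, assuming a spark session object is available:

data.createOrReplaceTempView("myTable")
df2 = spark.sql("SELECT Name AS name, askdaosdka AS age FROM myTable")
df2.show()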

This article is from dcharm; please credit the source and include the link when reproducing it.

Permanent link: http://www.dcharm.com/?p=677
