PySpark Python: sort a dataframe by a column

Problem description

So I have 2 questions that I think should be basic for someone with PySpark experience, but I can't seem to solve them.

Sample entries in my csv file are -

"dfg.AAIXpWU4Q","1"
"cvbc.AAU3aXfQ","1"
"T-L5aw0L1uT_OfFyzbk","1"
"D9TOXY7rA_LsnvwQa-awVk","2"
"JWg8_0lGDA7OCwWcH_9aDc","2"
"ewrq.AAbRaACr2tVh5wA","1"
"ewrq.AALJWAAC-Qku3heg","1"
"ewrq.AADStQqmhJ7A","2"
"ewrq.AAEAABh36oHUNA","1"
"ewrq.AALJABfV5u-7Yg","1"

I create the following dataframe -

>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...| "7"|
|"yDQ...| "1"|
|"qUU...|"13"|
+-------+----+
only showing top 3 rows

First, is this the correct way to convert the hits column to IntegerType()? And why did all the values become null?

>>> df2 = df2.withColumn("hits", df2["hits"].cast(IntegerType()))
>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...|null|
|"yDQ...|null|
|"qUU...|null|
+-------+----+
only showing top 3 rows

Second, I need to sort this dataframe in descending order of the hits column. So, I tried this -

>>> df1 = df2.sort(col('hits').desc())
>>> df1.show(20)

But I get the following error -

java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 2 fields are required while 18 values are provided.

I'm guessing this is because I created my dataframe using -

>>> rdd = sc.textFile("/path/to/file/*")
>>> rdd.take(2)
['"7wAfdgdfgd","7"', '"1x3Qdfgdf","1"']
​
>>> my_df = rdd.map(lambda x: (x.split(","))).toDF()

>>> df2 = my_df.selectExpr("_1 as user_id", "_2 as hits")
>>> df2.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...| "7"|
|"yDQ...| "1"|
|"qUU...|"13"|
+-------+----+
only showing top 3 rows

And I'm guessing some rows have extra commas in them. How do I avoid that - or what is the best way to read this file?

Tags: python, apache-spark, pyspark, regexp-replace

Solution


Update

-- added reading and splitting the file

Based on the example above, I created a file like this -

'"7wAfdgdfgd","7"'
'"1x3Qdfgdf","1"'
'"13xxyyzzsdff","13"'

-- note the ' that wraps each line as a single string. Now the code to read it:

scala> val myRdd = sc.textFile("test_file.dat")
myRdd: org.apache.spark.rdd.RDD[String] = test_file.dat MapPartitionsRDD[1] at textFile at <console>:24
// please check the type of RDD , here it is string
// We need to have Iterable[tuple(String,String)] to convert it into Dataframe

scala> myRdd.map(x => x.replace("'","")).map(x => x.split(",")).map( x => (x(0),x(1)))
res0: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[4] at map at <console>:26

// Finally
scala> myRdd.map(x => x.replace("'","")).map(x => x.split(",")).map( x => (x(0),x(1))).toDF("user_id","hits").show(false)
+--------------+----+
|user_id       |hits|
+--------------+----+
|"7wAfdgdfgd"  |"7" |
|"1x3Qdfgdf"   |"1" |
|"13xxyyzzsdff"|"13"|
+--------------+----+
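
Since the question is tagged PySpark, here is a rough Python sketch of the same read-and-split idea. It assumes the same test_file.dat and an active SparkSession; it is an illustration of the approach above, not part of the original Scala session.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Read each line as a single string, drop the wrapping single quotes,
# split on "," and keep the first two fields as a (user_id, hits) tuple.
rdd = sc.textFile("test_file.dat")
pairs = (rdd.map(lambda line: line.replace("'", ""))
            .map(lambda line: line.split(","))
            .map(lambda fields: (fields[0], fields[1])))

df = pairs.toDF(["user_id", "hits"])
df.show(truncate=False)

For a double-quoted csv like the one in the question, spark.read.csv(path) is usually simpler: its default quote handling strips the double quotes and keeps commas inside quoted fields from splitting a row into extra values.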

End of update

Since you are new to this (and this goes for anyone else too), I suggest getting used to running actual ANSI SQL rather than pyspark.sql.functions. It is easier to maintain, and there is no real advantage to using sql.functions over ANSI SQL. Obviously, you need to know the SQL/column functions Spark provides that I use in this answer: split, orderBy and cast. Since you did not provide the contents of the text file, here is my take, with all 3 answers in one SQL statement.

myDf = spark.createDataFrame([("abc","7"),("xyz","18"),("lmn","4,xyz")],schema=["user_id","hits"])
myDf.show(20,False)
+-------+-----+
|user_id|hits |
+-------+-----+
|abc    |7    |
|xyz    |18   |
|lmn    |4,xyz|
+-------+-----+

myDf.createOrReplaceTempView("hits_table")

SQL + result

    spark.sql("select user_id, cast(split(hits,',')[0] as integer) as hits from hits_table order by hits desc ").show(20,False)
    +-------+----+
    |user_id|hits|
    +-------+----+
    |xyz    |18  |
    |abc    |7   |
    |lmn    |4   |
    +-------+----+
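
For comparison, the same three steps can also be expressed with pyspark.sql.functions. The sketch below is an assumption based on the questioner's df2 (string values still wrapped in literal double quotes); it uses regexp_replace to strip those quotes, which is what made the original cast return null.

from pyspark.sql.functions import col, regexp_replace, split

# Strip the literal double quotes, keep only the part before any stray comma,
# cast to integer, then sort in descending order of hits.
df_sorted = (df2
    .withColumn("hits", split(regexp_replace(col("hits"), '"', ''), ",")[0].cast("int"))
    .orderBy(col("hits").desc()))

df_sorted.show(20, False)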
