apache-spark - 计算 spark Dataframe 中的新列,将 df1 中的令牌列表列与 df2 中的文本列与 pyspark 交叉
问题描述
我正在使用 spark 2.4.5,我需要根据(西班牙语情感词典)中的单词从 的令牌列表列(MeaningfulWords
列)计算情感分数。在我必须创建一个新列,其中包含标记的分数列表和另一列,其中包含每条记录的分数平均值(分数总和/字数)。如果列表 ( ) 中的任何标记不在字典 ( ) 中,则得分为零。df1
df2
df1
df1
df2
数据框如下所示:
df1.select("ID","MeaningfulWords").show(truncate=True, n=5)
+------------------+------------------------------+
| ID| MeaningfulWords|
+------------------+------------------------------+
|abcde00000qMQ00001|[casa, alejado, buen, gusto...|
|abcde00000qMq00002|[clientes, contentos, servi...|
|abcde00000qMQ00003| [resto, bien]|
|abcde00000qMQ00004|[mal, servicio, no, antiend...|
|abcde00000qMq00005|[gestion, adecuada, proble ...|
+------------------+------------------------------+
df2.show(5)
+-----+----------+
|score| word|
+-----+----------+
| 1.68|abandonado|
| 3.18| abejas|
| 2.8| aborto|
| 2.46| abrasador|
| 8.13| abrazo|
+-----+----------+
要添加的新列df1
应如下所示:
+------------------+---------------------+
| MeanScore| ScoreList|
+------------------+---------------------+
| 2.95|[3.10, 2.50, 1.28,...|
| 2.15|[1.15, 3.50, 2.75,...|
| 2.75|[4.20, 1.00, 1.75,...|
| 3.25|[3.25, 2.50, 3.20,...|
| 3.15|[2.20, 3.10, 1.28,...|
+------------------+---------------------+
我已经使用 审查了一些选项.join
,但是使用具有不同数据类型的列会出错。我还尝试将 Dataframes 转换为 RDD 并调用一个函数:
def map_words_to_values(review_words, dict_df):
return [dict_df[word] for word in review_words if word in dict_df]
RDD1=swRemoved.rdd.map(list)
RDD2=Dict_df.rdd.map(list)
reviewsRDD_dict_values = RDD1.map(lambda tuple: (tuple[0], map_words_to_values(tuple[1], RDD2)))
reviewsRDD_dict_values.take(3)
但是使用这个选项我得到了错误:
PicklingError: Could not serialize object: Exception: It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
我找到了一些使用afinn
库对文本进行评分的示例。但它不适用于西班牙语文本。
如果可能的话,我想尝试利用 pyspark 的本机功能而不是使用 udfs 来避免影响性能。但我是火花的初学者,我想找到火花的方式来做到这一点。
解决方案
您可以通过首先使用array_contains
word 连接,然后使用、和.( )groupBy
的聚合来做到这一点first
collect_list
mean
spark2.4+
welcome to SO
df1.show()
#+------------------+----------------------------+
#|ID |MeaningfulWords |
#+------------------+----------------------------+
#|abcde00000qMQ00001|[casa, alejado, buen, gusto]|
#|abcde00000qMq00002|[clientes, contentos, servi]|
#|abcde00000qMQ00003|[resto, bien] |
#+------------------+----------------------------+
df2.show()
#+-----+---------+
#|score| word|
#+-----+---------+
#| 1.68| casa|
#| 2.8| alejado|
#| 1.03| buen|
#| 3.68| gusto|
#| 0.68| clientes|
#| 2.1|contentos|
#| 2.68| servi|
#| 1.18| resto|
#| 1.98| bien|
#+-----+---------+
from pyspark.sql import functions as F
df1.join(df2, F.expr("""array_contains(MeaningfulWords,word)"""),'left')\
.groupBy("ID").agg(F.first("MeaningfulWords").alias("MeaningfullWords")\
,F.collect_list("score").alias("ScoreList")\
,F.mean("score").alias("MeanScore"))\
.show(truncate=False)
#+------------------+----------------------------+-----------------------+------------------+
#|ID |MeaningfullWords |ScoreList |MeanScore |
#+------------------+----------------------------+-----------------------+------------------+
#|abcde00000qMQ00003|[resto, bien] |[1.18, 1.98] |1.58 |
#|abcde00000qMq00002|[clientes, contentos, servi]|[0.68, 2.1, 2.68] |1.8200000000000003|
#|abcde00000qMQ00001|[casa, alejado, buen, gusto]|[1.68, 2.8, 1.03, 3.68]|2.2975 |
#+------------------+----------------------------+-----------------------+------------------+
推荐阅读
- android - 为什么 viewBinding 使 XML 根全屏
- cloudinary - 如何使用扩展脚本将 Cloudinary 提供的图像下载到我的本地目录?
- python - 查找数据框 PANDAS 中每个日期的最新日期时间
- android - 不一致的 Android SQLite 寄存器
- python - 将“CSV”转换为“数组”,然后在 Python 中转换为“图像”?
- python - GitLab 中 feed_token 用于原子提要的目的
- excel - 如何修复 VBA Find 方法的“下标超出范围”错误?
- json - 空手道 - 比较 json 对象时出错
- python - 无法将 Rasa 机器人与 Flask 集成
- docker - 在 Windows 中使用 Docker Compose 挂载主机目录