python - Spark:如何通过 mapInPandas 正确转换数据帧
问题描述
我正在尝试通过最新的 spark 3.0.1 函数mapInPandas转换具有 10k 行的 spark 数据帧。
预期输出:映射的 pandas_function() 将一行转换为三行,因此输出 transform_df 应该有 30k 行
当前输出:我得到 3 行 1 核和 24 行 8 核。
输入:respond_sdf 有 10k 行
+-----+-------------------------------------------------------------------+
|url |content |
+-----+-------------------------------------------------------------------+
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
|api_1|{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } |
|api_2|{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }|
+-----+-------------------------------------------------------------------+
only showing top 20 rows
Input respond_sdf has 10000 rows
输出 A) 3 行 - 有 1 个核心 - .master('local [ 1 ]')
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } (0 + 1) / 1]
+-----+---+---+
| api| A| B|
+-----+---+---+
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
+-----+---+---+
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
Output transformed_df has 3 rows
输出 B) 24 行 - 8 核 - .master('local[8]')
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } (0 + 1) / 1]
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
+-----+---+---+
| api| A| B|
+-----+---+---+
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
|api_1| 3| 6|
|api_1| 1| 4|
|api_1| 2| 5|
+-----+---+---+
only showing top 20 rows
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] } (3 + 5) / 8]
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }
Output transformed_df has 24 rows
示例代码:
#### IMPORT PYSPARK ###
import pandas as pd
import pyspark
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
spark = pyspark.sql.SparkSession.builder.appName("test") \
.master('local[1]') \
.getOrCreate()
sc = spark.sparkContext
####### INPUT DATAFRAME WITH LIST OF JSONS ########################
# Create list with 10k nested tuples(url,content)
rdd_list = [('api_1',"{'api': ['api_1', 'api_1', 'api_1'],'A': [1,2,3], 'B': [4,5,6] }"),
(' api_2', "{'api': ['api_2', 'api_2', 'api_2'],'A': [7,8,9], 'B': [10,11,12] }")]*5000
schema = StructType([
StructField('url', StringType(), True),
StructField('content', StringType(), True)
])
#Create input dataframe with 10k rows
jsons = sc.parallelize(rdd_list)
respond_sdf = spark.createDataFrame(jsons, schema)
respond_sdf.show(truncate=False)
print(f'Input respond_sdf has {respond_sdf.count()} rows')
####### TRANSFORMATION DATAFRAME ########################
# Pandas transformation function returning pandas dataframe
def pandas_function(iter):
for df in iter:
print(df['content'][0])
yield pd.DataFrame(eval(df['content'][0]))
transformed_df = respond_sdf.mapInPandas(pandas_function, "api string, A int, B int")
transformed_df.show()
print(f' Output transformed_df has {transformed_df.count()} rows')
print(f'Expected output dataframe should has 30k rows')
链接到相关讨论: How to yield pandas dataframe rows to spark dataframe
解决方案
抱歉,在我对您上一个问题的回答中,使用的部分mapInPandas
不正确。我认为下面这个函数是编写 pandas 函数的正确方法。上次我犯了一个错误,因为我之前认为iter
是一个可迭代的行,但它实际上是一个可迭代的数据帧。
def pandas_function(iter):
for df in iter:
yield pd.concat(pd.DataFrame(x) for x in df['content'].map(eval))
(PS感谢从这里回答。)
推荐阅读
- python - 在 Python 中无需显式变量声明即可跟踪程序的所有可能状态
- java - 使用 sessionFactory.openSession() 但没有使用 sessionfactory.getCurrentSession() 时没有活动事务异常
- java - 我想将 PDF 转换为图像,但我只想要包含所有图像和矢量图形的单个输出图像。我不想要文字
- android - 如何在项目android studio之间同步对gradle文件所做的所有更改
- kubernetes - ansible 验证 k8s 部署成功
- javascript - 如何将javascript子字符串输出一页传递到另一页?
- charts - 如何在谷歌折线图中的标签和水平轴之间添加空格
- go - 如何对具有共同字段的空接口切片进行排序?
- python - 对 PC 进行套接字编程的两种 Raspi4 不同响应
- d3.js - d3.js 更新 x 域不会移动图形