python - pyspark 分组映射 IllegalArgumentException 错误
问题描述
我无法让 GROUPED_MAP 在 pyspark 中工作。我尝试使用示例代码,包括来自 spark git repo 的一些代码,但没有成功。任何关于我需要改变的建议都表示赞赏。
例如:
from pyspark.sql import SparkSession
from pyspark.sql.utils import require_minimum_pandas_version, require_minimum_pyarrow_version
require_minimum_pandas_version()
require_minimum_pyarrow_version()
from pyspark.sql.functions import pandas_udf, PandasUDFType
spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()
给我错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o61.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 44 in stage 7.0 failed 1 times, most recent failure: Lost task 44.0 in stage 7.0 (TID 128, localhost, executor driver): java.lang.IllegalArgumentException
我相信 pyspark 设置正确,因为这对我来说成功运行:
from pyspark.sql.functions import udf, struct, col
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as func
import pandas as pd
spark = SparkSession.builder.master("local[*]").getOrCreate()
def sum_diff(f1, f2):
return [f1 + f2, f1-f2]
schema = StructType([
StructField("sum", FloatType(), False),
StructField("diff", FloatType(), False)
])
sum_diff_udf = udf(lambda row: sum_diff(row[0], row[1]), schema)
df = spark.createDataFrame(pd.DataFrame([[1., 2.], [2., 4.]], columns=['f1', 'f2']))
df_new = df.withColumn("sum_diff", sum_diff_udf(struct([col('f1'), col('f2')])))\
.select('*', 'sum_diff.*')
df_new.show()
解决方案
我遇到过同样的问题。对我来说,它是通过使用推荐版本的 PyArrow (0.15.1) 并在conf/spark-env.sh
我使用 Spark 2.4.x 时设置一个环境变量以实现向后兼容性来解决的:
ARROW_PRE_0_15_IPC_FORMAT=1
在此处查看完整说明。请注意,对于 Windows,您需要重命名conf/spark-env.sh
为,conf/spark-env.cmd
因为它不会获取 bash 脚本。在这种情况下,环境变量是:
set ARROW_PRE_0_15_IPC_FORMAT=1
推荐阅读
- ethereum - 如何检查 Truffle 支持的最新 Solidity 版本
- azure - Blob 存储 GZRS 复制的 DR 测试
- javascript - Puppeteer 写入某些选项卡的 pdf 的问题
- react-native - 如何使多边形交互反应原生?
- javascript - OOP Three.JS 不起作用,控制台中未调用某些元素
- c++ - 尝试将二叉树保存为向量会出现内存问题(C++)
- javascript - JS页面上的点击按钮-Python
- git - 为什么我不能使用 git pull 来自 github resprority 的 master 分支?
- html - 我的标题菜单中的问题,2个菜单重叠,我不明白为什么
- python - 我有 JSONField 使用一个库来为 sql postgres 调用这个对象 如何使用 sqlite 相同的库