apache-spark - 使用 Pandas UDF 进行 Pyspark 流式传输
问题描述
我是 Spark Streaming 和 Pandas UDF 的新手。我正在研究来自 kafka 的 pyspark 消费者,有效负载是 xml 格式,并尝试通过应用 pandas udf 来解析传入的 xml
@pandas_udf("col1 string, col2 string",PandasUDFType.GROUPED_MAP)
def test_udf(df):
import xmltodict
from collections import MutableMapping
xml_str=df.iloc[0,0]
df_col=['col1', 'col2']
doc=xmltodict.parse(xml_str,dict_constructor=dict)
extract_needed_fields = { k:doc[k] for k in df_col }
return pd.DataFrame( [{'col1': 'abc', 'col2': 'def'}] , index=[0] , dtype="string" )
data=df.selectExpr("CAST(value AS STRING) AS value")
data.groupby("value").apply(test_udf).writeStream.format("console").start()
我收到以下错误
File "pyarrow/array.pxi", line 859, in pyarrow.lib.Array.from_pandas
File "pyarrow/array.pxi", line 215, in pyarrow.lib.array
File "pyarrow/array.pxi", line 104, in pyarrow.lib._handle_arrow_array_protocol
ValueError: Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol.
这是正确的方法吗?我究竟做错了什么
解决方案
看起来,这似乎是一种比错误更多的无证限制。您不能使用任何将存储为数组对象的 pandas 类型,它有一个名为 的方法__arrow_array__
,因为pyspark 总是定义一个 mask。您使用的string
类型,存储在 StringArray 中,就是这种情况。在我将字符串 dtype 转换为对象后,错误就消失了。
推荐阅读
- python - 将更改的数据传递给另一个正在运行的代码
- html - 如何用html和css实现图片中的侧边栏外观
- jquery - 带有进度条多个实例的 Momentjs 不起作用
- ssl - 将 Power BI 连接到 BigQuery 问题 - SSL 验证失败
- c# - 错误:测试 azure 函数 c# .net 5.0 时无法加载 DLL 'libgmp-10.dll'
- swift - 返回字符串中变量的“位置”
- java - 当我尝试使用 Hibernate 连接 PostgreSQL 时,任何人都可以检查我出了什么问题
- python - 尝试构建 GCloud Run 容器映像失败
- scopes - OAuth2 会话 - 如何添加多个范围?
- python - 数据未存储在烧瓶 sqlAlchemy 中