首页 > 解决方案 > 使用 Pandas UDF 进行 Pyspark 流式传输

问题描述

我是 Spark Streaming 和 Pandas UDF 的新手。我正在研究来自 kafka 的 pyspark 消费者,有效负载是 xml 格式,并尝试通过应用 pandas udf 来解析传入的 xml

@pandas_udf("col1 string, col2 string",PandasUDFType.GROUPED_MAP)
def test_udf(df):
    import xmltodict
    from collections import MutableMapping 
    xml_str=df.iloc[0,0]
    df_col=['col1', 'col2']
    doc=xmltodict.parse(xml_str,dict_constructor=dict)
    extract_needed_fields = { k:doc[k] for k in df_col }
    return pd.DataFrame( [{'col1': 'abc', 'col2': 'def'}] , index=[0] , dtype="string" )

data=df.selectExpr("CAST(value AS STRING) AS value") 
data.groupby("value").apply(test_udf).writeStream.format("console").start()

我收到以下错误

  File "pyarrow/array.pxi", line 859, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 215, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 104, in pyarrow.lib._handle_arrow_array_protocol
ValueError: Cannot specify a mask or a size when passing an object that is converted with the __arrow_array__ protocol.


这是正确的方法吗?我究竟做错了什么

标签: apache-sparkpysparkstreamingpandas-groupby

解决方案


看起来,这似乎是一种比错误更多的无证限制。您不能使用任何将存储为数组对象的 pandas 类型,它有一个名为 的方法__arrow_array__,因为pyspark 总是定义一个 mask。您使用的string 类型,存储在 StringArray 中,就是这种情况。在我将字符串 dtype 转换为对象后,错误就消失了。


推荐阅读