首页 > 解决方案 > 无法在 Azure Databricks 上使用 UDF 解压缩流数据 - Python

问题描述

我正在尝试使用 Azure DataBricks 和 python (PySpark) 读取 Azure EventHub GZIP 压缩消息,但使用 UDF 不适用于 BinaryType 数据。

好吧,这是我检查身体里的东西的部分

df = eventHubStream.withColumn("body", eventHubStream["body"]).select("body")
display(df, truncate=False)

这会显示一个压缩良好的数据,如下所示:H4sIAKeM0FwC/3VS22rbQBB9z1cIQ6ElWN37JW8baeMKZEmRNk4LhcXUppg2cYncy...

但是,当我尝试将数据发送到我的 UDF 时,它的行为不像预期的那样。该函数实际上什么都不做,但输出看起来已经被转换了:

import zlib
from pyspark.sql.types import StringType

def streamDecompress(val: BinaryType()):
  #return zlib.decompress(val)
  return val

func_udf = udf(lambda x: streamDecompress(x), StringType())

df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select("body")
display(df, truncate=False)

这是输出:

[B@49d3f786

因此,正如预期的那样,当我尝试使用 zlib 解压缩时它失败了。

有人知道我是怎么做的吗?

标签: pythonpysparkbinaryazure-eventhubazure-databricks

解决方案


嗯,这比我想象的要简单得多。我基本上是在尝试显示类似字节的数据哈哈。

下面的代码解决了这个问题:

import zlib

def streamDecompress(val):   
  return str(zlib.decompress(val, 15+32))

func_udf = udf(lambda x: streamDecompress(x))

df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select('body')

display(df, truncate=False)

推荐阅读