python - 无法在 Azure Databricks 上使用 UDF 解压缩流数据 - Python
问题描述
我正在尝试使用 Azure DataBricks 和 python (PySpark) 读取 Azure EventHub GZIP 压缩消息,但使用 UDF 不适用于 BinaryType 数据。
好吧,这是我检查身体里的东西的部分
df = eventHubStream.withColumn("body", eventHubStream["body"]).select("body")
display(df, truncate=False)
这会显示一个压缩良好的数据,如下所示:H4sIAKeM0FwC/3VS22rbQBB9z1cIQ6ElWN37JW8baeMKZEmRNk4LhcXUppg2cYncy...
但是,当我尝试将数据发送到我的 UDF 时,它的行为不像预期的那样。该函数实际上什么都不做,但输出看起来已经被转换了:
import zlib
from pyspark.sql.types import StringType
def streamDecompress(val: BinaryType()):
#return zlib.decompress(val)
return val
func_udf = udf(lambda x: streamDecompress(x), StringType())
df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select("body")
display(df, truncate=False)
这是输出:
[B@49d3f786
因此,正如预期的那样,当我尝试使用 zlib 解压缩时它失败了。
有人知道我是怎么做的吗?
解决方案
嗯,这比我想象的要简单得多。我基本上是在尝试显示类似字节的数据哈哈。
下面的代码解决了这个问题:
import zlib
def streamDecompress(val):
return str(zlib.decompress(val, 15+32))
func_udf = udf(lambda x: streamDecompress(x))
df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select('body')
display(df, truncate=False)
推荐阅读
- google-apps-script - 获取 Google 表格以仅在最新行上执行脚本
- ios - 连接时如何检测 AirPods?
- java - envoy Istio 适合微服务应用程序的地方
- python - 当上传者为“conda-forge”时,更新 conda-forge 中的包
- python - 无法将使用 nuitka 创建的 dist 文件夹复制到任何其他位置
- vba - VBA 将活动电子邮件保存到收件箱中的子文件夹
- python - 动画不更新
- qt - 如何在图表的 X 轴上显示程序运行时间(时间)?
- omnet++ - 找不到 RoutingAccessTable.h?
- ios - 滚动视图内的内容视图 - 为什么?