apache-spark - 使用 pyspark 从 apache kafka 反序列化 avro 时为空列
问题描述
我正在使用 Kafka、Spark 和 jupyter 笔记本进行概念验证,但遇到了一个奇怪的问题。我试图读取从 kafka 到 pyspark 的 Avro 记录。我正在使用融合模式注册表来获取模式以反序列化 avro 消息。在对 spark 数据帧中的 avro 消息进行反序列化后,结果列是空的,没有任何错误。该列应该包含数据,因为当转换为字符串时,某些 avro 字段是可读的。
我也尝试在 Scala 中的 spark-shell 上执行此操作(没有 jupyter)我尝试了基于 docker 的 spark 以及独立安装的 spark
我按照这个 SO 主题来获取 from_avro 和 to_avro 函数: Pyspark 2.4.0, read avro from kafka with read stream - Python
jars = ["kafka-clients-2.0.0.jar", "spark-avro_2.11-2.4.3.jar", "spark-
sql-kafka-0-10_2.11-2.4.3.jar"]
jar_paths = ",".join(["/home/jovyan/work/jars/{}".format(jar) for jar in
jars])
conf = SparkConf()
conf.set("spark.jars", jar_paths)
spark_session = SparkSession \
.builder \
.config(conf=conf)\
.appName("TestStream") \
.getOrCreate()
def from_avro(col, jsonFormatSchema):
sc = SparkContext._active_spark_context
avro = sc._jvm.org.apache.spark.sql.avro
f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
return Column(f(_to_java_column(col), jsonFormatSchema))
def to_avro(col):
sc = SparkContext._active_spark_context
avro = sc._jvm.org.apache.spark.sql.avro
f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
return Column(f(_to_java_column(col)))
schema_registry_url = "http://schema-registry.org"
transaction_schema_name = "Transaction"
transaction_schema = requests.get("
{}/subjects/{}/versions/latest/schema".format(schema_registry_url,
transaction_schema_name)).text
raw_df = spark_session.read.format("kafka") \
# SNIP
.option("subscribe", "transaction") \
.option("startingOffsets", "earliest").load()
raw_df = raw_df.limit(1000).cache()
extract_df = raw_df.select(
raw_df["key"].cast("String"),
from_avro(raw_df["value"], transaction_schema).alias("value")
)
# This shows data and fields
raw_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(3, truncate=False)
extract_df.show()
值列的内容为空。我预计解码失败后会出现错误,或者数据会在那里。有谁知道可能导致这种情况的原因,或者如何调试它?
+---+-----+
|key|value|
+---+-----+
|...| [[]]|
|...| [[]]|
|...| [[]]|
|...| [[]]|
解决方案
您必须手动反序列化数据。截至撰写本文时,PySpark 尚未正式支持 Confluent 模式注册表。您需要使用 Confluent 或 ABRiS 提供的 KafkaAvroDeSerializer,这是一个 3rd-party Spark avro 库。
ABRiS:https ://github.com/AbsaOSS/ABRiS#using-abris-with-python-and-pyspark
KafkaAvroDeSerializer:将 Spark 结构化流与 Confluent Schema Registry 集成
原因:Confluent 在 Avro 数据 [Magic Byte|Schema ID|avro data] 旁边添加了 5 个额外字节,其中 1 个用于魔术字节,4 个用于模式 ID,而不是典型的 avro 格式。所以需要手动反序列化。
(对不起,我无法发表评论。)
推荐阅读
- google-apps-script - 我可以在不发送通知的情况下删除 Google 日历中的一系列活动吗?
- c - 创建两个具有相同大小和相同 md5 哈希的不同二进制文件
- javascript - 在反应js中的动态菜单栏上创建路由
- delphi - TTask 运行生命周期或当他应该仍然活跃时
- python - 我试图用 python 从亚马逊上刮价格,但我得到一个 AttributeError
- python - 管理员权限检查 (aiogram)
- go - 我可以将 Elasticsearch 与需要身份验证才能查看的数据一起使用吗(例如,仅限登录用户)
- python - 试图计算浮动列表的平均值?
- asp.net - 我想添加外键,但添加时出错
- c++ - C++ 仅读取进程范围内的内存地址