scala - Scala:从 Spark 结构化流中读取 Kafka Avro 消息时出错
问题描述
我一直在尝试使用 Scala 2.11 从 spark 结构化流 (2.4.4) 中读取 Kafka 的 avro 序列化消息。为此,我使用了 spark-avro(下面的依赖项)。我使用 confluent-kafka 库从 python 生成 kafka 消息。Spark 流能够使用模式使用消息,但它不能正确读取字段的值。我准备了一个简单的例子来说明问题,代码在这里可用: https ://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala
我在 python 中创建记录,记录的架构是:
{
"type": "record",
"namespace": "example",
"name": "RawRecord",
"fields": [
{"name": "int_field","type": "int"},
{"name": "string_field","type": "string"}
]
}
它们是这样生成的:
from time import sleep
from confluent_kafka.avro import AvroProducer, load, loads
def generate_records():
avro_producer_settings = {
'bootstrap.servers': "localhost:19092",
'group.id': 'groupid',
'schema.registry.url': "http://127.0.0.1:8081"
}
producer = AvroProducer(avro_producer_settings)
key_schema = loads('"string"')
value_schema = load("schema.avsc")
i = 1
while True:
row = {"int_field": int(i), "string_field": str(i)}
producer.produce(topic="avro_topic", key="key-{}".format(i),
value=row, key_schema=key_schema, value_schema=value_schema)
print(row)
sleep(1)
i+=1
spark 结构化流(在 Scala 中)的消耗是这样完成的:
import org.apache.spark.sql.{ Dataset, Row}
import org.apache.spark.sql.streaming.{ OutputMode, StreamingQuery}
import org.apache.spark.sql.avro._
...
try {
log.info("----- reading schema")
val jsonFormatSchema = new String(Files.readAllBytes(
Paths.get("./src/main/resources/schema.avsc")))
val ds:Dataset[Row] = sparkSession
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", topic)
.load()
val output:Dataset[Row] = ds
.select(from_avro(ds.col("value"), jsonFormatSchema) as "record")
.select("record.*")
output.printSchema()
var query: StreamingQuery = output.writeStream.format("console")
.option("truncate", "false").outputMode(OutputMode.Append()).start();
query.awaitTermination();
} catch {
case e: Exception => log.error("onApplicationEvent error: ", e)
//case _: Throwable => log.error("onApplicationEvent error:")
}
...
在 spark 中打印模式,奇怪的是这些字段可以为空,尽管 avro 模式不允许这样做。Spark 显示了这一点:
root
|-- int_field: integer (nullable = true)
|-- string_field: string (nullable = true)
我已经在 python 中与另一个消费者检查了消息,消息很好,但与消息内容无关,spark 显示了这一点。
+---------+------------+
|int_field|string_field|
+---------+------------+
|0 | |
+---------+------------+
使用的主要依赖项是:
<properties>
<spark.version>2.4.4</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
有谁知道为什么会发生这种情况?
提前致谢。重现错误的代码在这里:
https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala
解决方案
问题是我在 python 中使用 confluent_kafka 库,并且我正在使用 spark-avro 库在 spark 结构化流中读取 avro 消息。
Confluent_kafka 库使用 confluent 的 avro 格式并使用标准 avro 格式触发 avro 读取。
不同之处在于,为了使用模式注册表,confluent avro 在消息前添加了四个字节,指示应该使用哪个模式。
为了能够使用 confluent avro 并从 spark 结构化流中读取它,我为 Abris 替换了 spark-avro 库(abris 允许将 avro 和 confluent avro 与 spark 集成)。 https://github.com/AbsaOSS/ABRiS
解决方案
解决方案
问题是我在 python 中使用 confluent_kafka 库,并且我正在使用 spark-avro 库在 spark 结构化流中读取 avro 消息。
Confluent_kafka 库使用 confluent 的 avro 格式并使用标准 avro 格式触发 avro 读取。
不同之处在于,为了使用模式注册表,confluent avro 在消息前添加了四个字节,指示应该使用哪个模式。
为了能够使用 confluent avro 并从 spark 结构化流中读取它,我为 Abris 替换了 spark-avro 库(abris 允许将 avro 和 confluent avro 与 spark 集成)。 https://github.com/AbsaOSS/ABRiS
我的依赖项更改如下:
<properties>
<spark.version>2.4.4</spark.version>
<scala.version>2.11</scala.version>
</properties>
<!-- SPARK- AVRO -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- SPARK -AVRO AND CONFLUENT-AVRO -->
<dependency>
<groupId>za.co.absa</groupId>
<artifactId>abris_2.11</artifactId>
<version>3.1.1</version>
</dependency>
在这里您可以看到一个简单的示例,该示例获取消息并将其值反序列化为 avro 和 confluent avro。
var input: Dataset[Row] = sparkSession.readStream
//.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
.format("kafka")
.option("kafka.bootstrap.servers", kafkaServers)
.option("subscribe", topicConsumer)
.option("failOnDataLoss", "false")
// .option("startingOffsets", "latest")
// .option("startingOffsets", "earliest")
.load();
// READ WITH spark-avro library (standard avro)
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./src/main/resources/schema.avsc")))
var inputAvroDeserialized: Dataset[Row] = input
.select(from_avro(functions.col("value"), jsonFormatSchema) as "record")
.select("record.*")
//READ WITH Abris library (confuent avro)
val schemaRegistryConfig = Map(
SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> topicConsumer,
SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME, // choose a subject name strategy
SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest" // set to "latest" if you want the latest schema version to used
)
var inputConfluentAvroDeserialized: Dataset[Row] = inputConfluentAvroSerialized
.select(from_confluent_avro(functions.col("value"), schemaRegistryConfig) as "record")
.select("record.*")
推荐阅读
- biml - MAX 值的二进制类型 -1 列
- abap - 将字段描述添加到 FBL5N 标头
- c# - 除了 /c net user " + user + " " + newPass; 通过 C# 之外,还有其他方法可以更改 Windows 本地用户密码吗
- python - python3中数组中所有元素的总和
- wordpress - 如何从所有供应商处获取产品
- powershell - 使用 powershell 从 AD 中提取特定的代理地址
- selenium - Internet Explorer 的 Selenium 测试无法通过 Jenkins 正确执行
- node.js - 如何在 Node.js 中使用 POST 响应的值?
- python - 文本到图像总是灰色的
- mongodb - MongoDB数据存储方式:哪一种更高效?