首页 > 解决方案 > 将 spark sql 2.4.4 数据帧中的 Avro 类型消息生成到 Kafka

问题描述

我正在尝试使用 spark SQL 将 Avro 消息写入 Kafka。有人可以建议我如何在java中实现它吗?我找到了一个 scala 参考代码,但没有找到 Java。

我试过但抛出错误,我在哪里可以配置模式注册表。

aggr.selectExpr("CAST(order_id AS String) AS key", "to_avro(struct(*)) AS value").write().format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "aggr_topic").save();

或者请将 scala 代码复制到 java 。

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaURL)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryURL).as("key"),
    from_avro($"value", "t-value", schemaRegistryURL).as("value"))

提前致谢。

标签: javaapache-sparkapache-kafkaspark-structured-streamingconfluent-schema-registry

解决方案


该代码在Java中完全相同,除了val df

from_avro顺便说一句,只存在于databricks环境中,无论如何,你想要writeStreamand to_avro

另一种方法是使用foreachPartition将dataframe转换为RDD,然后手动创建一个新的KafkaProducer来发送事件

您可能还对https://github.com/AbsaOSS/ABRiS感兴趣


推荐阅读