java - 将 spark sql 2.4.4 数据帧中的 Avro 类型消息生成到 Kafka
问题描述
我正在尝试使用 spark SQL 将 Avro 消息写入 Kafka。有人可以建议我如何在java中实现它吗?我找到了一个 scala 参考代码,但没有找到 Java。
我试过但抛出错误,我在哪里可以配置模式注册表。
aggr.selectExpr("CAST(order_id AS String) AS key", "to_avro(struct(*)) AS value").write().format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "aggr_topic").save();
或者请将 scala 代码复制到 java 。
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", kafkaURL)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", "t-key", schemaRegistryURL).as("key"),
from_avro($"value", "t-value", schemaRegistryURL).as("value"))
提前致谢。
解决方案
该代码在Java中完全相同,除了val df
from_avro
顺便说一句,只存在于databricks环境中,无论如何,你想要writeStream
and to_avro
。
另一种方法是使用foreachPartition将dataframe转换为RDD,然后手动创建一个新的KafkaProducer来发送事件
推荐阅读
- python - TypeError:最多输入 1 个参数,得到 3 个。如何解决这个问题?
- python - 新手,从 url 抓取表格,无法在 python 命令提示符中获取输出
- c# - Xamarin Forms Picker Binding 在 Itemssource 更改时中断
- networking - linux网络加速,硬件卸载
- python - 如何在 python 中输入小时费率和总收入?
- vue.js - Vue自定义指令不适用于模板标签
- powershell - 参数作为变量,而不是直接将文件路径传递给 PowerShell cmdlet
- css - 未应用 CSS 媒体查询规则
- botframework - 在频道和群聊中以静默方式在团队中安装应用程序
- swift - 如何将图像加载到操场上