scala - 将 DataFrame 发布到 Kafka
问题描述
我遇到了一个非常琐碎的问题,但目前我找不到解决方案。
假设我有一个 spark DataFrame,它可能是无类型的或强类型的,这并不重要。
现在我想将它发布到 Kafka,下面的代码效果很好:
df2.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
.write.format("kafka")
.option("kafka.bootstrap.servers", "host:9092")
.option("topic", "test").save()
但是,我希望使用更复杂的消毒器发布——在我的例子中是定制的。
我该怎么做?换句话说,我希望发布一个对象,而不是发布字符串。
我的数据源是 Vertica,我正在使用Vertica 连接器来使用事件。
解决方案
您可以使用foreachPartition以自定义方式将数据发布到外部源。这样,如果您使用 foreach,您将只为每个分区创建一个连接,而不是记录。
推荐阅读
- java - 如何在 Spring Boot 中测试 EJB - openejb 由于模块信息而无法处理多版本 JAR
- sql - 聚集索引扫描和索引扫描的区别
- python - Python - 替换 Excel 表格中的值
- python - Python - 为什么我的 for 循环跳过行并且没有到达文件末尾?
- python-2.7 - 执行命令期间的 pip python-geohash 问题
- flask - 设置 Access-Control-Allow-Origin 后,为什么会出现 CORS 错误?
- java - 如何在 Quarkus 中为外部模块中的类创建 Jandex 索引
- linq - 如何在 C# 中对 linq 进行多列分组
- c++ - 如何正确地将 MFC 链接控件添加到对话框?
- ios - 无法分配类型“数组”的值
>' 输入 '[MyVC.Objects]'