首页 > 解决方案 > 将 DataFrame 发布到 Kafka

问题描述

我遇到了一个非常琐碎的问题,但目前我找不到解决方案。

假设我有一个 spark DataFrame,它可能是无类型的或强类型的,这并不重要。

现在我想将它发布到 Kafka,下面的代码效果很好:

df2.selectExpr("CAST(key AS STRING) as key", "CAST(value AS STRING) as value")
      .write.format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")
      .option("topic", "test").save()

但是,我希望使用更复杂的消毒器发布——在我的例子中是定制的。

我该怎么做?换句话说,我希望发布一个对象,而不是发布字符串。

我的数据源是 Vertica,我正在使用Vertica 连接器来使用事件。

标签: scalaapache-sparkapache-kafka

解决方案


您可以使用foreachPartition以自定义方式将数据发布到外部源。这样,如果您使用 foreach,您将只为每个分区创建一个连接,而不是记录。


推荐阅读