首页 > 解决方案 > 如何将行写入Kafka?

问题描述

我有一个有 4 列的数据框,我想将每一行作为一个事件推送到 Kafka。

  1. 将事件推送到 Kafka 的一种方法
df = df.selectExpr("CAST(id AS STRING) as key", "to_json(struct(*)) AS value");

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .write()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("topic", "test")
        .save();

关注

  1. 将事件推送到 Kafka 的另一种方式

对于数据帧的每一行调用一个 UDF/方法,它使用这 4 列创建一个类对象并序列化,假设serializedClassObject = mapper.writeValueAsString(classObj)并调用 Kafka 生产者的producer.send(serializedClassObject,new Callback() { <callback implementation> });

关注点

我想知道我应该采用哪种实现方式,由于可共享的事件模型和回调,我有点倾向于第二种方式,但我不确定批量推送到 Kafka。

标签: apache-sparkapache-kafka

解决方案


I think this doesn't provide a callback(that event is being pushed or not)

It does and when it fails it gives you an exception (to be caught by a Spark application itself).

Another way to push event to Kafka

Use Dataset.foreach or Dataset.foreachPartition.

foreach(f: (T) ⇒ Unit): Unit

foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit

推荐阅读