apache-spark - 如何将行写入Kafka?
问题描述
我有一个有 4 列的数据框,我想将每一行作为一个事件推送到 Kafka。
- 将事件推送到 Kafka 的一种方法
df = df.selectExpr("CAST(id AS STRING) as key", "to_json(struct(*)) AS value");
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test")
.save();
关注:
- 我认为这不提供回调(该事件是否被推送),另一件事是我们没有与消费者共享的 dto/models 使用它们可以反序列化类对象。
- 将事件推送到 Kafka 的另一种方式
对于数据帧的每一行调用一个 UDF/方法,它使用这 4 列创建一个类对象并序列化,假设serializedClassObject = mapper.writeValueAsString(classObj)
并调用 Kafka 生产者的producer.send(serializedClassObject,new Callback() { <callback implementation> });
关注点:
- 与第一种方式相比,第二种方式是否更慢,因为我们可能会逐个推送事件而不是批量推送。
我想知道我应该采用哪种实现方式,由于可共享的事件模型和回调,我有点倾向于第二种方式,但我不确定批量推送到 Kafka。
解决方案
I think this doesn't provide a callback(that event is being pushed or not)
It does and when it fails it gives you an exception (to be caught by a Spark application itself).
Another way to push event to Kafka
Use Dataset.foreach or Dataset.foreachPartition.
foreach(f: (T) ⇒ Unit): Unit
foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
推荐阅读
- flutter - 使用 Flutter Form 进行异步验证
- c# - 如何使用参数从另一个任务创建自定义用户控件
- typescript - 我正在尝试让我的 js 库支持 ts
- jquery - 抓取 iframe 的内容并将其放在 div 类中
- python - 为什么我的 Python 脚本不与 PHP(shell_exec) 通信?
- javascript - 如何在反应三个js中过滤点击?
- flutter - Flutter 总是重启而不是热重载
- flutter-web - 将移动应用程序转换为 Web 应用程序时出错
- groovy - 如何在 Groovy 中附加列表列表
- sql - 在数组中编写 PLSQL 选择