dataframe - 如何在使用 Spark Java 将 Spark Dataframe 写入 Kafka Producer 时控制记录数
问题描述
我有一个包含两列“keyCol”列和“valCol”列的 spark 数据框。数据框非常庞大,将近 1 亿行。我想以小批量的方式将数据帧写入/生成到 kafka 主题,即每分钟 10000 条记录。此 spark 作业将每天运行一次,从而创建此数据框
如何在下面的代码中实现每分钟 10000 条记录的小批量写入,或者请建议是否有更好/有效的方法来实现这一点。
spark_df.foreachPartition(partitions ->{
Producer<String, String> producer= new KafkaProducer<String, String>(allKafkaParamsMapObj);
while (partitions) {
Row row = partitions.next();
producer.send(new ProducerRecord<String, String>("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
//Callback code goes here
}
});
}
return;
});
解决方案
您可以使用grouped(10000)
以下功能并执行睡眠线程一分钟
config.foreachPartition(f => {
f.grouped(10000).foreach( (roqSeq : Seq[Row]) => { // Run 10000 in batch
roqSeq.foreach( row => {
producer.send(new Nothing("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Nothing() {
def onCompletion(recordMetadata: Nothing, e: Exception): Unit = {
//Callback code goes here
}
})
})
Thread.sleep(60000) // Sleep for 1 minute
}
)
})
推荐阅读
- git - 受影响的文件 Jenkins 提供未更改的文件
- r - 尝试安装 ROI.plugin.gurobi 时出现问题
- ios - 如何在我的 Xcode 项目中添加附加功能
- laravel - 如何在父母和孩子上应用 Laravel Eloquent where() 条件(hasMany() 关系)
- cookies - 网站如何获得用户的许可来设置其 cookie?
- node.js - 为什么我不能获取记录在 dynamoosejs 中有一个包含主键值的数组字段?
- javascript - 我们如何在 ASP mvc 中通过 ajax 下载 XML 文件?
- editor - Monaca 编辑器语法高亮是可以的,但是编辑器没有显示任何错误
- c++ - 在 Ubuntu 18.04 的 VIsual Studio 代码中为 C++ 设置 OpenCV
- javascript - 使用 javascript 客户端,如何显示“虚拟文件夹”(= 具有相对路径的 html、img、js 的连贯集合)?