首页 > 解决方案 > 如何在使用 Spark Java 将 Spark Dataframe 写入 Kafka Producer 时控制记录数

问题描述

我有一个包含两列“keyCol”列和“valCol”列的 spark 数据框。数据框非常庞大,将近 1 亿行。我想以小批量的方式将数据帧写入/生成到 kafka 主题,即每分钟 10000 条记录。此 spark 作业将每天运行一次,从而创建此数据框

如何在下面的代码中实现每分钟 10000 条记录的小批量写入,或者请建议是否有更好/有效的方法来实现这一点。

spark_df.foreachPartition(partitions ->{
            Producer<String, String> producer= new KafkaProducer<String, String>(allKafkaParamsMapObj);
            while (partitions) {
                Row row =  partitions.next();
                producer.send(new ProducerRecord<String, String>("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                       //Callback code goes here
                    }
                });
            }
            return;
        });

标签: dataframeapache-sparkapache-kafkaspark-streamingkafka-producer-api

解决方案


您可以使用grouped(10000)以下功能并执行睡眠线程一分钟

config.foreachPartition(f => {
      f.grouped(10000).foreach( (roqSeq : Seq[Row]) => { // Run 10000 in batch

        roqSeq.foreach( row => {
          producer.send(new Nothing("topicName", row.getAs("keyCol"), row.getAs("valCol")), new Nothing() {
            def onCompletion(recordMetadata: Nothing, e: Exception): Unit = {
              //Callback code goes here
            }
          })
        })
          Thread.sleep(60000) // Sleep for 1 minute
        }
      )
    })

推荐阅读