首页 > 解决方案 > DStreams:在foreachRDD中创建然后在foreachPartition内部修改的变量在foreachPartition之外重置一次?

问题描述

我在 kafka 中有一堆消息,并使用火花流来处理这些消息。

当我的代码无法插入我的数据库时,我试图捕捉到这些消息,然后将这些消息重新插入 Kafka,以便稍后处理它们。

为了解决这个问题,我在我的 foreachRDD 函数中创建了一个名为“success”的变量。然后,当我尝试更新到数据库时,我返回一个成功插入的布尔值。我在测试期间注意到的是,当我尝试在 foreachPartition 期间插入时,这似乎效果不佳。当我离开 foreachPartition 函数时,成功值似乎被“重置”了。

stream: DStream[String]

stream
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      var success = true
      rdd.foreachPartition(partitionOfRecords => {
        if (partitionOfRecords.nonEmpty) {
          val listOfRecords = partitionOfRecords.toList
          val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
          logger.info("Insert was successful: " + successfulInsert)
          if (!successfulInsert) {
            logger.info("logging successful as false. Currently its set to: " + success )
            success = false
            logger.info("logged successful as false. Currently its set to: " + success )

          }
        }
      })

      logger.info("Insert into database successful from all partition: " + success)
      if (!success) {
        // send data to Kafka topic
      }

    }
  })

然后我的日志输出显示了这一点!

2019-06-24 20:26:37 [INFO] 插入成功:false 2019-06-24 20:26:37 [INFO] 记录成功为 false。目前其设置为:true 2019-06-24 20:26:37 [INFO] 成功记录为 false。目前其设置为:false 2019-06-24 20:26:37 [INFO] 从所有分区成功插入数据库:true

即使在第三个日志中它说当前“成功”设置为 false,但是当我离开 foreachPartition 时,我再次记录它并将其设置回 true。

谁能解释为什么?或者提出不同的方法?

标签: apache-sparkspark-streamingspark-streaming-kafka

解决方案


我能够使用蓄电池使其工作。

stream: DStream[String]

val dbInsertACC = sparkSession.sparkContext.longAccumulator("insertSuccess")

stream
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      //could maybe put accumulator here?
      rdd.foreachPartition(partitionOfRecords => {
        if (partitionOfRecords.nonEmpty) {
          val listOfRecords = partitionOfRecords.toList
          val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
          logger.info("Insert was successful: " + successfulInsert)
          if (!successfulInsert) dbInsertACC.add(1)
        }
      })

      logger.info("Insert into database successful from all partition: " + success)
      if (!dbInsertACC.isZero) {
        // send data to Kafka topic
      }

    }
  })

推荐阅读