apache-spark - DStreams:在foreachRDD中创建然后在foreachPartition内部修改的变量在foreachPartition之外重置一次?
问题描述
我在 kafka 中有一堆消息,并使用火花流来处理这些消息。
当我的代码无法插入我的数据库时,我试图捕捉到这些消息,然后将这些消息重新插入 Kafka,以便稍后处理它们。
为了解决这个问题,我在我的 foreachRDD 函数中创建了一个名为“success”的变量。然后,当我尝试更新到数据库时,我返回一个成功插入的布尔值。我在测试期间注意到的是,当我尝试在 foreachPartition 期间插入时,这似乎效果不佳。当我离开 foreachPartition 函数时,成功值似乎被“重置”了。
stream: DStream[String]
stream
.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
var success = true
rdd.foreachPartition(partitionOfRecords => {
if (partitionOfRecords.nonEmpty) {
val listOfRecords = partitionOfRecords.toList
val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
logger.info("Insert was successful: " + successfulInsert)
if (!successfulInsert) {
logger.info("logging successful as false. Currently its set to: " + success )
success = false
logger.info("logged successful as false. Currently its set to: " + success )
}
}
})
logger.info("Insert into database successful from all partition: " + success)
if (!success) {
// send data to Kafka topic
}
}
})
然后我的日志输出显示了这一点!
2019-06-24 20:26:37 [INFO] 插入成功:false 2019-06-24 20:26:37 [INFO] 记录成功为 false。目前其设置为:true 2019-06-24 20:26:37 [INFO] 成功记录为 false。目前其设置为:false 2019-06-24 20:26:37 [INFO] 从所有分区成功插入数据库:true
即使在第三个日志中它说当前“成功”设置为 false,但是当我离开 foreachPartition 时,我再次记录它并将其设置回 true。
谁能解释为什么?或者提出不同的方法?
解决方案
我能够使用蓄电池使其工作。
stream: DStream[String]
val dbInsertACC = sparkSession.sparkContext.longAccumulator("insertSuccess")
stream
.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
//could maybe put accumulator here?
rdd.foreachPartition(partitionOfRecords => {
if (partitionOfRecords.nonEmpty) {
val listOfRecords = partitionOfRecords.toList
val successfulInsert: Boolean = insertRecordsToDB(listOfRecords)
logger.info("Insert was successful: " + successfulInsert)
if (!successfulInsert) dbInsertACC.add(1)
}
})
logger.info("Insert into database successful from all partition: " + success)
if (!dbInsertACC.isZero) {
// send data to Kafka topic
}
}
})
推荐阅读
- python-3.x - 如何优化这个 python 脚本 Pandas 的时间?
- r - Ggplot2:不在饼图标签中显示 0 值
- angular - devextreme angular - 设置 DxDataGrid 自定义 headerFilter 的初始过滤器值
- python - Python变量没有被增加
- reactjs - 反应 onDrop 事件与法语字符冲突
- c# - Ninject 依赖注入 - 两个具体类
- selinux - 获取 avc:AOSP 6.0.1 中的拒绝错误
- angular - 收到 http 应答之前的 Angular 路由
- java - 在 Android Studio 的 ListView 中显示数据
- kubernetes - 无头服务的 GKE 内部入口