apache-spark - Spark mapPartitions 问题
问题描述
我在我的 DF 上使用 spark mapPartition,用例我应该为每个分区提交一个作业(调用 lambda 或发送 SQS 消息)。
我正在对自定义格式的日期列进行分区,并在之前和之后记录分区数,它按预期工作。
但是,当我看到作业总数时,它比分区数还多。对于某些分区,有两个或三个工作!
这是我正在使用的代码
val yearMonthQueryRDD = yearMonthQueryDF.rdd.mapPartitions(
partition => {
val partitionObjectList = new java.util.ArrayList[String]()
logger.info("partitionIndex = {}",TaskContext.getPartitionId());
val partitionCounter:AtomicLong = new AtomicLong(0)
val partitionSize:AtomicLong = new AtomicLong(0)
val paritionColumnName:AtomicReference[String] = new AtomicReference[String]();
// Iterate the Objects in a given parittion
val updatedPartition = partition.map( record => {
import yearMonthQueryDF.sparkSession.implicits._
partitionCounter.set(partitionCounter.get()+1)
val recordSizeInt = Integer.parseInt(record.getAs("object_size"))
val recordSize:Long = recordSizeInt.toLong
partitionObjectList.add(record.getAs("object_key"))
paritionColumnName.set(record.getAs("partition_column_name"))
record
}
).toList
logger_ref.info("No.of Elements in Partition ["+paritionColumnName.get()+"] are =["+partitionCounter.get()+"] Total Size=["+partitionSize.get()+"]")
// Submit a Job for the parition
// jobUtil.submitJob(paritionColumnName.get(),partitionObjectList,partitionSize.get())
updatedPartition.toIterator
}
)
使调试更加困难的另一件事是在容器错误日志中找不到 mapPartitions() 方法中的日志记录语句(因为它们是在每个工作节点上而不是在主节点上执行的,所以我希望它们在容器日志中找到它们而不是在主节点日志中。需要弄清楚为什么我只看到标准错误日志而不是容器上的标准输出日志)。
谢谢萨蒂什
解决方案
推荐阅读
- java - 是否可以禁用 Natable 的虚拟表行为
- mysql - SQLSTATE[42S01]:基表或视图已存在:1050 表 'ftp_exchange' 已存在。无法更新我的存储库
- html - 我尝试了很多次,两步登录表单都不起作用
- visual-studio-code - VSCode 调试无法显示参考值
- jekyll - 过滤分页器.posts
- r - R Shiny,如何让观察者先执行?
- visual-studio - 如何显示右侧布局/边距?
- java - 如何在java中提取以特定字符串开头的部分字符串
- python - 按一列对数据框进行分组并基于该列添加信息
- javascript - isCancel 在 axios 中始终为 false