首页 > 解决方案 > Spark mapPartitions 问题

问题描述

我在我的 DF 上使用 spark mapPartition,用例我应该为每个分区提交一个作业(调用 lambda 或发送 SQS 消息)。

我正在对自定义格式的日期列进行分区,并在之前和之后记录分区数,它按预期工作。

但是,当我看到作业总数时,它比分区数还多。对于某些分区,有两个或三个工作!

这是我正在使用的代码

val yearMonthQueryRDD = yearMonthQueryDF.rdd.mapPartitions(
         partition => {
          val partitionObjectList = new java.util.ArrayList[String]()
          logger.info("partitionIndex = {}",TaskContext.getPartitionId());
          val partitionCounter:AtomicLong = new AtomicLong(0)
          val partitionSize:AtomicLong = new AtomicLong(0)
          val paritionColumnName:AtomicReference[String] = new AtomicReference[String]();

      // Iterate the Objects in a given parittion 
      val updatedPartition = partition.map( record => {
            import yearMonthQueryDF.sparkSession.implicits._
            partitionCounter.set(partitionCounter.get()+1)
            val recordSizeInt = Integer.parseInt(record.getAs("object_size"))
            val recordSize:Long = recordSizeInt.toLong
            partitionObjectList.add(record.getAs("object_key"))
            paritionColumnName.set(record.getAs("partition_column_name"))
            record
        }
      ).toList
      
       logger_ref.info("No.of Elements in Partition ["+paritionColumnName.get()+"] are =["+partitionCounter.get()+"] Total Size=["+partitionSize.get()+"]")
       // Submit a Job for the parition
       // jobUtil.submitJob(paritionColumnName.get(),partitionObjectList,partitionSize.get())
       updatedPartition.toIterator
    }
  )

使调试更加困难的另一件事是在容器错误日志中找不到 mapPartitions() 方法中的日志记录语句(因为它们是在每个工作节点上而不是在主节点上执行的,所以我希望它们在容器日志中找到它们而不是在主节点日志中。需要弄清楚为什么我只看到标准错误日志而不是容器上的标准输出日志)。

谢谢萨蒂什

标签: apache-spark

解决方案


推荐阅读