首页 > 解决方案 > Spark:在每个分区中变得不同

问题描述

我想使用 ID 对数据进行分区,并且在每个分区中我想

- 应用一组操作

- 采取不同的

在每个分区内进行区分将避免洗牌。

val rowRDD = sc.textFile("flatten_test_data")
    .filter(_.nonEmpty)
    .map { l =>
        val arr = l.split("\u0001")
        val id = arr(0)
         val value = arr(1)
         (id,value)
    }.partitionBy(new HashPartitioner(4))
    .persist()

现在做一些类似的事情 -

rowRDD.foreachPartition {records => applyOpers(records)}

applyOpers(dataset) 应该做类似的事情 -

dataset.withColumn(udf1).withColumn(udf2).distinct

标签: apache-sparkdataframespark-dataframe

解决方案


forEachPartition在 executor 上执行。因此,您不能在 forEachPartition 中访问 SparkContext/SparkSession。

您可以用作&mapPartitions()的替代品。与&不同,每个 Partition 调用一次,而&则为 RDD 中的每个元素调用一次。主要优点是,我们可以基于每个分区而不是基于每个元素进行初始化。map()foreach()mapPartitions()map()foreach()

我们得到Iterator的参数mapPartition,通过它我们可以遍历分区中的所有元素。

例如:(这个例子是在java中,但这应该给你一个想法。)

JavaRDD<Integer> rdd = sc.parallelize(
      Arrays.asList(1, 2, 3, 4, 5));
    FlatMapFunction<Iterator<Integer>, AvgCount> setup = new FlatMapFunction<Iterator<Integer>, AvgCount>() {
      @Override
      public Iterable<AvgCount> call(Iterator<Integer> input) {
        AvgCount a = new AvgCount(0, 0);
        while (input.hasNext()) {
          a.total_ += input.next();
          a.num_ += 1;
        }
        ArrayList<AvgCount> ret = new ArrayList<AvgCount>();
        ret.add(a);
        return ret;
      }
    };
    Function2<AvgCount, AvgCount, AvgCount> combine = new Function2<AvgCount, AvgCount, AvgCount>() {
        @Override
        public AvgCount call(AvgCount a, AvgCount b) {
        a.total_ += b.total_;
        a.num_ += b.num_;
        return a;
        }
    };

AvgCount result = rdd.mapPartitions(setup).reduce(combine);

推荐阅读