首页 > 解决方案 > 如何为spark中的每个任务生成数字序列

问题描述

我正在使用下面的代码在火花中映射一些数据。我需要为每个任务生成一个唯一的序列号,同时将其映射到对 rdd。我尝试使用蓄电池。但是我从异常中了解到,在任务内部不可能从累加器中检索值。请帮助我,因为我对火花很陌生,对解决方案一无所知。

Accumulator<Integer> uniqueIdAccumulator = context.getJavaSparkContext().accumulator(0, "uniqueId");
JavaPairRDD<String, String> rdd1 = javaPairRdd.mapToPair(f-> {
    uniqueIdAccumulator.add(1);
    return new Tuple2<String,String>(f._1, this.getMessageString(f._2, null,uniqueIdAccumulator.value()));
});

标签: javaapache-sparkspark-streaming

解决方案


JavaPairRDD rdd1 = javaPairRdd.zipWithIndex().mapToPair(f-> { return new Tuple2(f._1._1,this.getMessageString(f._1._2, null, f._2)); });

这里不需要蓄能器。ZipWithIndex 帮助获得了解决方案。ZipWIthIndex 返回具有现有元组和 Long 索引号的 RDD。我使用索引号来生成唯一的序列号。


推荐阅读