首页 > 解决方案 > Spark JAVA程序

问题描述

我正在尝试使用 Spark Java 解决这个问题:

编写一些返回每个 customerId 的旅行次数的 spark 代码。旅行是一系列旅行交易,每次交易之间的时间不超过 7 天。

输入:(customerid,fname,lname,gender,date)
输出:(customerid, numTrips)

我试过的代码:

SimpleDateFormat sdf = new SimpleDateFormat("MM/dd/yyyy");
TripData td=new TripData();
JavaRDD<String> getData= jsc.read().textFile("E:/PROJECTS/SPARK/RESOURCES/TRIPS_MOCK_DATA.txt").javaRDD();
JavaPairRDD<Integer,Iterable<Long>> tripByKey=  getData.mapToPair(line->
    {
        String[] parts=SPACES.split(line);
        return new Tuple2<>(Integer.parseInt(parts[0]),sdf.parse(parts[4]).getTime());
    }).groupByKey().sortByKey();

解决方案:我创建了一个带有 CustomerId 和日期列表的 Map 对,接下来我想对日期进行排序并发现日期差异小于 7 天。

我是 Spark 的新手,没有任何提示可以继续进行,有人可以帮我吗?

样本输入和输出:

样本

标签: javaapache-sparkjava-8

解决方案


JavaPairRDD<Integer,Iterable<Long>> tripByKey=  getData.mapToPair(line->
    {
        String[] parts=SPACES.split(line);
        return new Tuple2<>(Integer.parseInt(parts[0]),sdf.parse(parts[4]).getTime());
    }).groupByKey();

在 groupByKey 转换之后,我们将拥有用户(键),该用户的所有日期(作为可迭代的值)。现在要获取每个用户的旅行次数,我们必须对这些日期进行排序,并使用我们的逻辑来获取每个用户的旅行。

JavaPairRDD<Integer, Integer> tripsCountPerUser = tripByKey.mapValues(func);


 Function<Iterable<Long>, Integer> fun = (Iterable<Long> itr ) -> {
        List<Long> dates = new ArrayList<>();
        for (Long i:itr) {
            dates.add(i);
        }
        Collections.sort(dates);
        long day = 86400000l ;
        long days7 = day * 7;

        int count = 0;
        Long firstDay = null;
        for (Long dt : dates) {
            if(firstDay == null)
            {
                firstDay = dt;
                count = 1;
            }
            else {
                Long diffMs = dt - firstDay ;

                if(diffMs > days7 ) {
                    firstDay = dt;
                    count ++;
                }
            }
        }
        return count;
    };

上面的代码不是一个优化的代码,可以通过几种不同的方式来完成。上面的代码就是给出解决问题的方法。希望能帮助到你。

假设:-

2018-01-01, 2018-01-08, 2018-01-09, 2018-01-11 考虑了两次旅行,它们是,

  1. 2018-01-01、2018-01-08(7天内)
  2. 2018-01-09、2018-01-11(7天)

推荐阅读