首页 > 解决方案 > 如何从 Flux 中选择每个组的最大项目

问题描述

鉴于以下情况MyObject,从该助焊剂Flux<MyObject>中去除MyObjects具有相同属性的最佳方法是什么?

    import lombok.Data;
    import reactor.core.publisher.Flux;
    
    public class Example {
    
        @Data
        public class MyObject {
            final String name;
            final int priority;
        }
    
        public Example() {
            Flux<MyObject> myFlux = Flux.just(
                    new MyObject("abc", 2),
                    new MyObject("abc", 4),
                    new MyObject("cde", 1));
        }
    }

例如,我想删除相同的对象,name同时选择更高的对象priority
输出:[Example.MyObject(name=abc, priority=4), Example.MyObject(name=cde, priority=1)]

如果我使用myFlux.distinct(MyObject::getName),我将无法选择保留哪一个。

标签: javareactive-programmingspring-webfluxproject-reactor

解决方案


要解决这个问题,您首先需要将其转换Flux<MyObject>为 a Mono<List<MyObject>>,因为您需要知道所有对象及其优先级才能对它们进行排序。

一旦你有了 的所有实例的列表MyObject,你就可以使用 Java 8 Stream api 来解决这个问题:

@Slf4j
public class Example {

    public static void main(String[] args) {
        Flux<MyObject> myFlux = Flux.just(
                        new MyObject("abc", 2),
                        new MyObject("abc", 4),
                        new MyObject("cde", 1))
                .collectList()
                .map(myObjectsList -> myObjectsList.stream()
                        .collect(Collectors
                                .groupingBy(MyObject::getName)))
                // now we have a Map<String, List<MyObject>>
                .map(Map::entrySet)
                // now we have a Set<Entry<String, List<MyObject>>>
                .flatMapIterable(entrySet -> entrySet)
                .map(Map.Entry::getValue)
                // now we have a Flux<List<MyObject>>
                // and all MyObject in that list have 
                // the same name
                .filter(allObjectsWithSameName -> !allObjectsWithSameName.isEmpty())
                // now we sort all the lists in descending order
                // and return the first element
                // which is the one with the highest prio
                .map(allObjectsWithSameName -> {
                            allObjectsWithSameName.sort(new Comparator<MyObject>() {
                                @Override
                                public int compare(MyObject o1, MyObject o2) {
                                    return Integer.compare(o2.priority, o1.priority);
                                }
                            });
                            return allObjectsWithSameName.get(0);
                        }
                );

        myFlux.subscribe(result -> System.out.println("MyObject: " + result.toString()));
    }

    @Data
    @RequiredArgsConstructor
    public static class MyObject {
        final String name;
        final int priority;
    }
}

输出:

MyObject: Example.MyObject(name=abc, priority=4)
MyObject: Example.MyObject(name=cde, priority=1)

推荐阅读