首页 > 解决方案 > Java 流映射中可重用的单实例包装器/对象

问题描述

似乎这个问题应该已经有了答案,但我找不到重复的。

无论如何,我想知道社区对Stream.map这样的用例有何看法?

Wrapper wrapper = new Wrapper();
list.stream()
    .map( s -> {
        wrapper.setSource(s);
        return wrapper;
    } )
    .forEach( w -> processWrapper(w) );
    
public static class Source {
    private final String name;
        
    public Source(String name) {
        this.name = name;
    }
        
    public String getName() {
        return name;
    }
}
    
public static class Wrapper {
    private Source source = null;
        
    public void setSource(Source source) {
        this.source = source;
    }
        
    public String getName() {
        return source.getName();
    }
}

public void processWrapper(Wrapper wrapper) {
}

我不是这种用法的忠实拥护者,map但它在处理大型流时可能有助于提高性能,并避免Wrapper为每个Source.

这肯定有它的局限性,比如对并行流和终端操作几乎没用,比如collect.

更新 - 问题不在于“如何做”,而是“我可以这样做”。例如,我可以有一个仅适用于 Wrapper 的代码,我想在其中调用它,forEach但又想避免为每个Source元素创建它的新实例。

基准测试结果

使用可重复使用的包装器显示大约8倍的改进-

基准 (N) 模式 Cnt 得分误差单位

BenchmarkTest.noReuse 10000000 avgt 5 870.253 ± 122.495 ms/op

BenchmarkTest.withReuse 10000000 avgt 5 113.694 ± 2.528 ms/op

基准代码 -

import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(value = 2, jvmArgs = {"-Xms2G", "-Xmx2G"})
public class BenchmarkTest {

    @Param({"10000000"})
    private int N;

    private List<Source> data;

    public static void main(String[] args) throws Exception {
        Options opt = new OptionsBuilder()
            .include(BenchmarkTest.class.getSimpleName())
            .forks(1)
            .build();
        new Runner(opt).run();
    }

    @Setup
    public void setup() {
        data = createData();
    }

    @Benchmark
    public void noReuse(Blackhole bh) {
        data.stream()
            .map( s -> new Wrapper1( s.getName() ) )
            .forEach( t -> processTarget(bh, t) );
    }

    @Benchmark
    public void withReuse(Blackhole bh) {
        Wrapper2 wrapper = new Wrapper2();
        data.stream()
            .map( s -> { wrapper.setSource(s); return wrapper; } )
            .forEach( w -> processTarget(bh, w) );
    }
    
    public void processTarget(Blackhole bh, Wrapper t) {
        bh.consume(t);
    }
    
    private List<Source> createData() {
        List<Source> data = new ArrayList<>();
        for (int i = 0; i < N; i++) {
            data.add( new Source("Number : " + i) );
        }
        return data;
    }
    
    public static class Source {
        private final String name;

        public Source(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }

    public interface Wrapper {
        public String getName();
    }
    
    public static class Wrapper1 implements Wrapper {
        private final String name;

        public Wrapper1(String name) {
            this.name = name;
        }

        public String getName() {
            return name;
        }
    }
    
    public static class Wrapper2 implements Wrapper {
        private Source source = null;

        public void setSource(Source source) {
            this.source = source;
        }

        public String getName() {
            return source.getName();
        }
    }
}

完整的基准报告 -

# JMH version: 1.21
# VM version: JDK 1.8.0_191, Java HotSpot(TM) 64-Bit Server VM, 25.191-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre/bin/java
# VM options: -Xms2G -Xmx2G
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: BenchmarkTest.noReuse
# Parameters: (N = 10000000)

# Run progress: 0.00% complete, ETA 00:03:20
# Fork: 1 of 1
# Warmup Iteration   1: 1083.656 ms/op
# Warmup Iteration   2: 846.485 ms/op
# Warmup Iteration   3: 901.164 ms/op
# Warmup Iteration   4: 849.659 ms/op
# Warmup Iteration   5: 903.805 ms/op
Iteration   1: 847.008 ms/op
Iteration   2: 895.800 ms/op
Iteration   3: 892.642 ms/op
Iteration   4: 825.901 ms/op
Iteration   5: 889.914 ms/op


Result "BenchmartTest.noReuse":
  870.253 ±(99.9%) 122.495 ms/op [Average]
  (min, avg, max) = (825.901, 870.253, 895.800), stdev = 31.812
  CI (99.9%): [747.758, 992.748] (assumes normal distribution)


# JMH version: 1.21
# VM version: JDK 1.8.0_191, Java HotSpot(TM) 64-Bit Server VM, 25.191-b12
# VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_191.jdk/Contents/Home/jre/bin/java
# VM options: -Xms2G -Xmx2G
# Warmup: 5 iterations, 10 s each
# Measurement: 5 iterations, 10 s each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Average time, time/op
# Benchmark: BenchmarkTest.withReuse
# Parameters: (N = 10000000)

# Run progress: 50.00% complete, ETA 00:01:58
# Fork: 1 of 1
# Warmup Iteration   1: 113.780 ms/op
# Warmup Iteration   2: 113.643 ms/op
# Warmup Iteration   3: 114.323 ms/op
# Warmup Iteration   4: 114.258 ms/op
# Warmup Iteration   5: 117.351 ms/op
Iteration   1: 114.526 ms/op
Iteration   2: 113.944 ms/op
Iteration   3: 113.943 ms/op
Iteration   4: 112.930 ms/op
Iteration   5: 113.124 ms/op


Result "BenchmarkTest.withReuse":
  113.694 ±(99.9%) 2.528 ms/op [Average]
  (min, avg, max) = (112.930, 113.694, 114.526), stdev = 0.657
  CI (99.9%): [111.165, 116.222] (assumes normal distribution)


# Run complete. Total time: 00:03:40

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

Benchmark                     (N)  Mode  Cnt    Score     Error  Units
BenchmarkTest.noReuse    10000000  avgt    5  870.253 ± 122.495  ms/op
BenchmarkTest.withReuse  10000000  avgt    5  113.694 ±   2.528  ms/op

标签: javajava-8java-stream

解决方案


您的方法恰好有效,因为流管道仅包含无状态操作。在这样的星座中,顺序流评估一次可以处理一个元素,因此对包装器实例的访问不会重叠,如此处所示。但请注意,这不是保证行为。

它绝对不适用于像sortedand之类的有状态操作distinct。它也不能与归约操作一起使用,因为它们总是必须保存至少两个元素进行处理,包括reduceminmax. 的情况下collect,要看具体情况CollectorforEachOrdered由于需要缓冲,因此不适用于并行流。

请注意,即使您使用TheadLocal创建线程受限的包装器,并行处理也会有问题,因为无法保证在一个工作线程中创建的对象保持在该线程的本地。工作线程可能会在处理另一个不相关的工作负载之前将部分结果移交给另一个线程。

所以这个共享的可变包装器在特定实现的顺序执行中与一组特定的无状态操作一起工作,比如map, filter, forEach, findFirst/Any, 。all/any/noneMatch您没有获得 API 的灵活性,因为您必须限制自己,不能将流传递给期望 a 的任意代码,Stream也不能使用任意Collector实现。您也没有接口的封装,因为您假设特定的实现行为。

换句话说,如果你想使用这样一个可变的包装器,你最好使用一个循环来实现特定的操作。您确实已经有了这种手动实施的缺点,那么为什么不实施它以获得优势。


要考虑的另一个方面是,您从重用这种可变包装器中获得了什么。它仅适用于类似循环的用法,其中临时对象在应用转义分析后可能会被优化掉。在这种情况下,重用对象、延长其生命周期实际上可能会降低性能。

当然,对象缩放不是一种保证行为。可能存在一些场景,例如超过 JVM 内联限制的长流管道,其中对象不会被忽略。但是,临时对象并不一定很昂贵。

这已在此答案中进行了解释。临时对象的分配成本很低。垃圾回收的主要成本是由仍然活着的对象引起的。这些需要遍历,并且在为新分配腾出空间时需要移动它们。临时对象的负面影响是它们可能会缩短垃圾收集轮次之间的时间。但这是分配率和可用分配空间的函数,所以这确实是一个可以通过投入更多RAM来解决的问题。更多的 RAM 意味着 GC 周期之间的时间更长,并且GC 发生时更多的死对象,这使得 GC 的净成本更小。

尽管如此,避免过度分配临时对象是一个有效的问题。IntStream,LongStream和的存在DoubleStream说明了这一点。但这些都是特殊的,因为使用原始类型是使用包装器对象的可行替代方案,而没有重用可变包装器的缺点。它也不同,因为它适用于原始类型和包装类型在语义上等价的问题。相反,您想解决操作需要包装器类型的问题。对于原始流也适用,当您需要解决问题的对象时,没有办法绕过装箱,这将为不同的值创建不同的对象,而不是共享可变对象。

因此,如果您同样有一个问题,即存在语义等效的包装器对象避免替代方案而没有实质性问题,例如仅在可行Comparator.comparingInt的情况下使用而不是Comparator.comparing,您可能仍然更喜欢它。但也只有那时。


简而言之,大多数时候,这种对象重用的节省(如果有的话)并不能证明其缺点是合理的。在特殊情况下,如果它是有益且重要的,您最好使用完全控制的循环或任何其他构造,而不是使用Stream.


推荐阅读