首页 > 解决方案 > 高吞吐量写入 Java 8 并发中的变量?

问题描述

如果我在 Java 8 程序中有一个简单的整数,它可以被多个线程读写。

如果我被告知应用程序需要支持高吞吐量读取和很少的写入 - 答案很简单,我只使用读写锁。然后多个线程可以在没有阻塞的情况下同时执行读取 - 并且阻塞仅在不频繁的写入完成时发生。

但是如果我被告知应用程序需要支持高吞吐量写入(即共享变量被不同的线程频繁更新)。无论我在这里使用哪种锁,据我所知,它总是会导致线程阻塞——当一个线程获得变量的锁并对其进行更新时,其余线程也在尝试更新变量只需要等到他们获得锁 - 这是正确的还是我在 Java 8 中遗漏了什么?

我可以开始在共享变量上编写某种异步更新方法,其中线程调用它立即返回的更新方法,并且我在幕后使用某种数据结构来对共享变量的写入进行排队。至少这样我会在尝试更新共享变量时防止线程阻塞。当然,这种方法会引发其他问题,例如线程是否应该保证写入定义。成功还是应该提供回调以通知更新成功等。除了这样的事情之外,在使用 Java 8 中的任何 Lock 进行高吞吐量写入时,我认为没有办法绕过阻塞?(或者我是否应该接受阻塞并且即使在高吞吐量写入的情况下也应该使用 Lock)。谢谢

标签: concurrencyjava-8lockingjava.util.concurrent

解决方案


严格来说Integer-您可以使用LongAdder,它的实现似乎完全适合您的情况。如果你关心这里有一些额外的细节。

CAS在底层使用(比较和交换),很像AtomicLong,但有一些区别。首先long value,它所拥有的实际内容被包装在一个所谓的Cell- 基本上是一个允许cas(比较和交换)value新值的类,如果你愿意,就像一个 setter 一样。这Cell也被注释@sun.misc.Contended以防止错误共享;这是对它的解释(来自代码注释):

但是驻留在数组中的原子对象往往会彼此相邻放置,因此通常会在没有这种预防措施的情况下共享缓存行(具有巨大的负面性能影响)。

从这里开始实现非常有趣。让我们看看调用add(long x)方法时会发生什么:

 public void add(long x) {
    Cell[] cs; long b, v; int m; Cell c;
    if ((cs = cells) != null || !casBase(b = base, b + x)) {
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[getProbe() & m]) == null ||
            !(uncontended = c.cas(v = c.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

这个想法是,如果Cell [] cs 是 null,之前没有争用,意思long value要么没有初始化,要么所有先前CAS的操作都被所有线程成功。在这种情况下,尝试CAS使用新值long value- 如果有效,我们就完成了。如果失败了,则会Cell []创建一个数组,以便每个单独的线程尝试在自己的空间中工作,从而最大限度地减少争用。

如果我正确理解了您的问题,下一句是您真正关心的(这是我的,根本不是来自代码注释):

用更简单的话来说:如果线程之间没有争用,则工作就像AtomicLong使用(某种)一样完成,否则尝试为每个线程创建一个单独的空间来工作。

如果你关心一些我觉得有趣的额外细节:

TheCell[]始终是 2 的幂(很像HashMap内部数组);然后每个线程使用ThreadLocalRandom创建一些 hashCode 来尝试在数组中找到Cell [] cs要写入的条目,甚至再次重新哈希Marsaglia XorShif以尝试在该数组中找到空闲槽;阵列的大小限制为您拥有的核心数量(实际上是最接近 2 的幂),该阵列可以调整大小,因此它可以增长并且所有这些操作都使用volatile int cellsBusy自旋锁完成。这段代码很棒,但如前所述,我没有全部了解。


推荐阅读