首页 > 解决方案 > 无锁并发栈实现

问题描述

我一直在玩弄 Java 中无锁堆栈的简单实现。

编辑:见下面的固定/工作版本


你看到这个实现有什么问题吗?

本地语言中的类似实现似乎受到ABA 问题的影响,但我不确定这是否是一个问题;显然没有直接在 Java 中完成指针处理,并且考虑到我关心的是堆栈的结尾,无论是 pop 还是 push,我看不出堆栈的任何非尾元素中的“丢失”任何更改会如何导致问题.

public class LockFreeStack<T extends LockFreeStack.StackItem<T>>
{
    public abstract static class StackItem<SELF extends StackItem<SELF>>
    {
        volatile SELF next;
        // .. data ..
    }

    final AtomicReference<T> top = new AtomicReference<T>(null);

    public void push(T item)
    {
        T localTop;

        do {
            localTop = top.get();
            item.next = localTop;
        } while(!top.compareAndSet(localTop, item));
    }

    public T pop()
    {
        T localTop;

        do {
            localTop = top.get();
        } while(localTop != null && !top.compareAndSet(localTop, localTop.next));

        return localTop;
    }
}

但是,这就是我不明白的。我编写了一个启动几个线程的简单测试;每个人都从预先存在的 LockFreeStack 中弹出项目,然后(后来,从弹出它的同一线程中)将它们推回。在它弹出之后,我增加一个原子计数器,然后在推回它之前,我减少它。所以我总是希望计数器为 0(在递减之后/在推回堆栈之前)或 1(在弹出和递增之后)。

但是,事实并非如此……

public class QueueTest {
    static class TestStackItem extends LockFreeStack.StackItem<TestStackItem>
    {
        final AtomicInteger usageCount = new AtomicInteger(0);

        public void inc() throws Exception
        {
            int c = usageCount.incrementAndGet();

            if(c != 1)
                throw new Exception(String.format("Usage count is %d; expected %d", c, 1));
        }

        public void dec() throws Exception
        {
            int c = usageCount.decrementAndGet();

            if(c != 0)
                throw new Exception(String.format("Usage count is %d; expected %d", c, 0));
        }
    }

    public final LockFreeStack<TestStackItem> testStack = new LockFreeStack<TestStackItem>();

    public void test()
    {
        final int NUM_THREADS = 4;

        for(int i = 0; i < 10; i++)
        {
            TestStackItem item = new TestStackItem();
            testStack.push(item);
        }

        Thread[] threads = new Thread[NUM_THREADS];
        for(int i = 0; i < NUM_THREADS; i++)
        {
            threads[i] = new Thread(new TestRunner());
            threads[i].setDaemon(true);
            threads[i].setName("Thread"+i);
            threads[i].start();
        }

        while(true)
        {
            Thread.yield();
        }

    }

    class TestRunner implements  Runnable
    {
        @Override
        public void run() {
            try {
                boolean pop = false;
                TestStackItem lastItem = null;
                while (true) {
                    pop = !pop;

                    if (pop) {
                        TestStackItem item = testStack.pop();
                        item.inc();
                        lastItem = item;
                    } else {
                        lastItem.dec();
                        testStack.push(lastItem);
                        lastItem = null;
                    }
                }
            } catch (Exception ex)
            {
                System.out.println("exception: " + ex.toString());
            }
        }
    }
}

抛出不确定的异常,例如

exception: java.lang.Exception: Usage count is 1; expected 0
exception: java.lang.Exception: Usage count is 2; expected 1

或从另一个运行

exception: java.lang.Exception: Usage count is 2; expected 0
exception: java.lang.Exception: Usage count is 3; expected 1
exception: java.lang.Exception: Usage count is 3; expected 1
exception: java.lang.Exception: Usage count is 2; expected 1

所以这里肯定会出现一些类似比赛条件的问题。

这里有什么问题 - 这确实与 ABA 相关(如果是,那么究竟如何?)还是我错过了其他任何东西?

谢谢!


注意:这可行,但似乎不是一个很好的解决方案。它既不是无垃圾的(StampedAtomicReference 在内部创建对象),也不是无锁的好处似乎真的有回报;在我的基准测试中,这在单线程环境中并没有真正更快,并且在同时使用 6 个线程进行测试时,它明显落后于仅在 push/pop 函数周围加锁

根据下面建议的解决方案,这确实是一个 ABA 问题,这个小改动将规避:

public class LockFreeStack<T extends LockFreeStack.StackItem<T>>
{
    public abstract static class StackItem<SELF extends StackItem<SELF>>
    {
        volatile SELF next;
        // .. data ..
    }

    private final AtomicStampedReference<T> top = new AtomicStampedReference<T>(null, 0);

    public void push(T item)
    {
        int[] stampHolder = new int[1];

        T localTop;

        do {
            localTop = top.get(stampHolder);
            item.next = localTop;
        } while(!top.compareAndSet(localTop, item, stampHolder[0], stampHolder[0]+1));
    }

    public T pop()
    {
        T localTop;
        int[] stampHolder = new int[1];

        do {
            localTop = top.get(stampHolder);
        } while(localTop != null && !top.compareAndSet(localTop, localTop.next, stampHolder[0], stampHolder[0]+1));

        return localTop;
    }
}

标签: javaalgorithmconcurrency

解决方案


是的,您的堆栈有 ABA 问题。

  • 线程 Apop执行localTop = top.get()并读取localTop.next

  • 其他线程弹出一堆东西并以不同的顺序放回去,但线程 AlocalTop仍然是最后一个推送的。

  • 线程 A 的 CAS 成功,但它破坏了堆栈,因为它读取的值localTop.next不再准确。

不过,无锁数据结构Java 等垃圾收集语言中实现起来比在其他语言中容易得多。如果 push() 每次都分配一个新的堆栈项,您的 ABA 问题就会消失。然后StackItem.next可以是最终的,整个事情变得更容易推理。


推荐阅读