首页 > 解决方案 > AtomicReference#compareAndSet 阻塞了主线程,我应该使用同步吗?

问题描述

我使用 AtomicReference 构建了原子映射、集合和列表类的集合。我遇到了一个问题,即AtomicReference#compareAndSet每秒调用约 20 次时存在严重滞后。(AtomicSet 不会导致任何延迟,但这只是因为如果它已经包含对象,它就不会调用 compareAndSet)

如果我注释掉该方法并简单地添加synchronized到该方法中,滞后就会消失,但这是一个有效的替代品吗?我是否过度设计了一些我不需要的东西?

我尝试了一堆不同的方法布局组合,我只获取当前地图并创建一个新的修改地图,然后重复调用compareAndSet直到它返回 true,仍然滞后。我也试过只打电话AtomicReference#set一次,这也很滞后。

摘要:我的AtomicReference#compareAndSet调用陷入了主线程,我正在寻找我是否完全滥用了 AtomicReference,应该使用synchronized或修复我的代码(如果有的话)。

对于上下文,我在 Minecraft Sponge Mixin 注入中运行以下代码。它所做的只是在定义的方法的底部注入我的 java 代码(它似乎以 Minecraft 滴答速度运行,每秒 20 次):

package net.netcoding.mod.mixins;

import net.minecraft.tileentity.TileEntityHopper;
import net.minecraft.tileentity.TileEntityLockable;
import net.minecraft.util.math.BlockPos;
import net.netcoding.mod.util.Cache;
import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.Shadow;
import org.spongepowered.asm.mixin.injection.At;
import org.spongepowered.asm.mixin.injection.Inject;
import org.spongepowered.asm.mixin.injection.callback.CallbackInfo;

@Mixin(TileEntityHopper.class)
public abstract class MixinTileEntityHopper extends TileEntityLockable {

    @Shadow public abstract double getXPos();
    @Shadow public abstract double getZPos();
    @Shadow public abstract double getYPos();

    @Inject(
            method = "update",
            at = @At(
                    value = "TAIL"
            )
    )
    private void mod_update(CallbackInfo ci) {
        BlockPos position = new BlockPos(this.getXPos(), this.getYPos(), this.getZPos());
        Cache.STORED_HOPPERS.put(position, true); // this line calls the AtomicMap#put
    }

}

这是有问题的 AtomicMap 类(请看一下putput2方法):

package net.netcoding.core.api.util.concurrent.atomic;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;

public abstract class AtomicMap<K, V, M extends AbstractMap<K, V>> extends AbstractMap<K, V> implements Iterable<Map.Entry<K, V>>, Map<K, V> {

    protected final AtomicReference<M> ref;

    /**
     * Create a new concurrent map.
     */
    protected AtomicMap(M type) {
        this.ref = new AtomicReference<>(type);
    }

    @Override
    public final void clear() {
        this.ref.get().clear();
    }

    @Override
    public final boolean containsKey(Object key) {
        return this.ref.get().containsKey(key);
    }

    @Override
    public final boolean containsValue(Object value) {
        return this.ref.get().containsValue(value);
    }

    @Override
    public final Set<Entry<K, V>> entrySet() {
        return this.ref.get().entrySet();
    }

    @Override
    public final V get(Object key) {
        return this.ref.get().get(key);
    }

    @Override
    public final V getOrDefault(Object key, V defaultValue) {
        M current = this.ref.get();
        return current.getOrDefault(key, defaultValue);
    }

    @Override
    public final boolean isEmpty() {
        return this.ref.get().isEmpty();
    }

    @Override
    public Iterator<Entry<K, V>> iterator() {
        return this.entrySet().iterator();
    }

    @Override
    public final Set<K> keySet() {
        return this.ref.get().keySet();
    }

    @SuppressWarnings("unchecked")
    private M newMap(M current) {
        try {
            Map<K, V> map = current.getClass().newInstance();
            map.putAll(current);
            return (M)map;
        } catch (Exception ex) {
            throw new RuntimeException("Unable to create new map instance of " + current.getClass().getSimpleName() + "!");
        }
    }

    public final Stream<Entry<K, V>> parallelStream() {
        return this.entrySet().parallelStream();
    }

    @Override
    public final V put(K key, V value) {
        while (true) {
            M current = this.ref.get();
            M modified = this.newMap(current);
            V old = modified.put(key, value);

            // Causes severe lag if called ~20 times a second
            if (this.ref.compareAndSet(current, modified))
                return old;
        }
    }

    // Causes no lag, but answers in questions about AtomicReference and synchronized
    // all say synchronized can deadlock a thread, and hang until it's complete
    // which is actually the exact opposite of what I am experiencing
    // and I'm not sure this is even a correct way to use AtomicReference
    public synchronized final V put2(K key, V value) {
        return this.ref.get().put(key, value);
    }

    @Override
    public synchronized final void putAll(Map<? extends K, ? extends V> map) {
        this.ref.get().putAll(map);
        /*while (true) {
            M current = this.ref.get();
            M modified = this.newMap(current);
            modified.putAll(map);

            if (this.ref.compareAndSet(current, modified))
                return;
        }*/
    }

    @Override
    public synchronized final V putIfAbsent(K key, V value) {
        return this.ref.get().putIfAbsent(key, value);
        /*while (true) {
            M current = this.ref.get();
            M modified = this.newMap(current);

            if (!modified.containsKey(key) || modified.get(key) == null) {
                V old = modified.put(key, value);

                if (this.ref.compareAndSet(current, modified))
                    return old;
            } else
                return null;
        }*/
    }

    @Override
    public final V remove(Object key) {
        while (true) {
            M current = this.ref.get();

            if (!current.containsKey(key))
                return null;

            M modified = this.newMap(current);
            V old = modified.remove(key);

            if (this.ref.compareAndSet(current, modified))
                return old;
        }
    }

    @Override
    public final boolean remove(Object key, Object value) {
        while (true) {
            M current = this.ref.get();

            if (!current.containsKey(key))
                return false;

            M modified = this.newMap(current);
            V currentValue = modified.get(key);

            if (Objects.equals(currentValue, value)) {
                modified.remove(key);

                if (this.ref.compareAndSet(current, modified))
                    return true;
            } else
                return false;
        }
    }

    @Override
    public final int size() {
        return this.ref.get().size();
    }

    public final Stream<Entry<K, V>> stream() {
        return this.entrySet().stream();
    }

    @Override
    public final Collection<V> values() {
        return this.ref.get().values();
    }

}

ConcurrentMap(AtomicMap的实现)

package net.netcoding.core.api.util.concurrent;

import net.netcoding.core.api.util.concurrent.atomic.AtomicMap;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class ConcurrentMap<K, V> extends AtomicMap<K, V, HashMap<K, V>> {

    /**
     * Create a new concurrent map.
     */
    public ConcurrentMap() {
        super(new HashMap<>());
    }

    /**
     * Create a new concurrent map and fill it with the given map.
     */
    public ConcurrentMap(Map<? extends K, ? extends V> map) {
        super(new HashMap<>(map));
    }

}

标签: javathread-safetysynchronizedatomicreference

解决方案


推荐阅读