java - 如何在 AKKA Actor 中实现线程安全?
问题描述
我的项目需要大量的异步编程,所以我选择了 AKKA 平台,因为 Actor 模型可以像编写同步代码一样实现异步系统,而无需担心线程问题。一切正常,直到我遇到以下问题(演示代码):
import akka.actor.AbstractActor;
import akka.japi.pf.ReceiveBuilder;
import java.util.concurrent.locks.ReentrantLock;
public class TestActor extends AbstractActor {
private final ReentrantLock lock = new ReentrantLock();
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.matchEquals("lock", s -> lock.lock())
.matchEquals("unlock", s -> lock.unlock())
.build();
}
}
首先发送一个“锁定”消息,然后发送一个“解锁”消息,在收到发送消息后尝试解锁时,IllegalMonitorStateException
抛出一个,我发现这是由于不同的消息实际上是由不同的线程处理的,s -> lock.lock()
并且s -> lock.unlock()
是在不同的线程中执行所以IllegalMonitorStateException
被抛出。
我之前的假设是演员的所有动作都在一个线程中执行,因此它是完全线程安全的,不必担心线程问题。当我在我的项目中广泛使用 AKKA 时,现在我非常担心并且不清楚何时需要在使用 AKKA 时考虑线程问题。例如,在以下演示代码中:
public class TestActor1 extends AbstractActor {
private int count = 0;
private Map<Integer, Integer> map = new HashMap<>();
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.matchEquals("action1", s -> count++)
.matchEquals("action2", s -> getSender().tell(count, getSelf()))
.matchEquals("action3", s -> map.put(3, 2))
.matchEquals("action4", s -> getSender().tell(map.get(3), getSelf()))
.build();
}
}
是使用线程安全count
的方式吗?map
我需要使用volatile
forcount
和使用ConcurrentHashMap
formap
吗?
ps =======================
下面的演示代码演示了为什么我需要锁定actor,基本上我正在实现一个带有背压控制的管道,一旦一个actor从上游actor接收到过多的任务,它就会向上游actor发送backPressureHi
消息以停止上游actor执行循环直到背压恢复正常并发送一个backPressureNormal
恢复:
public class PipeLineActor extends AbstractActor {
private final ReentrantLock stallLock = new ReentrantLock();
private Thread executionLoop = new Thread(() -> {
while (true){
stallLock.lock();
stallLock.unlock();
// issue tasks to down stream actors
}
});
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
// down stream actor send "backPressureHi" when back pressure is high to stall the executionLoop
.matchEquals("backPressureHi", s -> stallLock.lock())
// down stream actor send "backPressureNormal" when back pressure resumed normal to resume the executionLoop
.matchEquals("backPressureNormal", s -> stallLock.unlock())
.build();
}
}
解决方案
Akka 被设计为线程安全的。并且从不需要在参与者内进行锁定或同步。不应该这样做。
Akka 通过一次处理一条消息来实现线程安全。一个actor不能同时处理多个消息。但是消息可能并且将在不同的线程中处理。(这是默认行为,但可以使用 pin 调度程序进行更改)。
从文档
由于参与者实例一次处理一条消息,因此不需要同步或 AtomicInteger 等并发保护。
对于你最后的问题,
count 和 map 的使用方式是线程安全的吗?
是的,它是线程安全的。
我是否需要使用 volatile 进行计数并使用 ConcurrentHashMap 进行映射?
不,没有必要这样做。请参阅Akka 和 Java 内存模型
通俗地说,这意味着当该参与者处理下一条消息时,该参与者的内部字段的更改是可见的。所以你的actor中的字段不需要是可变的或等效的。
推荐阅读
- python - 设置 python 时无法使用特定的 gcc 版本
- c# - 将 NULL 值从数据库映射到对象的属性
- reactjs - 下一个 js 获取在 componentdidmount 上失败
- elasticsearch - 在弹性搜索过滤器中实现 Array.Except(Array2) > 0 查询?
- python - numpy 中的条件。如何使用 pandas 或 numpy 将 3 个或更多放入我的数据框中?
- xamarin - 如何更改 Xamarin Forms 中导航栏标题区域的字体大小?
- json - 仅在 JSON 之后加载 tableView
- uml - UML 状态图中是否允许没有直接转换的状态?
- android - 未找到 Azure Jenkins Android SDK
- ios - 是否可以为 UIContextualAction 对象的图像着色?