首页 > 解决方案 > 当 CPU 处于高压状态时,Akka.NET 具有持久性丢弃消息?

问题描述

我对我的 PoC 进行了一些性能测试。我看到的是我的演员没有收到发送给他的所有消息,并且性能非常低。我向我的应用程序发送了大约 150k 条消息,这导致我的处理器达到 100% 的利用率峰值。但是当我停止发送请求时,2/3 的消息没有传递给参与者。以下是来自应用洞察的简单指标:

发送的请求数

收到的消息数

为了证明我在 mongo 中持久化的事件数量与我的演员收到的消息几乎相同。

蒙哥

其次,处理消息的性能非常令人失望。我每秒收到大约 300 条消息。 毫秒/秒

我知道默认情况下 Akka.NET 消息传递最多一次,但我没有收到任何错误说该消息被丢弃。

这是代码:集群分片注册:

 services.AddSingleton<ValueCoordinatorProvider>(provider =>
 {
   var shardRegion = ClusterSharding.Get(_actorSystem).Start(
                    typeName: "values-actor",
                    entityProps: _actorSystem.DI().Props<ValueActor>(),
                    settings: ClusterShardingSettings.Create(_actorSystem),
                    messageExtractor: new ValueShardMsgRouter());
                   return () => shardRegion;
 });

控制器:

    [ApiController]
    [Route("api/[controller]")]
    public class ValueController : ControllerBase
    {
        private readonly IActorRef _valueCoordinator;

        public ValueController(ValueCoordinatorProvider valueCoordinatorProvider)
        {
            _valueCoordinator = valuenCoordinatorProvider();
        }

        [HttpPost]
        public Task<IActionResult> PostAsync(Message message)
        {
            _valueCoordinator.Tell(message);
            return Task.FromResult((IActionResult)Ok());
        }
    }

演员:

    public class ValueActor : ReceivePersistentActor
    {
        public override string PersistenceId { get; }
        private decimal _currentValue;

        public ValueActor()
        {
            PersistenceId = Context.Self.Path.Name;
            Command<Message>(Handle);
        }

        private void Handle(Message message)
        {
            Context.IncrementMessagesReceived();
            var accepted = new ValueAccepted(message.ValueId, message.Value);
            Persist(accepted, valueAccepted =>
            {
                _currentValue = valueAccepted.BidValue;
            });
        }

    }

消息路由器。

    public sealed class ValueShardMsgRouter : HashCodeMessageExtractor
    {
        public const int DefaultShardCount = 1_000_000_000;

        public ValueShardMsgRouter() : this(DefaultShardCount)
        {
        }

        public ValueShardMsgRouter(int maxNumberOfShards) : base(maxNumberOfShards)
        {
        }

        public override string EntityId(object message)
        {
            return message switch
            {
                IWithValueId valueMsg => valueMsg.ValueId,
                _ => null
            };
        }
    }

akka.conf

akka {  
     stdout-loglevel = ERROR
     loglevel = ERROR
     actor {
       debug {  
              unhandled = on
        }
        provider = cluster
         serializers {
              hyperion = "Akka.Serialization.HyperionSerializer, Akka.Serialization.Hyperion"
         }
        serialization-bindings {
          "System.Object" = hyperion
         }
        deployment {
            /valuesRouter {
                router = consistent-hashing-group
                routees.paths = ["/values"]
                cluster {
                    enabled = on
                }
            }        
        }
     }
                        
     remote {
        dot-netty.tcp {
            hostname = "desktop-j45ou76"
            port = 5054
        }
     }          

     cluster {
        seed-nodes = ["akka.tcp://valuessystem@desktop-j45ou76:5054"] 
     }
persistence {
    journal {
        plugin = "akka.persistence.journal.mongodb"
        mongodb {
            class = "Akka.Persistence.MongoDb.Journal.MongoDbJournal, Akka.Persistence.MongoDb"

            connection-string = "mongodb://localhost:27017/akkanet"

            auto-initialize = off
            plugin-dispatcher = "akka.actor.default-dispatcher"

            collection = "EventJournal"
            metadata-collection = "Metadata"
            legacy-serialization = off
        }
    }

    snapshot-store {
        plugin = "akka.persistence.snapshot-store.mongodb"
        mongodb {
            class = "Akka.Persistence.MongoDb.Snapshot.MongoDbSnapshotStore, Akka.Persistence.MongoDb"
            connection-string = "mongodb://localhost:27017/akkanet"
            auto-initialize = off
            plugin-dispatcher = "akka.actor.default-dispatcher"
            collection = "SnapshotStore"
            legacy-serialization = off
        }
    }
}     
}



标签: akka.netakka.net-clusterakka.net-persistence

解决方案


所以这里有两个问题:演员的表现和丢失的消息。

从你的文章中并不清楚,但我会做一个假设:这些消息中的 100% 都发送给单个参与者。

演员表演

单个actor的端到端吞吐量取决于:

  1. 将消息路由到参与者所需的工作量(即通过分片系统、层次结构、网络等)
  2. 参与者处理单个消息所花费的时间,因为这决定了邮箱可以被清空的速率;和
  3. 任何影响可以在何时处理哪些消息的流控制——即,如果参与者使用存储和行为切换,参与者在等待其状态更改时存储消息所花费的时间将对端到端产生累积影响所有隐藏消息的处理时间。

由于此列表中的第 3 项,您的表现会很差。您正在实现的设计调用Persist阻止参与者执行任何其他处理,直到消息成功持久化。发送给actor的所有其他消息都在内部隐藏,直到前一条消息成功持久化。

从单个参与者的角度来看,Akka.Persistence 提供了四种持久化消息的选项:

  • Persist- 最高一致性(在确认持久性之前不能处理其他消息),最低性能;
  • PersistAsync- 更低的一致性,更高的性能。在处理邮箱中的下一条消息之前,不等待消息被持久化。允许同时处理来自单个持久性参与者的多条消息 - 这些事件的持久化顺序将被保留(因为它们按该顺序发送到内部 Akka.Persistence 日志IActorRef),但参与者将继续在确认持久消息之前处理其他消息。这意味着您可能必须在调用之前PersistAsync而不是事后修改演员的内存状态。
  • PersistAll- 高一致性,但一次批处理多个持久事件。与相同的排序和控制流语义Persist- 但您只是将一组消息保存在一起。
  • PersistAllAsync-最高性能。语义相同,PersistAsync但它是数组中的原子批消息,被持久化在一起。

要了解 Akka.Persistence 的性能特征如何随这些方法发生变化,请查看 Akka.NET 组织围绕 Akka.Persistence.Linq2Db(新的高性能 RDBMS Akka)收集的详细基准数据.Persistence 库:https ://github.com/akkadotnet/Akka.Persistence.Linq2Db#performance - SQL 上每秒 15,000 和每秒 250 之间的差异;在像 MongoDB 这样的系统中,写入性能可能会更高。

Akka.Persistence 的一个关键属性是,它有意通过集群中每个节点上的一组集中式“日志”和“快照”actor 路由所有持久性命令 - 因此来自多个持久性actor的消息可以一起批处理少量并发数据库连接。有许多用户同时运行数十万个持久性参与者——如果每个参与者都有自己独特的数据库连接,即使是地球上最强大的垂直扩展数据库实例也会融化。这种连接池/共享是单个持久参与者依赖流量控制的原因。

使用任何持久性 Actor 框架(即 Orleans、Service Fabric),您都会看到类似的性能,因为它们都采用类似的设计,原因与 Akka.NET 相同。

为了提高性能,您需要将接收到的消息一起批处理并将它们保存在一个组中PersistAll(将其视为de-bounceing)或使用PersistAsync.

如果您将工作负载分散到具有不同实体 id 的许多并发参与者上,您还将看到更好的聚合性能 - 这样您就可以从参与者并发和并行性中受益。

缺少消息

发生这种情况的原因可能有很多——最常见的原因是:

  1. Actor 被终止(与重新启动不同)并将其所有消息转储到DeadLetter集合中;
  2. 网络中断导致连接断开 - 当节点处于 100% CPU 时可能会发生这种情况 - 当时排队等待传递的消息可能会被丢弃;和
  3. Akka.Persistence 日志从数据库接收超时将导致持久性参与者由于失去一致性而终止自己。

您应该在日志中查找以下内容:

  • DeadLetter警告/计数
  • OpenCircuitBreakerException来自 Akka.Persistence

您通常会看到这两者一起出现 - 我怀疑这就是您的系统正在发生的事情。另一种可能性可能是 Akka.Remote throwing DisassociationExceptions,我也会寻找它。

failure-detector您可以通过在配置https://getakka.net/articles/configuration/akka.cluster.html中更改 Akka.Cluster 的心跳值来修复 Akka.Remote 问题:

akka.cluster.failure-detector {

      # FQCN of the failure detector implementation.
      # It must implement akka.remote.FailureDetector and have
      # a public constructor with a com.typesafe.config.Config and
      # akka.actor.EventStream parameter.
      implementation-class = "Akka.Remote.PhiAccrualFailureDetector, Akka.Remote"

      # How often keep-alive heartbeat messages should be sent to each connection.
      heartbeat-interval = 1 s

      # Defines the failure detector threshold.
      # A low threshold is prone to generate many wrong suspicions but ensures
      # a quick detection in the event of a real crash. Conversely, a high
      # threshold generates fewer mistakes but needs more time to detect
      # actual crashes.
      threshold = 8.0

      # Number of the samples of inter-heartbeat arrival times to adaptively
      # calculate the failure timeout for connections.
      max-sample-size = 1000

      # Minimum standard deviation to use for the normal distribution in
      # AccrualFailureDetector. Too low standard deviation might result in
      # too much sensitivity for sudden, but normal, deviations in heartbeat
      # inter arrival times.
      min-std-deviation = 100 ms

      # Number of potentially lost/delayed heartbeats that will be
      # accepted before considering it to be an anomaly.
      # This margin is important to be able to survive sudden, occasional,
      # pauses in heartbeat arrivals, due to for example garbage collect or
      # network drop.
      acceptable-heartbeat-pause = 3 s

      # Number of member nodes that each member will send heartbeat messages to,
      # i.e. each node will be monitored by this number of other nodes.
      monitored-by-nr-of-members = 9

      # After the heartbeat request has been sent the first failure detection
      # will start after this period, even though no heartbeat mesage has
      # been received.
      expected-response-after = 1 s

    }

acceptable-heartbeat-pause = 3 s如果需要,将值增加到更大的值,例如 10、20、30。

分片配置

我想用您的代码指出的最后一件事 - 分片数太高了。每个节点应该有大约 10 个分片。将其减少到合理的程度。


推荐阅读