首页 > 解决方案 > 限制时间间隔内发送的消息数

问题描述

使用下面的代码,我试图限制在指定时间范围内发送给参与者的消息数量。但是消息没有受到限制,并且正在尽快发送。下游参与者只是向 Google 主页发出 http 请求。

我尝试限制在 3 秒内发送 3 条消息的节流器代码:

  val throttler: ActorRef =
    Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
      .throttle(3, 1.second)
      .to(Sink.actorRef(printer, NotUsed))
      .run()

如何限制循环内发送的消息数量:

  for( a <- 1 to 10000){

    // Create the 'greeter' actors
    val howdyGreeter: ActorRef =
      system.actorOf(Greeter.props(String.valueOf(a), printer), String.valueOf(a))

    howdyGreeter ! RequestActor("RequestActor")
    howdyGreeter ! Greet
  }

每秒 3 次?

整个代码:

//https://developer.lightbend.com/guides/akka-quickstart-scala/full-example.html

import akka.NotUsed
import akka.stream.{OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.http.client.methods.HttpGet
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient
import net.liftweb.json._
import net.liftweb.json.Serialization.write
import org.apache.http.util.EntityUtils
//import akka.contrib.throttle.TimerBasedThrottler
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source

object Greeter {
  def props(message: String, printerActor: ActorRef): Props = Props(new Greeter(message, printerActor))
  final case class RequestActor(who: String)
  case object Greet
}

class Greeter(message: String, printerActor: ActorRef) extends Actor {
  import Greeter._
  import Printer._

  var greeting = ""

  def receive = {
    case RequestActor(who) =>

      val get = new HttpGet("http://www.google.com")
      val response = (new DefaultHttpClient).execute(get)
//      val responseString = EntityUtils.toString(response.getEntity, "UTF-8")
//      System.out.println(responseString)

      greeting = String.valueOf(response.getStatusLine.getStatusCode)
      println("message is "+message)
//      greeting = message + ", " + who
    case Greet           =>
      printerActor ! Greeting(greeting)
  }
}

object Printer {
  def props: Props = Props[Printer]
  final case class Greeting(greeting: String)
}

class Printer extends Actor with ActorLogging {
  import Printer._

  def receive = {
    case Greeting(greeting) =>
      log.info("Greeting received (from " + sender() + "): " + greeting)
  }
}

object AkkaQuickstart extends App {
  import Greeter._
  // Create the 'helloAkka' actor system
  val system: ActorSystem = ActorSystem("helloAkka")

  // Create the printer actor,this is also the target actor
  val printer: ActorRef = system.actorOf(Printer.props, "printerActor")

  implicit val materializer = ActorMaterializer.create(system)

  val throttler: ActorRef =
    Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
      .throttle(3, 1.second)
      .to(Sink.actorRef(printer, NotUsed))
      .run()

  //Create a new actor for each request thread
  for( a <- 1 to 10000){

    // Create the 'greeter' actors
    val howdyGreeter: ActorRef =
      system.actorOf(Greeter.props(String.valueOf(a), printer), String.valueOf(a))

    howdyGreeter ! RequestActor("RequestActor")
    howdyGreeter ! Greet
  }

}

标签: scalaakka

解决方案


Actor 不能影响其他 Actor 的行为,尤其是它无法控制谁将消息放入其邮箱以及何时 — 这就是 Actor 模型的工作方式。演员只能决定如何处理在其邮箱中找到的消息,并且对此拥有完全控制权。例如,它可以删除它们、发回错误回复、缓冲它们等。

如果你想要节流和背压,我建议这部分完全不要使用 Actors,而只使用 Akka Streams。生成请求消息的代码应该是 Source,而不是 for 循环。哪个源最合适完全取决于您的实际用例,例如从严格的集合创建一个流,Source.from()或者从数据结构中异步拉出新元素Source.unfoldAsync以及更多。这样做可以确保根据下游容量或速率限制,仅在时间合适时才发出请求。


推荐阅读