scala - 限制时间间隔内发送的消息数
问题描述
使用下面的代码,我试图限制在指定时间范围内发送给参与者的消息数量。但是消息没有受到限制,并且正在尽快发送。下游参与者只是向 Google 主页发出 http 请求。
我尝试限制在 3 秒内发送 3 条消息的节流器代码:
val throttler: ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(3, 1.second)
.to(Sink.actorRef(printer, NotUsed))
.run()
如何限制循环内发送的消息数量:
for( a <- 1 to 10000){
// Create the 'greeter' actors
val howdyGreeter: ActorRef =
system.actorOf(Greeter.props(String.valueOf(a), printer), String.valueOf(a))
howdyGreeter ! RequestActor("RequestActor")
howdyGreeter ! Greet
}
每秒 3 次?
整个代码:
//https://developer.lightbend.com/guides/akka-quickstart-scala/full-example.html
import akka.NotUsed
import akka.stream.{OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Sink, Source}
import org.apache.http.client.methods.HttpGet
import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.DefaultHttpClient
import net.liftweb.json._
import net.liftweb.json.Serialization.write
import org.apache.http.util.EntityUtils
//import akka.contrib.throttle.TimerBasedThrottler
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import scala.concurrent.duration._
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.OverflowStrategy
import akka.stream.ThrottleMode
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
object Greeter {
def props(message: String, printerActor: ActorRef): Props = Props(new Greeter(message, printerActor))
final case class RequestActor(who: String)
case object Greet
}
class Greeter(message: String, printerActor: ActorRef) extends Actor {
import Greeter._
import Printer._
var greeting = ""
def receive = {
case RequestActor(who) =>
val get = new HttpGet("http://www.google.com")
val response = (new DefaultHttpClient).execute(get)
// val responseString = EntityUtils.toString(response.getEntity, "UTF-8")
// System.out.println(responseString)
greeting = String.valueOf(response.getStatusLine.getStatusCode)
println("message is "+message)
// greeting = message + ", " + who
case Greet =>
printerActor ! Greeting(greeting)
}
}
object Printer {
def props: Props = Props[Printer]
final case class Greeting(greeting: String)
}
class Printer extends Actor with ActorLogging {
import Printer._
def receive = {
case Greeting(greeting) =>
log.info("Greeting received (from " + sender() + "): " + greeting)
}
}
object AkkaQuickstart extends App {
import Greeter._
// Create the 'helloAkka' actor system
val system: ActorSystem = ActorSystem("helloAkka")
// Create the printer actor,this is also the target actor
val printer: ActorRef = system.actorOf(Printer.props, "printerActor")
implicit val materializer = ActorMaterializer.create(system)
val throttler: ActorRef =
Source.actorRef(bufferSize = 1000, OverflowStrategy.dropNew)
.throttle(3, 1.second)
.to(Sink.actorRef(printer, NotUsed))
.run()
//Create a new actor for each request thread
for( a <- 1 to 10000){
// Create the 'greeter' actors
val howdyGreeter: ActorRef =
system.actorOf(Greeter.props(String.valueOf(a), printer), String.valueOf(a))
howdyGreeter ! RequestActor("RequestActor")
howdyGreeter ! Greet
}
}
解决方案
Actor 不能影响其他 Actor 的行为,尤其是它无法控制谁将消息放入其邮箱以及何时 — 这就是 Actor 模型的工作方式。演员只能决定如何处理在其邮箱中找到的消息,并且对此拥有完全控制权。例如,它可以删除它们、发回错误回复、缓冲它们等。
如果你想要节流和背压,我建议这部分完全不要使用 Actors,而只使用 Akka Streams。生成请求消息的代码应该是 Source,而不是 for 循环。哪个源最合适完全取决于您的实际用例,例如从严格的集合创建一个流,Source.from()
或者从数据结构中异步拉出新元素Source.unfoldAsync
以及更多。这样做可以确保根据下游容量或速率限制,仅在时间合适时才发出请求。
推荐阅读
- python - 在装饰器类的实例化范围之外应用函数装饰器?
- node.js - 邀请未注册用户使用我的应用程序的最安全方法是什么?
- swift - 在 GitHub Actions Ubuntu 中安装 Swift 时遇到问题
- .htaccess - 在 htaccess 中重定向到文件夹和 https
- python - 我需要 python 脚本来选择所有 AVI 文件
- python-3.x - 找出两个字符串中出现的单词数
- three.js - 三、以 px 为单位的 Sprite 大小,sizeAttenuation 为 false
- c# - Azure Web App 未使用连接字符串通过 VPN 连接到 On-Site SQL Server
- excel - 获取和转换中的高级代码编辑器
- python - 为小部件指定尺寸并在其中绘图