首页 > 解决方案 > akka timer 提供消息 Cancel timer [Message] with generation [583]

问题描述

我正在使用akka计时器

我正在使用twitter 流,我试图在 5 秒内获得推文的数量这是我的代码

case class PerSecond(tweet:Tweet)
case class TweetPerSecondCount(tweet:Tweet)

class TweetPerSecondActor extends Actor with Timers{
var counter=0
 def receive: PartialFunction[Any, Unit] = {
    case PerSecond(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message PerSecond")
      timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)

    case TweetPerSecondCount(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
      log.info("got the tweet {}",getCounter+"in 5 seconds")

    case message =>
      log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
      unhandled(message)
  }
}

在播放框架的控制器动作中,我从推特流中获取推文对象(连续流而不弯腰)

    class Mycontroller extends Controller {

    val tweetPerSecondActor = system.actorOf......//create actor


    def tweetAveragePerSecond = Action {
        log.debug("in the action tweetAveragePerSecond")

        def getTweet: PartialFunction[StreamingMessage, Unit] = {
          case tweet: Tweet =>
            val future = ask(tweetPerSecondActor, PerSecond(tweet))
        }
        val streaming: Future[TwitterStream] = handleTwitterStreamClient.getStreamingCLient.sampleStatuses(stall_warnings = true)(getTweet)

        Ok("tweet average per second")
      }
    }

当我到达路线时,日志显示

TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]

    6:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-6] TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
    16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
    16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]

如果我传递虚拟字符串值而不是运行流并传递 twitter 对象,则计时器可以正常工作,例如 case class PerSecond(tweet:String) case class TweetPerSecondCount(tweet:String)

class TweetPerSecondActor extends Actor with Timers{
var counter=0
 def receive: PartialFunction[Any, Unit] = {
    case PerSecond(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message PerSecond")
      timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)

    case TweetPerSecondCount(tweet) =>
      log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
      log.info("got the tweet {}",getCounter+"in 5 seconds")

    case message =>
      log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
      unhandled(message)
  }
}

    class Mycontroller extends Controller {

    val tweetPerSecondActor = system.actorOf......//create actor


    def tweetAveragePerSecond = Action {
        log.debug("in the action tweetAveragePerSecond")

            val future = ask(tweetPerSecondActor, PerSecond("dummy value"))


        Ok("tweet average per second")
      }
    }

标签: scalatimerakkatwitter4jtwitter4s

解决方案


Akka 中的计时器是键控的(在您的情况下键是"perSecond"),并且 API 保证

每个计时器都有一个键,如果启动具有相同键的新计时器,则取消前一个计时器

所以每次getTweet调用函数(考虑到它是为了效果而不是价值而调用的,我更喜欢“过程”,但这可能是特殊的)被调用时,计时器将被取消。

根据您要完成的任务,解决方案可能包括:

  • 每个请求的演员。您需要让请求返回一些可以传递给未来请求以从参与者那里获取信息的内容。Actor 生命周期管理也可能是可取的。
  • 为整个控制器保留一个参与者,但每个请求使用唯一的计时器键。如果这些是周期性计时器,您将必须跟踪哪些计时器键处于活动状态并取消不再需要的那些(保持任意多个周期性计时器运行可能最终会降低性能)

推荐阅读