首页 > 解决方案 > 尝试提取 GraphStageLogic 以进行自定义有状态实现,然后将其作为参数传递给 GraphStage 会给出异常

问题描述

下面是简化后的代码片段,其中 GraphStageLogic 的实现作为构造函数参数传递给 GraphStage:

package akka.shapes.examples.notworking

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}

/**
 * Base graph stage whose shape and logic are both supplied by the caller
 * through the constructor.
 *
 * @param shape           the sink shape exposed by this stage
 * @param graphStageLogic the pre-built logic instance handed back by `createLogic`
 * @tparam T type of the elements accepted by the sink
 */
class BaseGraphStage[T](
    val shape: SinkShape[T],
    graphStageLogic: GraphStageLogic
) extends GraphStage[SinkShape[T]] {

  // Nothing is created here: the same constructor-supplied instance is returned on every call.
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
    graphStageLogic
  }
}

/**
 * Sample stateful [[GraphStageLogic]] that prints each element it receives and
 * completes the stage once `maxValue` elements have been consumed.
 *
 * The handler is registered as part of the constructor body, so `setHandler`
 * runs as soon as an instance is created.
 *
 * @param sinkShape shape whose inlet is pulled from and grabbed from
 * @param maxValue  number of elements after which the stage is completed
 */
class CountLogic(sinkShape: SinkShape[Int], maxValue: Int) extends GraphStageLogic(sinkShape) {
  // Number of elements grabbed so far.
  var counter: Long = 0

  setHandler(sinkShape.in, new InHandler {
    override def onPush(): Unit = {
      val element = grab(sinkShape.in)
      println("conditional sink : " + element)
      counter += 1
      // Stop after the configured number of elements, otherwise ask for the next one.
      if (counter == maxValue) completeStage()
      else pull(sinkShape.in)
    }
  })

  // Signals the initial demand so that the first element gets pushed.
  override def preStart(): Unit = pull(sinkShape.in)
}


object SampleSinkNotWorking {

  /**
   * Wires `Source(1 to 100)` into a sink built from [[BaseGraphStage]] and runs it.
   *
   * The [[CountLogic]] instance is created here, before the stage itself, and is
   * then passed to the stage through its constructor.
   */
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("NotWroking")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val shape = SinkShape(Inlet[Int]("sampleInlet"))
    // The logic is instantiated outside of any createLogic call.
    val logic = new CountLogic(shape, 10)
    val countingSink = Sink.fromGraph(new BaseGraphStage[Int](shape, logic))

    val blueprint = GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      Source(1 to 100) ~> countingSink
      ClosedShape
    }

    RunnableGraph.fromGraph(blueprint).run()
  }
}

运行上面的代码会给出 ArrayIndexOutOfBoundsException :-

Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
    at akka.stream.stage.GraphStageLogic.setHandler(GraphStage.scala:439)
    at akka.shapes.examples.notworking.CountLogic.<init>(SampleSinkNotWorking.scala:24)
    at akka.shapes.examples.notworking.SampleSinkNotWorking$.main(SampleSinkNotWorking.scala:46)
    at akka.shapes.examples.notworking.SampleSinkNotWorking.main(SampleSinkNotWorking.scala)

我尝试过调试,看起来 Inlet 的 id 是 -1,它没有被重新赋值。

在此处输入图像描述

但是,当 GraphStageLogic 作为构造函数参数传递给 GraphStage 时,为什么这个 id 没有被重置?

标签: scalaexceptionakkaakka-stream

解决方案


我稍微重构了一下你的代码,问题就消失了,请看:


/**
 * Sink stage that logs every element it receives and completes itself after
 * `maxValue` elements have been consumed.
 *
 * The inlet and the shape are owned by the stage, and a new logic instance is
 * built inside `createLogic` on every call.
 *
 * @param maxValue number of elements after which the stage is completed
 */
class BaseGraphStage(maxValue: Int) extends GraphStage[SinkShape[Int]] {
  // Imported locally so the snippet does not depend on an import list that omits StageLogging.
  import akka.stream.stage.StageLogging

  val inlet: Inlet[Int] = Inlet[Int](name = "sampleInlet")

  // A val instead of a def: the shape is built once rather than on every access.
  // It must stay below `inlet`, which it reads during initialization.
  override val shape: SinkShape[Int] = SinkShape(inlet)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with StageLogging {
      // Number of elements consumed by this logic instance.
      private var counter: Int = 0

      setHandler(inlet, new InHandler {
        override def onPush(): Unit = {
          val e = grab(inlet)
          log.info(s"$e is consumed")
          counter += 1
          if (counter == maxValue) {
            completeStage()
          } else {
            pull(inlet)
          }
        }
      })

      // Signals the initial demand so that the first element gets pushed.
      override def preStart(): Unit =
        pull(inlet)

      // Redundant in practice, since every createLogic call starts with a fresh
      // counter; kept as a harmless reset.
      override def postStop(): Unit =
        counter = 0
    }
}

object SampleSinkNotWorking {

  /**
   * Runs `Source(1 to 100)` into a sink built from `BaseGraphStage(10)` and shuts
   * the actor system down once the stream has terminated, so the JVM can exit.
   */
  def main(args: Array[String]): Unit = {
    import akka.Done
    import akka.stream.scaladsl.Keep
    import scala.concurrent.Future

    implicit val actorSystem: ActorSystem = ActorSystem("NotWorking")
    implicit val actorMaterializer: ActorMaterializer = ActorMaterializer()

    val sink = Sink.fromGraph(new BaseGraphStage(10))

    // The sink materializes no completion signal of its own, so termination is
    // observed upstream of it via watchTermination.
    val streamDone: Future[Done] =
      Source(1 to 100)
        .watchTermination()(Keep.right)
        .to(sink)
        .run()

    // Without this the actor system is never terminated and the process does not exit.
    streamDone.onComplete(_ => actorSystem.terminate())(actorSystem.dispatcher)
  }
}

我无法完整回答你的最后一个问题,但我认为关键在于:在 GraphStage 内部(而不是在它之外)创建 Inlet,并使用 preStart 和 postStop 这两个生命周期回调。希望对你有帮助。


推荐阅读