scala - 尝试提取 GraphStageLogic 以进行自定义有状态实现,然后将其作为参数传递给 GraphStage 会给出异常
问题描述
下面是简化的代码片段,其中 GraphStateLogic 实现作为构造函数参数传递给 GraphStage:-
package akka.shapes.examples.notworking
import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
//This is base graph stage, where GraphStageLogic and SinkShape are passed in constructor parameter
class BaseGraphStage[T](val shape: SinkShape[T], graphStageLogic: GraphStageLogic) extends GraphStage[ SinkShape[T] ] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = graphStageLogic
}
//this is a sample stateful extension of GraphStageLogic, that accepts first ten elements only
class CountLogic(sinkShape: SinkShape[Int], maxValue: Int) extends GraphStageLogic(sinkShape) {
var counter: Long = 0
override def preStart(): Unit = {
pull(sinkShape.in)
}
setHandler(sinkShape.in, new InHandler {
override def onPush(): Unit = {
val e = grab(sinkShape.in)
println("conditional sink : " + e)
counter = counter + 1
counter == maxValue match {
case true => completeStage()
case false => pull(sinkShape.in)
}
}
})
}
object SampleSinkNotWorking {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("NotWroking")
implicit val actorMaterializer = ActorMaterializer()
val inlet = Inlet[Int](name = "sampleInlet")
val sinkShape = SinkShape( inlet )
val countGraphStateLogic = new CountLogic(sinkShape, 10)
val sinkGraphStage = new BaseGraphStage[Int](sinkShape, countGraphStateLogic)
val sink = Sink.fromGraph( sinkGraphStage )
val graph = GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
Source(1 to 100) ~> sink
ClosedShape
}
val runnableGraph = RunnableGraph.fromGraph(graph)
runnableGraph.run()
}
}
运行上面的代码会给出 ArrayIndexOutOfBoundsException :-
线程“主”中的异常 java.lang.ArrayIndexOutOfBoundsException: -1 at akka.stream.stage.GraphStageLogic.setHandler(GraphStage.scala:439) at akka.shapes.examples.notworking.CountLogic.(SampleSinkNotWorking.scala:24) at akka.shapes.examples.notworking.SampleSinkNotWorking$.main(SampleSinkNotWorking.scala:46) 在 akka.shapes.examples.notworking.SampleSinkNotWorking.main(SampleSinkNotWorking.scala)
我试过调试,看起来,InLet id 是-1,它没有被重置。
但是,当 GraphStateLogic 作为构造函数参数传递给 GraphState 时,为什么它没有被重置?
解决方案
我有点重构你的代码,问题消失了,看看:
class BaseGraphStage(maxValue: Int) extends GraphStage[SinkShape[Int]] {
val inlet = Inlet[Int](name = "sampleInlet")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with StageLogging {
var counter: Int = 0
setHandler(inlet, new InHandler {
override def onPush(): Unit = {
val e = grab(inlet)
log.info(s"$e is consumed")
counter += 1
if (counter == maxValue) {
completeStage()
} else {
pull(inlet)
}
}
})
override def preStart(): Unit =
pull(inlet)
override def postStop(): Unit =
counter = 0
}
override def shape: SinkShape[Int] = SinkShape(inlet)
}
object SampleSinkNotWorking {
def main(args: Array[String]): Unit = {
implicit val actorSystem = ActorSystem("NotWorking")
implicit val actorMaterializer = ActorMaterializer()
val sink = Sink.fromGraph(new BaseGraphStage(10))
Source(1 to 100).runWith(sink)
}
}
无法完全回答您的最后一个问题,但我认为所有技巧都是在图形阶段的上下文中创建入口而不是在此之外,并使用前处理程序和后处理程序。希望有帮助。
推荐阅读
- enums - 将 Nim 枚举转换为整数
- wso2 - WSO2 EMM:如何一次性注册多个 Android 设备?
- nativescript - nativescript paytm 插件给出 404 Not Found Error nginx/1.6.2
- javascript - 是否应该隐藏获取请求的链接?
- python - DFS遍历有什么问题?
- shell - shell:如何使txt文件中的西班牙语乱码正常显示
- c++ - 深入理解strcat和strlen函数
- html - 导航栏阻止内部链接的点击能力
- android - RxJava - ReplaySubject 只发出两次数据
- c - 避免在 makefile 中重复行