scala - 无法在 Akka Stream 中使用 GraphStage 类运行 SourceShape
问题描述
我正在尝试使用 GraphStage 构造创建一个 Redis Akka 流源。这个想法是,每当我从 subscribe 方法获得更新时,我都会将其推送到下一个组件。此外,如果没有拉动信号,则组件应背压。这是代码:
class SubscriberSourceShape(channel: String, subscriber: Subscriber)
extends GraphStage[SourceShape[String]] {
private val outlet: Outlet[String] = Outlet("SubscriberSource.Out")
override def shape: SourceShape[String] = SourceShape(outlet)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
val callback = getAsyncCallback((msg: String) => push(outlet, msg)
val handler = (msg: String) => callback.invoke(msg)
override def preStart(): Unit = subscriber.subscribe(channel)(handler)
}
}
}
然而,当我用一个简单的接收器运行它时,我得到了这个错误:
Error in stage [akka.http.impl.engine.server.HttpServerBluePrint$ProtocolSwitchStage@58344854]: No handler defined in stage [...SubscriberSourceShape@6a9193dd] for out port [RedisSubscriberSource.Out(1414886352). All inlets and outlets must be assigned a handler with setHandler in the constructor of your graph stage logic.
怎么了?
解决方案
您收到此错误是因为您尚未为 Source 设置任何输出处理程序,因此当下游组件(Flow 或 Sink)向此 Source 发送拉信号时,没有处理程序来处理拉信号。
您可以添加一个 OutputHandler 来消除错误。将 onPull 方法留空,因为您在 asyncCallback 上生成元素。只需将其添加到 GraphStageLogic 正文中:
setHandler(outlet, new OutHandler {
override def onPull(): Unit = {}
})
推荐阅读
- wordpress - Wordpress 管理员管理员更改按钮文本添加新
- html - CSS 图像和伪元素过渡不同步
- r - 如何修复 bsts 包 R 中的“.ValidateHolidayList(holiday.list) 中的错误”?
- apache-kafka-streams - Kafka Streams:使用 at_least_once 时,为状态存储更改日志主题提供了哪些排序保证?
- sql - 使用 Vertica 插入 SQL 表时出现资源不足错误
- typescript - Typescript - 将接口属性转换为通用
- c# - 如何一起创建平移和倾斜控件?
- office-js - Office 加载项要求
- javascript - XMLHttpRequest onload() 函数未传递 XMLHttpRequest 对象
- javascript - 如何将对象发送到类参数javascript?