scala - 生成消息时,alpakka-kafka 连接器中使用的直通是什么?
问题描述
这是文档https://doc.akka.io/docs/alpakka-kafka/current/producer.html中给出的用于向 Kafka 生成单个消息的代码
val single: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
ProducerMessage.single(
new ProducerRecord("topicName", key, value),
passThrough
)
您能否解释一下 passThrough 的用途?
解决方案
passThrough
is an additional value which comes to be available for use in ProducerMessage.Results
’s passThrough()
.
As the official documentation states, the best and the most widely used practice is to use this value as a transport to passing the CommittableOffset
to a downstream Committer.Sink
.
This seems convenient in cases when you are using a CommittableSource
along with the enable.auto.commit
Kafka consumer option set to false
. In such cases you have to commit Kafka offsets manually as you processing them. I in particular use such approach when reading records from one Kafka topic, processing them and then sending them to another Kafka topic.
Here is a simple use case:
Consumer
.committableSource(consumerSettings, Subscriptions.topics(topicIn))
.mapAsync { msg ⇒
process(msg.record.value).map {
response ⇒
ProducerMessage.single(
new ProducerRecord(topicOut, msg.record.key, response),
passThrough = msg.committableOffset
)
}
}
.via(Producer.flexiFlow(producerSettings))
.map(_.passThrough) // extract the passThrough
.toMat(Committer.sink(committerSettings))(Keep.both) // commiting offsets
.mapMaterializedValue(DrainingControl.apply)
.run()
推荐阅读
- c - 使用 fgets() 从终端写入文件文本
- javascript - 从数组中过滤数组列表
- kubernetes - 将ip添加到接口后无法访问kibana url(kubernetes)
- docusignapi - Docusign - 我们可以使用个人计划,并且能够获得外部用户对 SOBO 的同意吗?
- wordpress - 最初隐藏菜单。并且只出现在 wordpress-elementor 中的滚动处
- json - 从熊猫数据框创建一个json并发布到rest api
- linux - 通过 HDMI 使用音频而不通过 xrandr 显示
- javascript - 在 vuejs 中将文件发送到 django 服务器时的奇怪行为
- node-red - NodeRed 切换到设备
- windows - 尽管已安装并位于 bin 文件夹中,Cygwin 仍无法识别 make 命令