scala - 只有在可以转换的情况下,如何转换 Kafka Stream 事件并将其发送到另一个主题
问题描述
我想构建一个简单的 Kafka 流,尝试根据某些条件转换事件。如果可以转换事件,则转换后的事件会进入不同的主题。如果无法转换事件,则将其再次存储在同一主题中以供将来尝试。
假设我有这个:
case class Foo(a: String, b: String, c: Boolean)
def translate(value: String): Option[Foo] = {
// ...
// Returns an Option of Foo
}
所以我需要有这样的东西:
val builder: StreamsBuilder = new StreamsBuilder()
builder
.stream(topic)
.map[String, String]((key, value) => translate(value))
// If translate(value) is Some(value) send the value to a topic
// Otherwise, send the original value (without being transformed) to the same topic
我完全被这个问题所困扰。我遇到的最接近的事情是尝试使用布尔值创建一个结构,告诉我事件是否可以转换,然后使用.branch
. 例如,像这样:
def translate(value: String): (Boolean, Option[CPCTTMDataTransformed]) = {
val eventTransformed = transform(value)
eventTransformed match {
case Some(value) => (true, Option(value))
case None => (false, None)
}
}
然后尝试做这样的事情:
builder
.stream(topic)
.map[String, (Boolean, Option[Foo])]((key, value) => translate(value))
.branch(
(_, element) => element._1,
)
.foreach {
// Send the "true" to one topic and in the "false", send the original message to the original topic
}
但当然,我需要有原始事件才能将其发送到主题。
我虽然有更复杂的结构,但最后我总是回到基于Some
-None
条件分支流的问题。
解决方案
也许使用处理器 API。你有一个Processor
翻译,如果翻译成功,你context.forward(To.child("translated"))
就没有context.forward(To.child("retry"))
。
您Topology
手动将您的插头连接在一起:
Topology topology = new Topology();
topology.addSource("source", topic);
topology.addProcessor("translator", () -> new TranslateProcessor(), "source");
topology.addSink("translated", resultTopic, "translator");
topology.addSink("retry", topic, "translator");
推荐阅读
- java - 实施 Bcrypt PasswordEncoder 后无法使用身份验证
- mysql - 当用户名包含“%”时,无法通过 mysql-workbench 登录 mysql
- mips - MIPS跳转和BNE地址计算
- azure-devops - 如何使用 Azure DevOps/VSTS 和 Ansible playbook 预配环境
- c++ - 如何为以下星形图案添加空间?
- maven - 运行 mvn install 时构建兄弟项目
- php - 如何在 Typo3 9 中检查应用程序上下文?
- php - 代码中的 trans 函数是什么意思……以及 quickadmin 类的位置
- java - JAVA RMI 加法示例
- javascript - 如何从这些属性中构造一个新对象?