apache-kafka - Kafka Streams 内部主题重定向
问题描述
目前正在使用 Kafka Streams 来聚合客户端系统中的事件。当使用假事件运行我们的原型时,一切正常。但是,在使用实际数据时,我们注意到在聚合过程中,Streams 会自动创建内部主题。虽然理论上这很好,但我们的客户有必要的、超级严格的安全性,并且不愿意授予我的开发团队主题创建权限。这意味着我们不能按原样运行我们的 Streams 程序。
但是,我们可以为我们创建主题并使用这些主题而不是 Streams 创建自己的 Kafka 主题。是否有可能/如何开始重定向 Streams 内部主题创建以利用现有主题?
注意:我们可以随意命名内部主题。它只需要由拥有这些特权的团队创建。
解决方案
在 Kafka Streams 中,现在 KStream 和 KTable 都有重载方法,它们接受一个新的参数 Named。通过使用命名类 DSL,用户可以为其拓扑中的处理器提供有意义的名称。
KStream<String,String> stream =
builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
stream.filter((k,v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
.mapValues((v) -> v.substring(0,5), Named.as("Map_values_to_first_6_characters"))
.to("output", Produced.as("Mapped_transactions_output_topic"));
Topologies:
Sub-topology: 0
Source: Customer_transactions_input_topic (topics: [input])
--> filter_out_invalid_txns
Processor: filter_out_invalid_txns (stores: [])
--> Map_values_to_first_6_characters
<-- Customer_transactions_input_topic
Processor: Map_values_to_first_6_characters (stores: [])
--> Mapped_transactions_output_topic
<-- filter_out_invalid_txns
Sink: Mapped_transactions_output_topic (topic: output)
<-- Map_values_to_first_6_characters
现在,看看你的拓扑结构,所有处理器命名为:
现在您可以查看拓扑描述并轻松了解每个处理器在拓扑中所扮演的角色。但是,当您在 Kafka Streams 应用程序、状态存储、更改日志主题和重新分区主题的重新启动之间保留有状态操作符时,命名您的处理器节点还有另一个原因,这与使用生成名称的处理器节点的潜在名称转换有关.
推荐阅读
- laravel - Laravel 的 belongsTo 方法返回 null
- flask - 如何配置 .hosts 文件以便其他设备可以访问它
- reactjs - 如果我使用自定义组件而不是普通的 HTML5 输入标签,使用 react-hook-form 的 ref 会在控制台中引发错误
- c++ - 为什么用c而不是c ++编写的postgres?c 有更好的性能还是只是历史原因?
- javascript - NgClass Error Angular(不能绑定到'ngClass',因为它不是'a'的已知属性)
- javascript - 1 次调用后阻止 Javascript 函数
- javascript - 如何使用 AJAX 请求将用户在类变量中的 HTML5 位置从 JavaScript 传递到 Rails 控制器?
- php - MS Graph php 创建联系人
- c# - C# SSL/TLS 客户端/服务器应用程序远程证书链错误
- javascript - 如何使javascript滑块自动滑动