首页 > 解决方案 > Kafka Streams 内部主题重定向

问题描述

目前正在使用 Kafka Streams 来聚合客户端系统中的事件。当使用假事件运行我们的原型时,一切正常。但是,在使用实际数据时,我们注意到在聚合过程中,Streams 会自动创建内部主题。虽然理论上这很好,但我们的客户有必要的、超级严格的安全性,并且不愿意授予我的开发团队主题创建权限。这意味着我们不能按原样运行我们的 Streams 程序。

但是,我们可以为我们创建主题并使用这些主题而不是 Streams 创建自己的 Kafka 主题。是否有可能/如何开始重定向 Streams 内部主题创建以利用现有主题?

注意:我们可以随意命名内部主题。它只需要由拥有这些特权的团队创建。

标签: apache-kafkaapache-kafka-streamskafka-topic

解决方案


在 Kafka Streams 中,现在 KStream 和 KTable 都有重载方法,它们接受一个新的参数 Named。通过使用命名类 DSL,用户可以为其拓扑中的处理器提供有意义的名称。

KStream<String,String> stream =
builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
stream.filter((k,v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
      .mapValues((v) -> v.substring(0,5), Named.as("Map_values_to_first_6_characters"))
      .to("output", Produced.as("Mapped_transactions_output_topic"));
Topologies:
  Sub-topology: 0
   Source: Customer_transactions_input_topic (topics: [input])
     --> filter_out_invalid_txns
   Processor: filter_out_invalid_txns (stores: [])
     --> Map_values_to_first_6_characters
     <-- Customer_transactions_input_topic
   Processor: Map_values_to_first_6_characters (stores: [])
     --> Mapped_transactions_output_topic
     <-- filter_out_invalid_txns
   Sink: Mapped_transactions_output_topic (topic: output)
     <-- Map_values_to_first_6_characters

现在,看看你的拓扑结构,所有处理器命名为:

现在您可以查看拓扑描述并轻松了解每个处理器在拓扑中所扮演的角色。但是,当您在 Kafka Streams 应用程序、状态存储、更改日志主题和重新分区主题的重新启动之间保留有状态操作符时,命名您的处理器节点还有另一个原因,这与使用生成名称的处理器节点的潜在名称转换有关.


推荐阅读