java - 如何使用 KafkaStream 使用主题中的记录..?
问题描述
KafkaStream 如何使用来自主题的消息。
以下是我的代码:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_URL + ":" + KAFKA_SERVER_PORT);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KafkaStreams streams = new KafkaStreams(builder.build(), props);
builder.stream(topic_name).print(null);
streams.start();
解决方案
Kafka Streams DSL(领域特定语言)构建在 Streams Processor API 之上。
它使用底层处理器 API 和底层实现来读取来自 kafka 主题的消息。这是详细的架构:
https://kafka.apache.org/20/documentation/streams/architecture
Streams DSL 建立在处理器 API 之上。如果您深入了解处理器 API,您可以了解如何实现功能并且可以通过一行代码轻松调用:
https://kafka.apache.org/20/documentation/streams/developer-guide/processor-api.html
这就是 Stream DSL 操作的工作方式。在使用 Streams DSL 编写 KStream 应用程序时,大多数操作可以在几行代码中调用,但在它下面有完整的实现
最初,每个操作都转换为ProcessorNode。所以从一个主题读取被转换为SourceNode并且写入一个主题是SinkNode。并且所有节点都按顺序添加到拓扑中。
您可以在 StreamsBuilder 和 StreamTask 的源代码中查看更多详细信息。它将让您了解如何构建和运行拓扑:
下面是 Wordcount 的 KStream 应用程序示例。假设“wordcount-input”是输入主题,“wordcount-output”是输出主题:
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // add if you want to reset the offset to earliest for each run
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, String> textLines = builder.stream("wordcount-input");
final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
final KTable<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count();
// Write the `KTable<String, Long>` to the output topic.
wordCounts.toStream().to("wordcount-output", Produced.with(stringSerde, longSerde));
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.cleanUp();
streams.start();
推荐阅读
- c++ - 在左下角 Windows 凭据提供程序中显示用户列表
- java - 如何使用 Java 8 使用 A 的构造函数将对象 A 的数组转换为对象 B 的数组?
- c++ - 我无法下载 minGW 包
- discord - 如何检查我的不和谐机器人是否已经对一个人进行了 dm 并且它不会再次对该人进行 dm (discord.py)
- kubernetes - kubernetes 入口服务注解
- firebase - 颤振:sharedpreference 检索空值
- powershell - 使用 DisplayNames 从文本文件中获取 SamAccountName
- kubernetes - 在 kubernetes java 客户端中监听 pod 失败?
- metatrader4 - 突破指标警报:布林带从 Donchian Channel 内部突破
- ffmpeg - ffmpeg:用于过滤器比较的滑动覆盖