java - Apache kafka - 从 java 应用程序打印到控制台
问题描述
我正在运行 ubuntu 18 并使用 java (intellij IDE) 并编写一个基本的 kafka 应用程序。我正在尝试使用此处的基本示例,并将信息流式传输到应用程序,然后在屏幕上打印一些内容,我正在使用 intellij“运行”命令运行应用程序。
当我将应用程序连接到输出流时,它工作正常并且我设法将信息输出到终端。
我尝试System.out.println()
在 foreach 方法中添加,在 apply 方法中,它不起作用,我在其中添加断点并运行调试模式,但它没有到达那里,我猜流程在运行期间没有到达那里。
我正在使用正确的主题将信息流式传输到应用程序,并且我在应用程序和 foreach 之外放入应用程序的打印工作正常。
每次向应用程序流式传输信息时,如何让应用程序打印一些内容?主要思想是处理某些东西并将结果打印到监视器而不是kafka流
这是代码:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.kstream.Printed;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
public class main {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
// Note: To re-run the demo, you need to use the offset reset tool:
// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Tool
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.foreach(new ForeachAction<String, String>() {
@Override
public void apply(String key, String value) {
System.out.println("yeah");
}
});
KTable<String, Long> counts = source
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
}
})
.groupBy(new KeyValueMapper<String, String, String>() {
@Override
public String apply(String key, String value) {
System.out.println("what");
return value;
}
})
.count();
//System.exit(0);
// need to override value serde to Long type
System.out.println("what");
//counts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
final CountDownLatch latch = new CountDownLatch(1);
// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {
@Override
public void run() {
streams.close();
latch.countDown();
}
});
try {
streams.start();
latch.await();
} catch (Throwable e) {
System.exit(2);
}
System.exit(0);
}
}
解决方案
KafkaStreams 提供输出控制台打印 dsl
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.print(Printed.toSysOut());
KTable<String, Long> counts = source
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
...
测试结果
./kafka-console-producer --broker-list localhost:9092 --topic streams-plaintext-input
input1
input2
Intellij 控制台结果
[KSTREAM-SOURCE-0000000000]: null, input1
[KSTREAM-SOURCE-0000000000]: null, input2
推荐阅读
- python-jira - python-jira 集成未能显示项目中问题的史诗
- android - 使用 AWS Amplify API 时的 Android java.lang.NoSuchFieldError
- c - 为什么即使分配给相同类型的另一个结构,该结构仍保持为 NULL?
- ios - 发布后反应原生iOS构建失败
- ubuntu - Grafana 图像渲染器插件无法安装 - 签名已被修改
- c# - 如何检查给定地址是 32 位还是 64 位?
- reactjs - 无法在 React Native 中使用 Scrollview 滚动
- python - Flask 应用程序在使用 Alpine、nginx 和 uwsgi 时出现 ModuleNotFoundError .. 但使用 Flask 运行它可以正常工作
- unit-testing - 如何对修改发送消息的 net.Conn 函数进行单元测试?
- django - Django 3.2.8 自定义中间件返回 302 重定向错误