java - 提交 jar 文件时,控制台中未打印来自 Kafka 的数据。(Spark 流 + Kafka 集成 3.1.1)
问题描述
我提交jar文件时没有错误。
但是当我使用 HTTP 协议发送数据时没有打印数据。
(当我使用“kafka-console-consumer.sh”检查时,数据打印得很好)
【图,提交了一个jar文件:数据没有打印出来】
【图,Kafka-console-consumer.sh:数据打印出来】
命令 :
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --group test-consumer --topic test01 --from-beginning
[Java 文件]
2-1、依赖
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.1.1</version>
</dependency>
</dependencies>
2-2、代码
package SparkTest.SparkStreaming;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
public final class JavaWordCount {
public static void main(String[] args) throws Exception {
// Create a local StreamingContext with two working thread and batch interval of 1 second
SparkConf conf = new SparkConf().setMaster("yarn").setAppName("JavaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
// load a topic from broker
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "test-consumer");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("test01");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferBrokers(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
JavaDStream<String> data = stream.map(v -> {
return v.value(); // mapping to convert into spark D-Stream
});
data.print();
jssc.start();
jssc.awaitTermination();
}
}
解决方案
您--from-beginning
在控制台使用者中使用,但auto.offset.reset=latest
在 Spark 代码中使用。
因此,如果您想查看任何数据,则需要在 Spark运行时运行生产者
您还需要考虑改用spark-sql-kafka-0-10
结构化流依赖项,如 KafkaWordCount 示例中所示
推荐阅读
- python - 删除 xml 文件的第一部分,无法序列化
- javascript - 如何让 putImageData 覆盖整个画布
- java - onPause() 没有意图,onResume() 没有 onCreate()
- python - 使用替换函数时带有撇号的 Python 问题
- javascript - 将库导入到 vue.js 的单个文件组件中
- r - 查找数据框中出现次数最多的列
- php - org.json.JSONException:java.lang.String 类型的值 DB 无法转换为 JSONObject
- javascript - 涉及数组的for循环问题
- json - 尝试解析带引号的 JSON 响应时出错
- php - PHP输入类型“datetime-local”:在编辑模式下不显示值