apache-spark - Integrating Apache Spark with Kafka
Problem description
I am taking a Udemy course on Kafka and Spark, and I am currently learning how to integrate Apache Spark with Kafka.
Below is the Apache Spark code:
SparkSession session = SparkSession.builder().appName("KafkaConsumer").master("local[*]").getOrCreate();
session.sparkContext().setLogLevel("ERROR");
Dataset<Row> df = session
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "second_topic")
        .load();
df.show();
Here is the content of the pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example.kafka.spark</groupId>
<artifactId>Kafka-Spark-Integration-Code</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<!-- <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency> -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
</dependencies>
</project>
However, when I run the code I get the error below, which I have not been able to resolve. I am using OpenJDK 8 and Spark 3 on MX Linux. Thanks.
Exception in thread "main" java.lang.ClassFormatError: Invalid code attribute name index 24977 in class file org/apache/spark/sql/execution/columnar/InMemoryRelation
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:83)
at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:132)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:132)
at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:131)
at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:323)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1107)
at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:157)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:155)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:152)
at org.apache.spark.sql.streaming.DataStreamReader.<init>(DataStreamReader.scala:519)
at org.apache.spark.sql.SparkSession.readStream(SparkSession.scala:657)
at example.code.spark.kafka.KafkaSparkConsumer.main(KafkaSparkConsumer.java:19)
Solution
You can follow the example given in the Structured Streaming + Kafka Integration Guide:
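import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession session = SparkSession.builder()
        .appName("KafkaConsumer")
        .master("local[*]")
        .getOrCreate();

// Subscribe to the topic. Key and value arrive as binary columns,
// so cast both to strings before using them.
Dataset<Row> df = session
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "second_topic")
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");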
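The Kafka source exposes each record's key and value as raw bytes, which is why the example casts them to strings. As a rough illustration of what that cast amounts to, here is a minimal plain-Java sketch that decodes a byte array as UTF-8. The class name KafkaValueDecoding and the decode helper are hypothetical names made up for this illustration and are not part of Spark or Kafka.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only: it is not a Spark or Kafka API.
final class KafkaValueDecoding {

    // Decode raw bytes as UTF-8 text; a null payload stays null,
    // much like a SQL cast of a NULL binary value yields NULL.
    static String decode(byte[] raw) {
        return raw == null ? null : new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Simulate a record value as it would be stored in Kafka.
        byte[] value = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(value));
    }
}
```

If the topic holds something other than UTF-8 text (for example Avro or another binary encoding), a plain string cast would not give readable output and a matching deserializer would be needed instead.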
To consume the data, start a streaming query. The Structured Streaming Programming Guide shows how to print the data to the console:
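import org.apache.spark.sql.streaming.StreamingQuery;

// Print each micro-batch of new rows to the console.
StreamingQuery query = df
        .writeStream()
        .format("console")
        .outputMode("append")
        .option("checkpointLocation", "path/to/checkpoint/dir")
        .start();

// Block the main thread until the query is stopped or fails.
query.awaitTermination();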