首页 > 解决方案 > Spark Scala:找不到数据源:kafka

问题描述

我正在尝试使用sparkByExamples.com中的示例。但由于某种原因,spark 程序不会从 kafka 主题中读取数据。

代码在这里

错误信息如下:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:666)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:194)
    at com.sparkbyexamples.spark.streaming.kafka.json.SparkStreamingConsumerKafkaJson$.main(SparkStreaming.scala:22)
    at com.sparkbyexamples.spark.streaming.kafka.json.SparkStreamingConsumerKafkaJson.main(SparkStreaming.scala)

Process finished with exit code 1

基本上,我有一个 kafka topic1 < data.json 并从头开始阅读。

我最近开始了 spark + scala + kafka 活动,非常感谢您的帮助。

pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">


    <groupId>com.sparkbyexamples</groupId>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>spark-scala-examples</artifactId>

    <version>1.0-SNAPSHOT</version>
    <inceptionYear>2008</inceptionYear>
    <packaging>jar</packaging>
    <properties>
        <scala.version>2.12.12</scala.version>
        <spark.version>3.0.0</spark.version>
    </properties>

    <repositories>
        <repository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>scala-tools.org</id>
            <name>Scala-Tools Maven2 Repository</name>
            <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
    </pluginRepositories>

    <dependencies>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.specs</groupId>
            <artifactId>specs</artifactId>
            <version>1.2.5</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>com.thoughtworks.xstream</groupId>
            <artifactId>xstream</artifactId>
            <version>1.4.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>compile</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
         <scope>compile</scope>
        </dependency>

        <dependency>
           <groupId>com.databricks</groupId>
           <artifactId>spark-xml_2.11</artifactId>
           <version>0.4.1</version>
       </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <resources><resource><directory>src/main/resources</directory></resource></resources>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.5</arg>
                    </args>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <buildcommands>
                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <additionalProjectnatures>
                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                    </additionalProjectnatures>
                    <classpathContainers>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                    </classpathContainers>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <reporting>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
        </plugins>
    </reporting>
</project>

标签: scalaapache-sparkapache-kafkaspark-kafka-integration

解决方案


如错误所示,您缺少spark-sql-kafka-0-10依赖项

参考文档 - https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html


不相关,您的 spark-xml Scala 版本需要为 2.12


推荐阅读