macOS Flink-1.8.0 WordCount.java problem

Problem description

I am new to Flink.

I downloaded flink-1.8.0-bin-scala_2.11.tgz from the official link and installed apache-maven-3.6.1-bin.tar.gz.

I have successfully started Flink on my Mac from the command line:

./bin/start-cluster.sh

I uploaded flink-1.8.0/examples/batch/WordCount.jar and it ran successfully.
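(For reference, the same example jar can also be submitted from the command line, assuming the local cluster started above is still running; without --input it uses the built-in sample data and prints the word counts to standard output:)

./bin/flink run examples/batch/WordCount.jar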

I created a project in IntelliJ IDEA to learn how to write code with Flink.

WordCount.java is as follows:

package com.panda;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .groupBy(0)
                    .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}

I right-click "Run WordCount.java" and it runs successfully and shows the result.

However, when I cd into the package directory in a terminal and run

javac WordCount.java

it shows several errors like these:

WordCount.java:3: error: package org.apache.flink.api.common.functions does not exist
import org.apache.flink.api.common.functions.FlatMapFunction;
                                            ^
WordCount.java:4: error: package org.apache.flink.api.java does not exist
import org.apache.flink.api.java.DataSet;
                                ^
WordCount.java:5: error: package org.apache.flink.api.java does not exist
import org.apache.flink.api.java.ExecutionEnvironment;
                                ^
WordCount.java:6: error: package org.apache.flink.api.java.tuple does not exist
import org.apache.flink.api.java.tuple.Tuple2;
                                      ^
WordCount.java:7: error: package org.apache.flink.api.java.utils does not exist
import org.apache.flink.api.java.utils.ParameterTool;
                                      ^
WordCount.java:8: error: package org.apache.flink.examples.java.wordcount.util does not exist
import org.apache.flink.examples.java.wordcount.util.WordCountData;
                                                    ^
WordCount.java:9: error: package org.apache.flink.util does not exist
import org.apache.flink.util.Collector;
                            ^
WordCount.java:63: error: cannot find symbol
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
                                                   ^
  symbol:   class FlatMapFunction
  location: class WordCount
WordCount.java:63: error: cannot find symbol
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
                                                                           ^
  symbol:   class Tuple2
  location: class WordCount
WordCount.java:66: error: cannot find symbol
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                                          ^
  symbol:   class Collector
  location: class Tokenizer
WordCount.java:66: error: cannot find symbol
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                                                    ^
  symbol:   class Tuple2
  location: class Tokenizer
WordCount.java:15: error: cannot find symbol
        final ParameterTool params = ParameterTool.fromArgs(args);
              ^
  symbol:   class ParameterTool
  location: class WordCount
WordCount.java:15: error: cannot find symbol
        final ParameterTool params = ParameterTool.fromArgs(args);
                                     ^
  symbol:   variable ParameterTool
  location: class WordCount
WordCount.java:18: error: cannot find symbol
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
              ^
  symbol:   class ExecutionEnvironment
  location: class WordCount
WordCount.java:18: error: cannot find symbol
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                                         ^
  symbol:   variable ExecutionEnvironment
  location: class WordCount
WordCount.java:24: error: cannot find symbol
        DataSet<String> text;
        ^
  symbol:   class DataSet
  location: class WordCount
WordCount.java:32: error: cannot find symbol
            text = WordCountData.getDefaultTextLineDataSet(env);
                   ^
  symbol:   variable WordCountData
  location: class WordCount
WordCount.java:35: error: cannot find symbol
        DataSet<Tuple2<String, Integer>> counts =
        ^
  symbol:   class DataSet
  location: class WordCount
WordCount.java:35: error: cannot find symbol
        DataSet<Tuple2<String, Integer>> counts =
                ^
  symbol:   class Tuple2
  location: class WordCount
WordCount.java:65: error: method does not override or implement a method from a supertype
        @Override
        ^
WordCount.java:73: error: cannot find symbol
                    out.collect(new Tuple2<>(token, 1));
                                    ^
  symbol:   class Tuple2
  location: class Tokenizer
21 errors

I checked my External Libraries and they are all present there.

Here is my pom.xml:

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.panda</groupId>
    <artifactId>FlinkTest</artifactId>
    <version>1.8.0</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url>http://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.8.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>

        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>

        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.panda.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- Its adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>

            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

</project>

Here is my project structure -> Libraries: (screenshot)

I have been working on this for a few days and tried several approaches, but it still shows these errors. Could someone help me fix this? Thanks in advance!

I have also tried building the project with Maven, but it still shows errors like these:

[ERROR] /Users/yantong/IdeaProjects/FlinkTest/src/main/java/com/panda/WordCount.java:[8,53] package org.apache.flink.examples.java.wordcount.util does not exist
[ERROR] /Users/yantong/IdeaProjects/FlinkTest/src/main/java/com/panda/WordCount.java:[32,20] cannot find symbol
  symbol:   variable WordCountData
  location: class com.panda.WordCount
[INFO] 2 errors

Tags: java, apache-flink

Solution


The problem you are seeing is that javac, the Java compiler, needs access to all of your dependencies when it compiles a class. WordCount.java refers to classes and interfaces defined in other libraries, but javac cannot find those libraries, so you get errors like:

WordCount.java:3: error: package org.apache.flink.api.common.functions does not exist
import org.apache.flink.api.common.functions.FlatMapFunction;

That is, line 3 of WordCount.java references a package that javac cannot find.

Assuming you have already downloaded all the required dependencies, javac has a -cp option for adding them to the compile classpath. Doing this by hand is not sensible at all, because the number of dependencies you need is huge (classes like org.apache.flink.api.common.functions.FlatMapFunction have dependencies of their own, and so on). Please don't do this.
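Just to illustrate how unwieldy that gets, a manual compile would look something like the line below, and the list only grows once transitive dependencies are added (the jar paths here are placeholders, not a working command):

javac -cp "lib/flink-java-1.8.0.jar:lib/flink-core-1.8.0.jar:lib/flink-annotations-1.8.0.jar" WordCount.java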

As one of the comments points out, you should use a build tool such as Maven (or Gradle) to download all the dependencies for you and put them on the classpath as needed at compile time. To build your project, try running the following in a terminal:

cd directory-that-contains-your-project
mvn package

This should compile your code and package it into a jar, which you can then run.
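For example, once the build succeeds you could submit the packaged jar to your local cluster along these lines (a sketch: the jar name follows from the artifactId/version in the pom above, and -c selects the entry class explicitly, since the shade plugin's manifest still points at com.panda.StreamingJob):

./bin/flink run -c com.panda.WordCount /path/to/FlinkTest/target/FlinkTest-1.8.0.jar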

Edit: I can see from your updated question that you still have errors. The example Flink code is here. In the example WordCount.java there is an import for a class called org.apache.flink.examples.java.wordcount.util.WordCountData. The example project keeps a file named WordCountData.java in a util subfolder under the project folder. Note that the path of the folder containing this class is /src/main/java/org/apache/flink/examples/java/wordcount/util/. Every class in that folder has the package org.apache.flink.examples.java.wordcount.util, i.e. the package follows the folder naming.
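In other words, the package declaration at the top of each source file must match the folder it sits in, for example:

// File: src/main/java/org/apache/flink/examples/java/wordcount/util/WordCountData.java
package org.apache.flink.examples.java.wordcount.util;

// whereas a file under src/main/java/com/panda/util/ would have to declare:
// package com.panda.util;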

In your error message I can see:

[ERROR] /Users/yantong/IdeaProjects/FlinkTest/src/main/java/com/panda/WordCount.java:[8,53] package org.apache.flink.examples.java.wordcount.util does not exist

Notice how your path is different? Your path is com/panda/WordCount.java, which means your WordCount class lives in the package com.panda. I would bet your WordCountData is at com/panda/util/WordCountData.java. Your import statement says you want org.apache.flink.examples.java.wordcount.util.WordCountData, but your code defines com.panda.util.WordCountData.

You need to either move the example code into the matching folder (src/main/java/org/apache/flink/examples/java/wordcount), or keep it under com/panda and change the import statement to point at your own class, i.e.

import com.panda.util.WordCountData;
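If you go with the second option, the class at com/panda/util/WordCountData.java has to declare the matching package and provide the method that WordCount calls. A minimal sketch (with placeholder text instead of the original Flink sample data) might look like this:

package com.panda.util;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/** Minimal stand-in for the Flink example's WordCountData class. */
public class WordCountData {

    // Placeholder sample lines; the real example ships a longer text excerpt.
    public static final String[] WORDS = new String[] {
        "to be or not to be",
        "that is the question"
    };

    public static DataSet<String> getDefaultTextLineDataSet(ExecutionEnvironment env) {
        return env.fromElements(WORDS);
    }
}

With that in place, import com.panda.util.WordCountData resolves, and both mvn package and the IDE build should compile the project.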
