
MacOS Flink-1.8.0 WordCount.java Issue


I'm new to Flink.

I downloaded flink-1.8.0-bin-scala_2.11.tgz from the official site and installed apache-maven-3.6.1-bin.tar.gz.

I have already run Flink successfully on my Mac with the command line

./bin/start-cluster.sh

I submitted flink-1.8.0/examples/batch/WordCount.jar and it ran successfully.

I created a project in IntelliJ IDEA to learn to write Flink code.

The WordCount.java is below:

package com.panda;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;

public class WordCount {
    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .groupBy(0)
                    .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
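As a quick sanity check, the Tokenizer's normalization step is plain Java and can be exercised without any Flink dependency. The sketch below (class and method names are mine, for illustration only) reproduces the same lowercase/split/filter logic:

```java
import java.util.ArrayList;
import java.util.List;

public class TokenizeCheck {
    // Mirrors the Tokenizer: lowercase, split on runs of non-word
    // characters, and drop empty tokens.
    static List<String> tokens(String line) {
        List<String> result = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                result.add(token);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(tokens("To be, or not to be!"));
        // prints [to, be, or, not, to, be]
    }
}
```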

I right-click and choose "Run WordCount.java" in IntelliJ, and it runs successfully and shows the results.

However, when I open a terminal, cd into the package directory, and run

javac WordCount.java

it shows several errors like this:

WordCount.java:3: error: package org.apache.flink.api.common.functions does not exist
import org.apache.flink.api.common.functions.FlatMapFunction;
                                            ^
WordCount.java:4: error: package org.apache.flink.api.java does not exist
import org.apache.flink.api.java.DataSet;
                                ^
WordCount.java:5: error: package org.apache.flink.api.java does not exist
import org.apache.flink.api.java.ExecutionEnvironment;
                                ^
WordCount.java:6: error: package org.apache.flink.api.java.tuple does not exist
import org.apache.flink.api.java.tuple.Tuple2;
                                      ^
WordCount.java:7: error: package org.apache.flink.api.java.utils does not exist
import org.apache.flink.api.java.utils.ParameterTool;
                                      ^
WordCount.java:8: error: package org.apache.flink.examples.java.wordcount.util does not exist
import org.apache.flink.examples.java.wordcount.util.WordCountData;
                                                    ^
WordCount.java:9: error: package org.apache.flink.util does not exist
import org.apache.flink.util.Collector;
                            ^
WordCount.java:63: error: cannot find symbol
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
                                                   ^
  symbol:   class FlatMapFunction
  location: class WordCount
WordCount.java:63: error: cannot find symbol
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
                                                                           ^
  symbol:   class Tuple2
  location: class WordCount
WordCount.java:66: error: cannot find symbol
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                                          ^
  symbol:   class Collector
  location: class Tokenizer
WordCount.java:66: error: cannot find symbol
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                                                    ^
  symbol:   class Tuple2
  location: class Tokenizer
WordCount.java:15: error: cannot find symbol
        final ParameterTool params = ParameterTool.fromArgs(args);
              ^
  symbol:   class ParameterTool
  location: class WordCount
WordCount.java:15: error: cannot find symbol
        final ParameterTool params = ParameterTool.fromArgs(args);
                                     ^
  symbol:   variable ParameterTool
  location: class WordCount
WordCount.java:18: error: cannot find symbol
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
              ^
  symbol:   class ExecutionEnvironment
  location: class WordCount
WordCount.java:18: error: cannot find symbol
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                                         ^
  symbol:   variable ExecutionEnvironment
  location: class WordCount
WordCount.java:24: error: cannot find symbol
        DataSet<String> text;
        ^
  symbol:   class DataSet
  location: class WordCount
WordCount.java:32: error: cannot find symbol
            text = WordCountData.getDefaultTextLineDataSet(env);
                   ^
  symbol:   variable WordCountData
  location: class WordCount
WordCount.java:35: error: cannot find symbol
        DataSet<Tuple2<String, Integer>> counts =
        ^
  symbol:   class DataSet
  location: class WordCount
WordCount.java:35: error: cannot find symbol
        DataSet<Tuple2<String, Integer>> counts =
                ^
  symbol:   class Tuple2
  location: class WordCount
WordCount.java:65: error: method does not override or implement a method from a supertype
        @Override
        ^
WordCount.java:73: error: cannot find symbol
                    out.collect(new Tuple2<>(token, 1));
                                    ^
  symbol:   class Tuple2
  location: class Tokenizer
21 errors

I have checked External Libraries in IntelliJ, and the Flink libraries are all there.

Here is my pom.xml

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.panda</groupId>
    <artifactId>FlinkTest</artifactId>
    <version>1.8.0</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>
    <url>http://www.myorganization.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.8.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>

        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>

        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.panda.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- It adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>

            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

</project>

Here is my Project Structure -> Libraries (screenshot omitted).

I have been working on this for several days and tried several approaches, but it still shows errors. Could somebody help me fix it? Thanks in advance!

I have tried using Maven to build the project, but it still shows errors, as below:

[ERROR] /Users/yantong/IdeaProjects/FlinkTest/src/main/java/com/panda/WordCount.java:[8,53] package org.apache.flink.examples.java.wordcount.util does not exist
[ERROR] /Users/yantong/IdeaProjects/FlinkTest/src/main/java/com/panda/WordCount.java:[32,20] cannot find symbol
  symbol:   variable WordCountData
  location: class com.panda.WordCount
[INFO] 2 errors

Solution

  • The issue you're seeing is that javac, the Java compiler, needs access to all dependencies when it compiles your class. WordCount.java refers to classes and interfaces defined in other libraries; javac can't find those libraries, hence you get errors like:

    WordCount.java:3: error: package org.apache.flink.api.common.functions does not exist
    import org.apache.flink.api.common.functions.FlatMapFunction;
    

    i.e. line 3 of WordCount.java refers to a package that javac cannot find.

    In principle, if you had downloaded all the needed jars, javac's -cp option would let you add them to the compilation classpath. In practice this is not sensible: the number of dependencies you would need is huge (classes like org.apache.flink.api.common.functions.FlatMapFunction pull in dependencies of their own, and so on). Please don't do this.

    As one of the comments points out, you should use a build tool such as Maven (or Gradle) to download all the dependencies for you and put them on the classpath when compiling. To build the project, run the following in a terminal:

    cd directory-that-contains-your-project
    mvn package
    

    This should compile the project and package it into a jar, which you can then run.

    EDIT: I can see from your updated question that you still have an error. The example Flink code is here. In the example WordCount.java, there is an import for a class called org.apache.flink.examples.java.wordcount.util.WordCountData. The example code project has a file called WordCountData.java in a subfolder called util which is underneath the project folder. Notice how the path for the folder containing this class is /src/main/java/org/apache/flink/examples/java/wordcount/util/. Any classes within this folder have the package org.apache.flink.examples.java.wordcount.util, i.e. the package follows the folder naming.
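
    As an aside, if you would rather keep the original import unchanged, the class can also be pulled in from Flink's published examples artifact. This is a sketch, assuming flink-examples-batch_${scala.binary.version} is available for your Flink version (it is published for 1.8.0); add it to the <dependencies> section of pom.xml:

    ```xml
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-examples-batch_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    ```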

    In your error message, I can see:

    [ERROR] /Users/yantong/IdeaProjects/FlinkTest/src/main/java/com/panda/WordCount.java:[8,53] package org.apache.flink.examples.java.wordcount.util does not exist
    

    Notice how your path is different? Your path is com/panda/WordCount.java, which means your WordCount class is in the package com.panda. I'm willing to bet that you have your WordCountData class in com/panda/util/WordCountData.java. Your import statement says you want org.apache.flink.examples.java.wordcount.util.WordCountData, but your code defines com.panda.util.WordCountData.

    You need to either move the example code to the matching folder (e.g. WordCountData.java into src/main/java/org/apache/flink/examples/java/wordcount/util), or keep it under com/panda/util and change the import statement to point at your class, i.e.

    import com.panda.util.WordCountData;
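
    With the second option, a minimal stand-in for WordCountData (saved as src/main/java/com/panda/util/WordCountData.java) could look like the sketch below. The WORDS content is placeholder text of my own, not Flink's actual sample data, and the package declaration is shown as a comment so the snippet compiles standalone:

    ```java
    // Goes in src/main/java/com/panda/util/WordCountData.java with the
    // declaration: package com.panda.util;
    public class WordCountData {

        // Placeholder sample lines (Flink's real WordCountData ships a
        // longer hard-coded text).
        public static final String[] WORDS = new String[] {
            "to be or not to be that is the question",
            "whether tis nobler in the mind to suffer"
        };

        // With flink-java on the classpath, the factory method used by
        // WordCount is simply:
        //
        // public static org.apache.flink.api.java.DataSet<String>
        //         getDefaultTextLineDataSet(
        //             org.apache.flink.api.java.ExecutionEnvironment env) {
        //     return env.fromElements(WORDS);
        // }
    }
    ```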