
How to run the first example of Apache Flink


I am trying to run the first example from the O'Reilly book "Stream Processing with Apache Flink" and the first example from the Flink project. Each gives a different error.

The example from the book gives a NoClassDefFoundError. The example from the Flink project gives java.net.ConnectException: Connection refused (Connection refused), but it does create a Flink job; see the screenshot.

Details below.

Book example

java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError:scala/runtime/java8/JFunction1$mcVI$sp at io.github.streamingwithflink.chapter1.AverageSensorReadings$$anon$3.createSerializer(AverageSensorReadings.scala:50)

The instructions from the book are:

  1. Download flink-1.7.1-bin-scala_2.12.tgz
  2. Extract it
  3. Start the cluster: ./bin/start-cluster.sh
  4. Open Flink's web UI at http://localhost:8081

This all works fine.

  5. Download the jar file that includes the examples in this book
  6. Run the example:
./bin/flink run \
  -c io.github.streamingwithflink.chapter1.AverageSensorReadings \
  examples-scala.jar

Judging by the error message at the top of this post, it seems that the class is not found.

I put the jar in the same directory from which I am running the command.

java -version
openjdk version "1.8.0_242"
OpenJDK Runtime Environment (Zulu 8.44.0.9-CA-macosx) (build 1.8.0_242-b20)
OpenJDK 64-Bit Server VM (Zulu 8.44.0.9-CA-macosx) (build 25.242-b20, mixed mode)

I also tried compiling the jar myself from

https://github.com/streaming-with-flink/examples-scala.git

using

mvn clean build

The error is the same.

Flink project tutorial

Running the SocketWindowWordCount example:

./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

I get a job, but it fails with java.net.ConnectException: Connection refused (Connection refused).

[screenshot]

It is not clear to me which connection is refused. I tried different ports with no change.

How can I run flink code successfully?


Solution

  • I tried to reproduce the failing AverageSensorReadings example, but it worked on my setup. I'll try to look deeper into it tomorrow.

    Regarding the SocketWindowWordCount example, the error message indicates that the Flink job failed to open a connection to the socket on port 9000. You need to open the socket before you start the job. You can do this, for example, with netcat:

    nc -l 9000
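
Since the order matters (listener first, job second), a small guard can refuse to submit the job while nothing is listening. The following is only a sketch under a few assumptions: `lsof` is installed, the commands run from the root of the extracted Flink distribution, and the helper names `has_listener` and `submit_wordcount` are made up for illustration and are not part of Flink.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Flink): succeeds if some process is
# listening for TCP connections on the given local port.
# Assumes lsof is installed; without it the check reports "no listener".
has_listener() {
  lsof -nP -iTCP:"$1" -sTCP:LISTEN >/dev/null 2>&1
}

# Hypothetical wrapper: submit the example only when the socket is open.
submit_wordcount() {
  local port="${1:-9000}"
  if has_listener "$port"; then
    ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port "$port"
  else
    echo "no listener on port $port; run 'nc -l $port' in another terminal first" >&2
    return 1
  fi
}
```

With `nc -l 9000` running in a second terminal, `submit_wordcount 9000` would then start the job. The check deliberately inspects the listening socket instead of connecting to it, because a plain `nc -l` typically serves a single connection and a probe connection would use it up.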
    

    After the job is running, you can send messages by typing lines into the netcat terminal, and these messages will be ingested into the Flink job. You can watch the stats in the web UI evolve according to the number of words in your messages.

    Note that netcat closes the socket when you stop the Flink job.
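
To get a rough idea of which counts to expect for the lines you typed, the same text can be counted offline. This is a minimal sketch, not what Flink runs internally: the helper name `count_words` is hypothetical, it splits on whitespace, and it counts over the whole input, whereas the job aggregates per time window, so the numbers only line up when all lines fall into the same window.

```shell
#!/usr/bin/env bash
# Hypothetical helper: count how often each whitespace-separated word
# occurs on standard input and print "word count" pairs sorted by word.
count_words() {
  tr -s '[:space:]' '\n' | grep -v '^$' | sort | uniq -c | awk '{print $2, $1}'
}

# Example input: the two lines one might type into the netcat terminal.
printf 'hello flink\nhello world\n' | count_words
# prints:
# flink 1
# hello 2
# world 1
```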