I have written the flink code to read the from pubsub. While executing the code with the command flink run Flink.jar I am getting the below mentioned error. I am using the flink version 1.9.3
Starting execution of program
------------------------------------------------------------
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:621)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:466)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1008)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1081)
at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1081)
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1545)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1540)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
at org.flink.ReadFromPubsub.main(ReadFromPubsub.java:30)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:604)
... 9 more
Please find the code which I am using
package org.flink;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
public class ReadFromPubsub
{
public static void main(String args[]) throws Exception
{
System.out.println("Flink Pubsub Code Read 1");
StreamExecutionEnvironment streamExecEnv= StreamExecutionEnvironment.getExecutionEnvironment();
DeserializationSchema<String> deserializer = new SimpleStringSchema();
SourceFunction<String> pubsubSource = PubSubSource.newBuilder() .withDeserializationSchema(deserializer)
.withProjectName("vz-it-np-gudv-dev-vzntdo-0") .withSubscriptionName("subscription1").build();
streamExecEnv.addSource(pubsubSource);
streamExecEnv.execute();
}
}
I am trying to read the data from pubsub with flink code but not able to do so.
Flink uses lazy evaluation and since you haven't specified any sinks there would be no reason to execute this job.
From the Flink docs:
All Flink programs are executed lazily: When the program’s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to a dataflow graph. The operations are actually executed when the execution is explicitly triggered by an execute() call on the execution environment.
However, your dataflow graph has no output in this case which makes processing unnecessary.
For debugging purposes, you can add a print sink to your source to make your example work:
streamExecEnv.addSource(pubsubSource).print();