I am writing an Apache Flink program that runs locally and interacts with Google Cloud Pub/Sub.
<properties>
  <maven.compiler.source>17</maven.compiler.source>
  <maven.compiler.target>17</maven.compiler.target>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <!-- Externalized Flink connectors are versioned as <connector-version>-<flink-version>:
       "3.0.2-1.18" is built against Flink 1.18, so flink.version below must be a 1.18.x
       release too. Mixing it with Flink 1.17 risks binary incompatibilities at runtime.
       Flink 1.18 is also the first release with (experimental) Java 17 support; when the job
       runs from an IDE, the JVM still needs the add-opens flags, e.g.
       add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED. -->
  <flink.gcp.pubsub.connector.version>3.0.2-1.18</flink.gcp.pubsub.connector.version>
  <flink.version>1.18.1</flink.version>
  <!-- NOTE(review): not referenced by any dependency shown here (flink-streaming-java and
       flink-clients no longer carry a Scala suffix); confirm it is still needed. -->
  <scala.binary.version>2.10</scala.binary.version>
</properties>
<dependencies>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-gcp-pubsub</artifactId>
    <version>${flink.gcp.pubsub.connector.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Example Flink job for Google Cloud Pub/Sub.
 *
 * <p>It first publishes a couple of integers to an input topic. It then runs a streaming job
 * that reads them back from a subscription, logs each value and republishes it to an output
 * topic.
 *
 * <p>NOTE(review): while building the job graph, Flink's {@code ClosureCleaner} reflectively
 * makes fields of JDK classes such as {@code java.util.concurrent.atomic.AtomicReference}
 * accessible. On JDK 17 this fails with {@code InaccessibleObjectException} unless the JVM is
 * started with {@code --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED}. That flag
 * is needed in addition to the ones for {@code java.util} and {@code java.lang}, because
 * {@code --add-opens} applies to a single package, not its subpackages.
 */
public class PubSubExample {
  private static final Logger LOG = LoggerFactory.getLogger(PubSubExample.class);

  /** Number of required command-line parameters; must match the usage message below. */
  private static final int REQUIRED_PARAMETER_COUNT = 4;

  /**
   * Entry point.
   *
   * <p>Expects {@code --google-project}, {@code --input-topicName}, {@code --input-subscription}
   * and {@code --output-topicName}. Prints a usage message and returns if fewer parameters are
   * supplied.
   */
  public static void main(String[] args) throws Exception {
    // parse input arguments
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);
    // Four parameters are read via getRequired below. Checking for fewer than 4 (not 3) ensures
    // the usage message is shown instead of a bare "No data for required key" exception.
    if (parameterTool.getNumberOfParameters() < REQUIRED_PARAMETER_COUNT) {
      System.out.println(
          "Missing parameters!\n"
              + "Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName <output-topic> "
              + "--google-project <google project name> ");
      return;
    }
    String projectName = parameterTool.getRequired("google-project");
    String inputTopicName = parameterTool.getRequired("input-topicName");
    String subscriptionName = parameterTool.getRequired("input-subscription");
    String outputTopicName = parameterTool.getRequired("output-topicName");

    // Seed the input topic so the streaming job has something to consume.
    PubSubPublisher pubSubPublisher = new PubSubPublisher(projectName, inputTopicName);
    pubSubPublisher.publish(2);

    runFlinkJob(projectName, subscriptionName, outputTopicName);
  }

  /**
   * Builds and executes the streaming pipeline: Pub/Sub subscription -> log -> Pub/Sub topic.
   *
   * @param projectName Google Cloud project that owns the subscription and topic
   * @param subscriptionName subscription to read integer messages from
   * @param outputTopicName topic the processed integers are published to
   * @throws Exception if job construction or execution fails
   */
  private static void runFlinkJob(
      String projectName, String subscriptionName, String outputTopicName) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(1000L);
    env.addSource(
            PubSubSource.newBuilder()
                .withDeserializationSchema(new IntegerSerializer())
                .withProjectName(projectName)
                .withSubscriptionName(subscriptionName)
                .withMessageRateLimit(1)
                .build())
        .map(PubSubExample::printAndReturn)
        .disableChaining()
        .addSink(
            PubSubSink.newBuilder()
                .withSerializationSchema(new IntegerSerializer())
                .withProjectName(projectName)
                .withTopicName(outputTopicName)
                .build());
    env.execute("Flink Streaming PubSubReader");
  }

  /** Logs the payload and passes it through unchanged. */
  private static Integer printAndReturn(Integer i) {
    LOG.info("Processed message with payload: {}", i);
    return i;
  }
}
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.math.BigInteger;
/** Publishes small integer test messages to a single Google Cloud Pub/Sub topic. */
class PubSubPublisher {
  private final String projectName;
  private final String topicName;

  /**
   * @param projectName Google Cloud project that owns the topic
   * @param topicName topic to publish to
   */
  PubSubPublisher(String projectName, String topicName) {
    this.projectName = projectName;
    this.topicName = topicName;
  }

  /**
   * Publishes messages whose payload is a single integer. The integers start at 0 and increase by
   * one for each message sent.
   *
   * @param amountOfMessages amount of messages to send
   * @throws RuntimeException wrapping any failure while creating the publisher or publishing; if
   *     the thread was interrupted, the interrupt flag is restored before throwing
   */
  void publish(int amountOfMessages) {
    Publisher publisher = null;
    try {
      publisher = Publisher.newBuilder(TopicName.of(projectName, topicName)).build();
      for (int i = 0; i < amountOfMessages; i++) {
        // BigInteger.toByteArray gives the big-endian two's-complement bytes that
        // IntegerSerializer decodes with new BigInteger(byte[]).
        ByteString messageData = ByteString.copyFrom(BigInteger.valueOf(i).toByteArray());
        PubsubMessage message = PubsubMessage.newBuilder().setData(messageData).build();
        // Block on the returned future so messages are published one at a time, in order.
        publisher.publish(message).get();
        System.out.println("Published message: " + i);
        Thread.sleep(100L);
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      shutdownQuietly(publisher);
    }
  }

  /**
   * Shuts the publisher down and waits briefly for it to terminate.
   *
   * <p>This is best effort and never throws, so it cannot mask the outcome of publishing. A
   * {@code null} publisher is ignored.
   */
  private static void shutdownQuietly(Publisher publisher) {
    if (publisher == null) {
      return;
    }
    try {
      publisher.shutdown();
      publisher.awaitTermination(30, java.util.concurrent.TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } catch (Exception e) {
      // Report instead of silently swallowing, but do not fail the caller.
      System.err.println("Failed to shut down Pub/Sub publisher: " + e);
    }
  }
}
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.math.BigInteger;
/**
 * Converts between {@link Integer} values and the big-endian two's-complement byte encoding
 * written by {@link PubSubPublisher}.
 *
 * <p>The same instance type serves as the Pub/Sub source deserializer and the Pub/Sub sink
 * serializer. Every incoming payload is expected to hold exactly one integer.
 */
class IntegerSerializer
    implements PubSubDeserializationSchema<Integer>, SerializationSchema<Integer> {

  /**
   * Decodes the message body as a two's-complement integer.
   *
   * <p>Only the low 32 bits are kept, as {@link BigInteger#intValue()} does.
   */
  @Override
  public Integer deserialize(PubsubMessage message) throws IOException {
    byte[] payload = message.getData().toByteArray();
    BigInteger decoded = new BigInteger(payload);
    return decoded.intValue();
  }

  /** Always {@code false}: this schema never signals the end of the stream. */
  @Override
  public boolean isEndOfStream(Integer nextElement) {
    return false;
  }

  /** Declares {@link Integer} as the element type emitted by the source. */
  @Override
  public TypeInformation<Integer> getProducedType() {
    return TypeInformation.of(Integer.class);
  }

  /** Encodes the value as its minimal big-endian two's-complement byte representation. */
  @Override
  public byte[] serialize(Integer element) {
    BigInteger encoded = BigInteger.valueOf(element);
    return encoded.toByteArray();
  }
}
However, running this program locally gives me the following error:
Exception in thread "main" java.lang.reflect.InaccessibleObjectException: Unable to make field private static final long java.util.concurrent.atomic.AtomicReference.serialVersionUID accessible: module java.base does not "opens java.util.concurrent.atomic" to unnamed module @4b5a5ed1
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:106)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2317)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1244)
at example.data.PubSubExample.runFlinkJob(PubSubExample.java:50)
at example.data.PubSubExample.main(PubSubExample.java:33)
I have added the options below to the VM options, but the error still happens:
--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED
I also tried downgrading Java to version 11, but that didn't help.
Can someone please help?
Thanks in advance.
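After reproducing the issue locally, I was able to resolve it with the JVM parameters below. Note that `--add-opens` applies to exactly one package, not to its subpackages. Opening `java.util` therefore does not open `java.util.concurrent.atomic`, which is the package named in the error message, so it has to be opened explicitly:
--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED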