Search code examples
javaapache-flinkflink-streamingjava-17

java.lang.reflect.InaccessibleObjectException : module java.base does not "opens java.util.concurrent.atomic" to unnamed module


I am writing an Apache Flink program that runs locally and interacts with Google Cloud Pub/Sub.

Dependencies

    <!-- Build settings and the version numbers referenced by the dependency declarations. -->
    <properties>
        <!-- Compile for Java 17 (both source level and bytecode target). -->
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- NOTE(review): this version string ends in "1.18" while flink.version is 1.17.0; confirm the pairing is intended. -->
        <flink.gcp.pubsub.connector.version>3.0.2-1.18</flink.gcp.pubsub.connector.version>
        <flink.version>1.17.0</flink.version>
        <!-- NOTE(review): not referenced by any dependency visible in this snippet. -->
        <scala.binary.version>2.10</scala.binary.version>
    </properties>

    <!-- Flink artifacts used by the example program; versions come from the properties block. -->
    <dependencies>
        <!-- Pub/Sub connector; presumably the source of the PubSubSource / PubSubSink classes the program imports. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-gcp-pubsub</artifactId>
            <version>${flink.gcp.pubsub.connector.version}</version>
        </dependency>
        <!-- DataStream API; presumably provides StreamExecutionEnvironment. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE(review): assumed to be needed for executing the job from a local JVM; confirm. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

Program

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Example job that first publishes a couple of integer messages to an input topic and then runs
 * a Flink pipeline that reads them from a subscription, logs each payload, and writes it to an
 * output topic.
 *
 * <p>Note for Java 17: building the pipeline makes Flink's closure cleaner reflect into
 * {@code java.util.concurrent.atomic}. The JVM therefore has to be started with
 * {@code --add-opens java.base/java.util.concurrent.atomic=ALL-UNNAMED}; opening only
 * {@code java.util} does not cover that package.
 */
public class PubSubExample {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubExample.class);

    /** Number of {@code --key value} arguments that {@link #main(String[])} reads as required. */
    private static final int REQUIRED_PARAMETER_COUNT = 4;

    /**
     * Parses the command line, publishes two test messages, and starts the Flink job.
     *
     * @param args {@code --google-project}, {@code --input-topicName}, {@code --input-subscription}
     *     and {@code --output-topicName}, each followed by its value
     * @throws Exception if publishing or job execution fails
     */
    public static void main(String[] args) throws Exception {
        // parse input arguments
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // All four arguments below are fetched with getRequired(), so print the usage text
        // whenever fewer than four were supplied instead of failing later on a missing key.
        if (parameterTool.getNumberOfParameters() < REQUIRED_PARAMETER_COUNT) {
            System.out.println(
                    "Missing parameters!\n"
                            + "Usage: flink run PubSub.jar --input-subscription <subscription> --input-topicName <topic> --output-topicName <output-topic> "
                            + "--google-project <google project name> ");
            return;
        }

        String projectName = parameterTool.getRequired("google-project");
        String inputTopicName = parameterTool.getRequired("input-topicName");
        String subscriptionName = parameterTool.getRequired("input-subscription");
        String outputTopicName = parameterTool.getRequired("output-topicName");

        // Seed the input topic so the job has something to consume.
        PubSubPublisher pubSubPublisher = new PubSubPublisher(projectName, inputTopicName);
        pubSubPublisher.publish(2);

        runFlinkJob(projectName, subscriptionName, outputTopicName);
    }

    /**
     * Builds and executes the pipeline: Pub/Sub source, logging map step, Pub/Sub sink.
     *
     * @param projectName Google Cloud project that owns the subscription and the output topic
     * @param subscriptionName subscription to read from
     * @param outputTopicName topic to write the processed integers to
     * @throws Exception if the job cannot be built or its execution fails
     */
    private static void runFlinkJob(
            String projectName, String subscriptionName, String outputTopicName) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000L);

        env.addSource(
                        PubSubSource.newBuilder()
                                .withDeserializationSchema(new IntegerSerializer())
                                .withProjectName(projectName)
                                .withSubscriptionName(subscriptionName)
                                .withMessageRateLimit(1)
                                .build())
                .map(PubSubExample::printAndReturn)
                .disableChaining()
                .addSink(
                        PubSubSink.newBuilder()
                                .withSerializationSchema(new IntegerSerializer())
                                .withProjectName(projectName)
                                .withTopicName(outputTopicName)
                                .build());

        env.execute("Flink Streaming PubSubReader");
    }

    /**
     * Logs the payload and passes it through unchanged.
     *
     * @param i payload of the message being processed
     * @return the same value that was passed in
     */
    private static Integer printAndReturn(Integer i) {
        // Parameterized form lets SLF4J skip message formatting when INFO is disabled.
        LOG.info("Processed message with payload: {}", i);
        return i;
    }
}


import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

import java.math.BigInteger;
/** Helper that publishes a fixed number of integer-payload messages to one Pub/Sub topic. */
class PubSubPublisher {
    private final String projectName;
    private final String topicName;

    /**
     * @param projectName Google Cloud project that owns the topic
     * @param topicName topic the messages are published to
     */
    PubSubPublisher(String projectName, String topicName) {
        this.projectName = projectName;
        this.topicName = topicName;
    }

    /**
     * Publish messages with as payload a single integer. The integers inside the messages start
     * from 0 and increase by one for each message sent.
     *
     * @param amountOfMessages amount of messages to send
     * @throws RuntimeException wrapping any failure while creating the publisher or publishing;
     *     if the thread was interrupted, its interrupt flag is restored before throwing
     */
    void publish(int amountOfMessages) {
        Publisher publisher = null;
        try {
            publisher = Publisher.newBuilder(TopicName.of(projectName, topicName)).build();
            for (int i = 0; i < amountOfMessages; i++) {
                ByteString messageData = ByteString.copyFrom(BigInteger.valueOf(i).toByteArray());
                PubsubMessage message = PubsubMessage.newBuilder().setData(messageData).build();
                // Block until this message has been published before sending the next one.
                publisher.publish(message).get();

                System.out.println("Published message: " + i);
                Thread.sleep(100L);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            if (publisher != null) {
                try {
                    publisher.shutdown();
                } catch (Exception e) {
                    // Cleanup stays best-effort (a failure here must not mask an exception from
                    // the try block), but it is reported instead of being silently dropped.
                    System.err.println("Failed to shut down Pub/Sub publisher: " + e);
                }
            }
        }
    }
}


import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;

import com.google.pubsub.v1.PubsubMessage;

import java.io.IOException;
import java.math.BigInteger;

/**
 * Schema that converts between {@link Integer} values and the byte[] payload written by
 * {@link PubSubPublisher}. It is used both to read incoming messages and to encode outgoing
 * ones; every payload must hold exactly one integer in {@link BigInteger} byte form.
 */
class IntegerSerializer
        implements PubSubDeserializationSchema<Integer>, SerializationSchema<Integer> {

    /** Decodes the message payload as a two's-complement big-endian integer. */
    @Override
    public Integer deserialize(PubsubMessage message) throws IOException {
        final byte[] payload = message.getData().toByteArray();
        final BigInteger decoded = new BigInteger(payload);
        return decoded.intValue();
    }

    /** Encodes the value in the same byte form that {@link #deserialize} reads. */
    @Override
    public byte[] serialize(Integer integer) {
        final BigInteger encoded = BigInteger.valueOf(integer);
        return encoded.toByteArray();
    }

    /** No element ever marks the end of the stream. */
    @Override
    public boolean isEndOfStream(Integer integer) {
        return false;
    }

    /** Reports {@link Integer} as the element type this schema produces. */
    @Override
    public TypeInformation<Integer> getProducedType() {
        final TypeInformation<Integer> producedType = TypeInformation.of(Integer.class);
        return producedType;
    }
}

Error

However, running this program locally gives me the following error:

Exception in thread "main" java.lang.reflect.InaccessibleObjectException: Unable to make field private static final long java.util.concurrent.atomic.AtomicReference.serialVersionUID accessible: module java.base does not "opens java.util.concurrent.atomic" to unnamed module @4b5a5ed1
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:354)
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:297)
    at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:178)
    at java.base/java.lang.reflect.Field.setAccessible(Field.java:172)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:106)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132)
    at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2317)
    at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202)
    at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1244)
    at example.data.PubSubExample.runFlinkJob(PubSubExample.java:50)
    at example.data.PubSubExample.main(PubSubExample.java:33)

Attempt 1

I added the options below to the VM options, but the error still occurs.

--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED

Attempt 2

I also tried downgrading the Java version to 11, but that didn't help.

Can someone please help?

Thanks in advance.


Solution

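  • After attempting to replicate the issue locally, I suggest resolving it by adding the JVM options below. The error names the package java.util.concurrent.atomic, which the options from Attempt 1 do not open: --add-opens applies to a single package, so opening java.util does not cover its subpackages.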

    --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED