Tags: apache-flink, flink-sql, protobuf-java

Flink SQL Error: Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception


My goal is to set up the Flink SQL Kafka connector so I can use SQL to read messages from a Kafka topic. The topic's messages are serialized with Protobuf.

When I try to execute Flink SQL, I get the error: Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.

Below is my code.

# Define variables
FLINK_VERSION="1.14.4"  # Change this to the desired Flink version
FLINK_IMAGE="flink:$FLINK_VERSION"
NETWORK_NAME="flink-network"
JOBMANAGER_CONTAINER_NAME="jobmanager"
TASKMANAGER_CONTAINER_NAME="taskmanager"
JOBMANAGER_RPC_ADDRESS="jobmanager"
PARALLELISM=2
KAFKA_BROKER="<<IPAddress:PORT>>"
KAFKA_USERNAME="username"
KAFKA_PASSWORD="password"
PROTOBUF_CLASS_NAME="test.message"
PROTOBUF_PROTO_FILE="/Users/test/Message.proto"
LOCAL_PROTOBUF_DESCRIPTOR_FILE="Message.desc"
PROTOBUF_DESCRIPTOR_FILE="/opt/flink/Message.desc"

PROTOBUF_JAR_URL="https://repo1.maven.org/maven2/org/apache/flink/flink-protobuf/1.20.0/flink-protobuf-1.20.0.jar"
PROTOBUF_JAR_NAME="flink-protobuf-1.20.0.jar"

KAFKA_CONNECTOR_JAR_URL="https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.14.4/flink-connector-kafka_2.11-1.14.4.jar"
KAFKA_CONNECTOR_JAR_NAME="flink-connector-kafka_2.11-1.14.4.jar"
CUSTOM_JOBMANAGER_PORT=6130  # Custom port to avoid conflict with port 6123
CUSTOM_WEB_UI_PORT=8082  # Custom port to avoid conflict with port 8081
KAFKA_CONFIG_FILE="kafka_consumer.properties"
SQL_CLIENT_DEFAULTS_FILE="sql-client-defaults.yaml"

# Pull the Flink image
echo "Pulling Flink image..."
podman pull $FLINK_IMAGE

# Create a Podman network
echo "Creating Podman network..."
podman network create $NETWORK_NAME

# Start the JobManager container with custom port mapping
echo "Starting JobManager container..."
podman run -d --name $JOBMANAGER_CONTAINER_NAME --network $NETWORK_NAME \
  -p $CUSTOM_WEB_UI_PORT:8081 -p $CUSTOM_JOBMANAGER_PORT:6123 \
  $FLINK_IMAGE jobmanager

# Verify the JobManager container is running
echo "Verifying JobManager container is running..."
if [ "$(podman inspect -f '{{.State.Running}}' $JOBMANAGER_CONTAINER_NAME)" != "true" ]; then
  echo "Error: JobManager container is not running."
  podman logs $JOBMANAGER_CONTAINER_NAME
  exit 1
fi

# Start the TaskManager containers
echo "Starting TaskManager containers..."
for i in $(seq 1 $PARALLELISM); do
  podman run -d --name ${TASKMANAGER_CONTAINER_NAME}-$i --network $NETWORK_NAME \
    -e JOB_MANAGER_RPC_ADDRESS=$JOBMANAGER_RPC_ADDRESS $FLINK_IMAGE taskmanager
done

# Download Protobuf format JAR
echo "Downloading Protobuf format JAR..."
curl -O $PROTOBUF_JAR_URL

# Download Kafka connector JAR
echo "Downloading Kafka connector JAR..."
curl -O $KAFKA_CONNECTOR_JAR_URL

# Copy Protobuf format JAR to JobManager and TaskManager containers
echo "Copying Protobuf format JAR to JobManager and TaskManager containers..."
podman cp $PROTOBUF_JAR_NAME $JOBMANAGER_CONTAINER_NAME:/opt/flink/lib/
for i in $(seq 1 $PARALLELISM); do
  podman cp $PROTOBUF_JAR_NAME ${TASKMANAGER_CONTAINER_NAME}-$i:/opt/flink/lib/
done

# Copy Kafka connector JAR to JobManager and TaskManager containers
echo "Copying Kafka connector JAR to JobManager and TaskManager containers..."
podman cp $KAFKA_CONNECTOR_JAR_NAME $JOBMANAGER_CONTAINER_NAME:/opt/flink/lib/
for i in $(seq 1 $PARALLELISM); do
  podman cp $KAFKA_CONNECTOR_JAR_NAME ${TASKMANAGER_CONTAINER_NAME}-$i:/opt/flink/lib/
done

# Generate Protobuf descriptor file
echo "Generating Protobuf descriptor file..."
protoc --descriptor_set_out=$LOCAL_PROTOBUF_DESCRIPTOR_FILE \
  --proto_path=$(dirname $PROTOBUF_PROTO_FILE) $PROTOBUF_PROTO_FILE

# Copy Protobuf descriptor file to JobManager and TaskManager containers
echo "Copying Protobuf descriptor file to JobManager and TaskManager containers..."
podman cp $LOCAL_PROTOBUF_DESCRIPTOR_FILE $JOBMANAGER_CONTAINER_NAME:$PROTOBUF_DESCRIPTOR_FILE
for i in $(seq 1 $PARALLELISM); do
  podman cp $LOCAL_PROTOBUF_DESCRIPTOR_FILE ${TASKMANAGER_CONTAINER_NAME}-$i:$PROTOBUF_DESCRIPTOR_FILE
done

# Copy sql-client-defaults.yaml to JobManager container
echo "Copying sql-client-defaults.yaml to JobManager container..."
podman cp $SQL_CLIENT_DEFAULTS_FILE $JOBMANAGER_CONTAINER_NAME:/opt/flink/conf/sql-client-defaults.yaml

# Run SQL Client with Explicit Classpath
echo "Running SQL Client with Explicit Classpath..."
podman exec -it $JOBMANAGER_CONTAINER_NAME /opt/flink/bin/sql-client.sh -l /opt/flink/lib/


# Create Kafka table with Protobuf format
echo "Creating Kafka table with Protobuf format..."
FLINK_SQL="
CREATE TABLE kafka_table (
  msg_type INT,
  disc STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'bqt_trd_str_1',
  'properties.bootstrap.servers' = '$KAFKA_BROKER',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$KAFKA_USERNAME\" password=\"$KAFKA_PASSWORD\";',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'test.Message',
  'protobuf.descriptor-file' = 'file://$PROTOBUF_DESCRIPTOR_FILE',
  'scan.startup.mode' = 'earliest-offset'
);

-- Select statement to retrieve all Kafka messages
SELECT * FROM kafka_table;
"

# Execute the SQL commands using the Flink SQL client
echo "$FLINK_SQL" | podman exec -i $JOBMANAGER_CONTAINER_NAME /opt/flink/bin/sql-client.sh -f -

I verified that the script above copied all the files to the proper directories; I ssh'd into the containers and checked.

I'm receiving the error below.


Running SQL Client with Explicit Classpath...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/connector/file/table/factories/BulkReaderFormatFactory
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:405)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:623)
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:378)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:156)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:114)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.<init>(ExecutionContext.java:66)
    at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:246)
    at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 34 more

There may be a problem with the Protobuf JAR file, but I'm not sure. I checked connectivity with Kafka, and everything is operating as it should. The problem only occurs when I run FLINK_SQL. Has anybody encountered a similar problem?


Solution

  • You are trying to use a version of flink-protobuf that was compiled against Flink 1.20 with Flink 1.14. That can't work -- you have to use Flink formats and connectors that are compiled for the same minor version of Flink as your Flink cluster.

    Moreover, flink-protobuf was first introduced in Flink 1.16, so you'll have to upgrade at least that far if you want to use it.
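
    As a rough sketch of the fix, the version of every format/connector JAR should be derived from the same variable as the cluster image, rather than hard-coded independently. The snippet below assumes a hypothetical upgrade to Flink 1.16.3 (the exact version and artifact availability should be verified on Maven Central before use); note that from Flink 1.15 onward the Kafka connector artifacts no longer carry a Scala suffix like `_2.11`, and for the SQL client the fat `flink-sql-connector-kafka` JAR is the usual choice because it bundles the Kafka client:

    ```shell
    # Pin everything to one Flink version so format/connector JARs match the cluster.
    # 1.16.3 is an illustrative choice (flink-protobuf exists from 1.16 onward).
    FLINK_VERSION="1.16.3"
    FLINK_IMAGE="flink:${FLINK_VERSION}"

    # Protobuf format JAR, versioned with the cluster.
    PROTOBUF_JAR_URL="https://repo1.maven.org/maven2/org/apache/flink/flink-protobuf/${FLINK_VERSION}/flink-protobuf-${FLINK_VERSION}.jar"

    # Fat SQL-client Kafka connector JAR (no Scala suffix in 1.15+).
    KAFKA_CONNECTOR_JAR_URL="https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/${FLINK_VERSION}/flink-sql-connector-kafka-${FLINK_VERSION}.jar"

    # Download both once the coordinates are confirmed for your version:
    # curl -fLO "$PROTOBUF_JAR_URL"
    # curl -fLO "$KAFKA_CONNECTOR_JAR_URL"

    echo "format JAR:    ${PROTOBUF_JAR_URL##*/}"
    echo "connector JAR: ${KAFKA_CONNECTOR_JAR_URL##*/}"
    ```

    With this layout, bumping `FLINK_VERSION` in one place keeps the image, the format, and the connector in lockstep, which is exactly the invariant the NoClassDefFoundError was violating.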