My goal is to set up the Flink SQL connector so I can use SQL to read messages from a Kafka topic; the topic's messages are serialized with Protobuf.
When I try to execute Flink SQL, I get the error "Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue".
Below is my code.
# Define variables
FLINK_VERSION="1.14.4" # Change this to the desired Flink version
FLINK_IMAGE="flink:$FLINK_VERSION"
NETWORK_NAME="flink-network"
JOBMANAGER_CONTAINER_NAME="jobmanager"
TASKMANAGER_CONTAINER_NAME="taskmanager"
JOBMANAGER_RPC_ADDRESS="jobmanager"
PARALLELISM=2
KAFKA_BROKER="<<IPAddress:PORT>>"
KAFKA_USERNAME="username"
KAFKA_PASSWORD="password"
PROTOBUF_CLASS_NAME="test.message"
PROTOBUF_PROTO_FILE="/Users/test/Message.proto"
LOCAL_PROTOBUF_DESCRIPTOR_FILE="Message.desc"
PROTOBUF_DESCRIPTOR_FILE="/opt/flink/Message.desc"
PROTOBUF_JAR_URL="https://repo1.maven.org/maven2/org/apache/flink/flink-protobuf/1.20.0/flink-protobuf-1.20.0.jar"
PROTOBUF_JAR_NAME="flink-protobuf-1.20.0.jar"
KAFKA_CONNECTOR_JAR_URL="https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.14.4/flink-connector-kafka_2.11-1.14.4.jar"
KAFKA_CONNECTOR_JAR_NAME="flink-connector-kafka_2.11-1.14.4.jar"
CUSTOM_JOBMANAGER_PORT=6130 # Custom port to avoid conflict with port 6123
CUSTOM_WEB_UI_PORT=8082 # Custom port to avoid conflict with port 8081
KAFKA_CONFIG_FILE="kafka_consumer.properties"
SQL_CLIENT_DEFAULTS_FILE="sql-client-defaults.yaml"
# Pull the Flink image
echo "Pulling Flink image..."
podman pull $FLINK_IMAGE
# Create a Podman network
echo "Creating Podman network..."
podman network create $NETWORK_NAME
# Start the JobManager container with custom port mapping
echo "Starting JobManager container..."
podman run -d --name $JOBMANAGER_CONTAINER_NAME --network $NETWORK_NAME -p $CUSTOM_WEB_UI_PORT:8081 -p $CUSTOM_JOBMANAGER_PORT:6123 $FLINK_IMAGE jobmanager
# Verify the JobManager container is running
echo "Verifying JobManager container is running..."
if [ "$(podman inspect -f '{{.State.Running}}' $JOBMANAGER_CONTAINER_NAME)" != "true" ]; then
  echo "Error: JobManager container is not running."
  podman logs $JOBMANAGER_CONTAINER_NAME
  exit 1
fi
# Start the TaskManager containers
echo "Starting TaskManager containers..."
for i in $(seq 1 $PARALLELISM); do
  podman run -d --name ${TASKMANAGER_CONTAINER_NAME}-$i --network $NETWORK_NAME -e JOB_MANAGER_RPC_ADDRESS=$JOBMANAGER_RPC_ADDRESS $FLINK_IMAGE taskmanager
done
# Download Protobuf format JAR
echo "Downloading Protobuf format JAR..."
curl -O $PROTOBUF_JAR_URL
# Download Kafka connector JAR
echo "Downloading Kafka connector JAR..."
curl -O $KAFKA_CONNECTOR_JAR_URL
# Copy Protobuf format JAR to JobManager and TaskManager containers
echo "Copying Protobuf format JAR to JobManager and TaskManager containers..."
podman cp $PROTOBUF_JAR_NAME $JOBMANAGER_CONTAINER_NAME:/opt/flink/lib/
for i in $(seq 1 $PARALLELISM); do
  podman cp $PROTOBUF_JAR_NAME ${TASKMANAGER_CONTAINER_NAME}-$i:/opt/flink/lib/
done
# Copy Kafka connector JAR to JobManager and TaskManager containers
echo "Copying Kafka connector JAR to JobManager and TaskManager containers..."
podman cp $KAFKA_CONNECTOR_JAR_NAME $JOBMANAGER_CONTAINER_NAME:/opt/flink/lib/
for i in $(seq 1 $PARALLELISM); do
  podman cp $KAFKA_CONNECTOR_JAR_NAME ${TASKMANAGER_CONTAINER_NAME}-$i:/opt/flink/lib/
done
# Generate Protobuf descriptor file
echo "Generating Protobuf descriptor file..."
protoc --descriptor_set_out=$LOCAL_PROTOBUF_DESCRIPTOR_FILE --proto_path=$(dirname $PROTOBUF_PROTO_FILE) $PROTOBUF_PROTO_FILE
# Copy Protobuf descriptor file to JobManager and TaskManager containers
echo "Copying Protobuf descriptor file to JobManager and TaskManager containers..."
podman cp $LOCAL_PROTOBUF_DESCRIPTOR_FILE $JOBMANAGER_CONTAINER_NAME:$PROTOBUF_DESCRIPTOR_FILE
for i in $(seq 1 $PARALLELISM); do
  podman cp $LOCAL_PROTOBUF_DESCRIPTOR_FILE ${TASKMANAGER_CONTAINER_NAME}-$i:$PROTOBUF_DESCRIPTOR_FILE
done
# Copy sql-client-defaults.yaml to JobManager container
echo "Copying sql-client-defaults.yaml to JobManager container..."
podman cp $SQL_CLIENT_DEFAULTS_FILE $JOBMANAGER_CONTAINER_NAME:/opt/flink/conf/sql-client-defaults.yaml
# Run SQL Client with Explicit Classpath
echo "Running SQL Client with Explicit Classpath..."
podman exec -it $JOBMANAGER_CONTAINER_NAME /opt/flink/bin/sql-client.sh -l /opt/flink/lib/
# Create Kafka table with Protobuf format
echo "Creating Kafka table with Protobuf format..."
FLINK_SQL="
CREATE TABLE kafka_table (
msg_type INT,
disc STRING
) WITH (
'connector' = 'kafka',
'topic' = 'bqt_trd_str_1',
'properties.bootstrap.servers' = '$KAFKA_BROKER',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$KAFKA_USERNAME\" password=\"$KAFKA_PASSWORD\";',
'format' = 'protobuf',
'protobuf.message-class-name' = 'test.Message',
'protobuf.descriptor-file' = 'file://$PROTOBUF_DESCRIPTOR_FILE',
'scan.startup.mode' = 'earliest-offset'
);
-- Select statement to retrieve all Kafka messages
SELECT * FROM kafka_table;
"
# Execute the SQL commands using the Flink SQL client
echo "$FLINK_SQL" | podman exec -i $JOBMANAGER_CONTAINER_NAME /opt/flink/bin/sql-client.sh -f -
I verified that the above script copied all the files to the proper directories; I ssh'd into the containers and checked.
I'm receiving the error below.
Running SQL Client with Explicit Classpath...
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/connector/file/table/factories/BulkReaderFormatFactory
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:405)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:623)
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:378)
at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:156)
at org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:114)
at org.apache.flink.table.client.gateway.context.ExecutionContext.<init>(ExecutionContext.java:66)
at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:246)
at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 34 more
There may be a problem with the Protobuf JAR file, but I'm not sure. I checked connectivity with Kafka, and everything is operating as it should. The problem appears when I run FLINK_SQL. Has anybody encountered a similar problem?
You are trying to use a version of flink-protobuf that was compiled against Flink 1.20 with Flink 1.14. That can't work -- you have to use Flink formats and connectors that are compiled for the same minor version of Flink as your Flink cluster.
Moreover, flink-protobuf was first introduced in Flink 1.16, so you'll have to upgrade at least that far if you want to use it.
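As a sketch of the fix: pin every Flink artifact to the same version as the cluster image. The snippet below assumes you upgrade the cluster to Flink 1.16.3 (any 1.16+ version works for flink-protobuf); verify the exact artifact URLs against Maven Central before relying on them. Note also that since Flink 1.15 the Kafka connector artifacts no longer carry a Scala suffix like `_2.11`.

```shell
# Sketch: keep all Flink artifacts on one version (assumed here: 1.16.3).
FLINK_VERSION="1.16.3"
FLINK_IMAGE="flink:$FLINK_VERSION"

# flink-protobuf is released in lockstep with Flink itself, so reuse $FLINK_VERSION.
PROTOBUF_JAR_URL="https://repo1.maven.org/maven2/org/apache/flink/flink-protobuf/${FLINK_VERSION}/flink-protobuf-${FLINK_VERSION}.jar"

# No Scala suffix since Flink 1.15; the fat "sql-connector" jar bundles the Kafka
# client dependencies, which is the convenient choice for the SQL Client.
KAFKA_CONNECTOR_JAR_URL="https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/${FLINK_VERSION}/flink-sql-connector-kafka-${FLINK_VERSION}.jar"

echo "$PROTOBUF_JAR_URL"
echo "$KAFKA_CONNECTOR_JAR_URL"
```

With the versions aligned this way, the `NoClassDefFoundError` from a 1.20-compiled format running on a 1.14 classpath should no longer occur; any remaining errors would point at configuration rather than version skew.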