
My goal is to set up the Flink SQL connector so that I can use SQL to read messages from a Kafka topic. The Kafka topic uses Protobuf.

I am getting the error "Exception in thread 'main' org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue" when I try to execute Flink SQL.

Below is my code.

# Define variables
FLINK_VERSION="1.14.4"  # Change this to the desired Flink version
FLINK_IMAGE="flink:$FLINK_VERSION"
NETWORK_NAME="flink-network"
JOBMANAGER_CONTAINER_NAME="jobmanager"
TASKMANAGER_CONTAINER_NAME="taskmanager"
JOBMANAGER_RPC_ADDRESS="jobmanager"
PARALLELISM=2
KAFKA_BROKER="<<IPAddress:PORT>>"
KAFKA_USERNAME="username"
KAFKA_PASSWORD="password"
PROTOBUF_CLASS_NAME="test.Message"
PROTOBUF_PROTO_FILE="/Users/test/Message.proto"
LOCAL_PROTOBUF_DESCRIPTOR_FILE="Message.desc"
PROTOBUF_DESCRIPTOR_FILE="/opt/flink/Message.desc"

PROTOBUF_JAR_URL="https://repo1.maven.org/maven2/org/apache/flink/flink-protobuf/1.20.0/flink-protobuf-1.20.0.jar"
PROTOBUF_JAR_NAME="flink-protobuf-1.20.0.jar"

KAFKA_CONNECTOR_JAR_URL="https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.14.4/flink-connector-kafka_2.11-1.14.4.jar"
KAFKA_CONNECTOR_JAR_NAME="flink-connector-kafka_2.11-1.14.4.jar"
CUSTOM_JOBMANAGER_PORT=6130  # Custom port to avoid conflict with port 6123
CUSTOM_WEB_UI_PORT=8082  # Custom port to avoid conflict with port 8081
KAFKA_CONFIG_FILE="kafka_consumer.properties"
SQL_CLIENT_DEFAULTS_FILE="sql-client-defaults.yaml"

# Pull the Flink image
echo "Pulling Flink image..."
podman pull $FLINK_IMAGE

# Create a Podman network
echo "Creating Podman network..."
podman network create $NETWORK_NAME

# Start the JobManager container with custom port mapping
echo "Starting JobManager container..."
podman run -d --name $JOBMANAGER_CONTAINER_NAME --network $NETWORK_NAME -p $CUSTOM_WEB_UI_PORT:8081 -p $CUSTOM_JOBMANAGER_PORT:6123 $FLINK_IMAGE jobmanager

# Verify the JobManager container is running
echo "Verifying JobManager container is running..."
if [ "$(podman inspect -f '{{.State.Running}}' $JOBMANAGER_CONTAINER_NAME)" != "true" ]; then
  echo "Error: JobManager container is not running."
  podman logs $JOBMANAGER_CONTAINER_NAME
  exit 1
fi

# Start the TaskManager containers
echo "Starting TaskManager containers..."
for i in $(seq 1 $PARALLELISM); do
  podman run -d --name ${TASKMANAGER_CONTAINER_NAME}-$i --network $NETWORK_NAME -e JOB_MANAGER_RPC_ADDRESS=$JOBMANAGER_RPC_ADDRESS $FLINK_IMAGE taskmanager
done

# Download Protobuf format JAR
echo "Downloading Protobuf format JAR..."
curl -O $PROTOBUF_JAR_URL

# Download Kafka connector JAR
echo "Downloading Kafka connector JAR..."
curl -O $KAFKA_CONNECTOR_JAR_URL

# Copy Protobuf format JAR to JobManager and TaskManager containers
echo "Copying Protobuf format JAR to JobManager and TaskManager containers..."
podman cp $PROTOBUF_JAR_NAME $JOBMANAGER_CONTAINER_NAME:/opt/flink/lib/
for i in $(seq 1 $PARALLELISM); do
  podman cp $PROTOBUF_JAR_NAME ${TASKMANAGER_CONTAINER_NAME}-$i:/opt/flink/lib/
done

# Copy Kafka connector JAR to JobManager and TaskManager containers
echo "Copying Kafka connector JAR to JobManager and TaskManager containers..."
podman cp $KAFKA_CONNECTOR_JAR_NAME $JOBMANAGER_CONTAINER_NAME:/opt/flink/lib/
for i in $(seq 1 $PARALLELISM); do
  podman cp $KAFKA_CONNECTOR_JAR_NAME ${TASKMANAGER_CONTAINER_NAME}-$i:/opt/flink/lib/
done

# Generate Protobuf descriptor file
echo "Generating Protobuf descriptor file..."
protoc --descriptor_set_out=$LOCAL_PROTOBUF_DESCRIPTOR_FILE --proto_path=$(dirname $PROTOBUF_PROTO_FILE) $PROTOBUF_PROTO_FILE

# Copy Protobuf descriptor file to JobManager and TaskManager containers
echo "Copying Protobuf descriptor file to JobManager and TaskManager containers..."
podman cp $LOCAL_PROTOBUF_DESCRIPTOR_FILE $JOBMANAGER_CONTAINER_NAME:$PROTOBUF_DESCRIPTOR_FILE
for i in $(seq 1 $PARALLELISM); do
  podman cp $LOCAL_PROTOBUF_DESCRIPTOR_FILE ${TASKMANAGER_CONTAINER_NAME}-$i:$PROTOBUF_DESCRIPTOR_FILE
done

# Copy sql-client-defaults.yaml to JobManager container
echo "Copying sql-client-defaults.yaml to JobManager container..."
podman cp $SQL_CLIENT_DEFAULTS_FILE $JOBMANAGER_CONTAINER_NAME:/opt/flink/conf/sql-client-defaults.yaml

# Run the SQL client with an explicit classpath
echo "Running SQL Client with Explicit Classpath..."
podman exec -it $JOBMANAGER_CONTAINER_NAME /opt/flink/bin/sql-client.sh -l /opt/flink/lib/


# Create Kafka table with Protobuf format
echo "Creating Kafka table with Protobuf format..."
FLINK_SQL="
CREATE TABLE kafka_table (
  msg_type INT,
  disc STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'bqt_trd_str_1',
  'properties.bootstrap.servers' = '$KAFKA_BROKER',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'SCRAM-SHA-256',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.scram.ScramLoginModule required username=\"$KAFKA_USERNAME\" password=\"$KAFKA_PASSWORD\";',
  'format' = 'protobuf',
  'protobuf.message-class-name' = 'test.Message',
  'protobuf.descriptor-file' = 'file://$PROTOBUF_DESCRIPTOR_FILE',
  'scan.startup.mode' = 'earliest-offset'
);

-- Select statement to retrieve all Kafka messages
SELECT * FROM kafka_table;
"

# Execute the SQL commands using the Flink SQL client
echo "$FLINK_SQL" | podman exec -i $JOBMANAGER_CONTAINER_NAME /opt/flink/bin/sql-client.sh -f -

I verified that the above script copied every file to the proper directory; I shelled into the containers and checked.

I'm receiving the error below.


Running SQL Client with Explicit Classpath...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/connector/file/table/factories/BulkReaderFormatFactory
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:473)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:405)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
    at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
    at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:172)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:623)
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:378)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.lookupExecutor(ExecutionContext.java:156)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.createTableEnvironment(ExecutionContext.java:114)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.<init>(ExecutionContext.java:66)
    at org.apache.flink.table.client.gateway.context.SessionContext.create(SessionContext.java:246)
    at org.apache.flink.table.client.gateway.local.LocalContextUtils.buildSessionContext(LocalContextUtils.java:87)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:87)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:88)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory
    at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 34 more

There may be a problem with the Protobuf JAR file, but I'm not sure. I checked connectivity with Kafka, and everything is operating as it should. The failure happens when I run FLINK_SQL. Has anybody encountered a similar problem?

1 Answer


You are trying to use a flink-protobuf JAR that was compiled against Flink 1.20 with a Flink 1.14 cluster. That cannot work: Flink formats and connectors must be compiled for the same minor version of Flink as the cluster they run on.

Moreover, the flink-protobuf format was first introduced in Flink 1.16, so you will have to upgrade at least that far if you want to use it.
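As a sketch of what a version-matched setup could look like, the variable block of your script might be rewritten as below. This assumes Flink 1.16.3 (pick whichever 1.16+ patch release you actually deploy, and verify the artifacts on Maven Central). Note two further points worth checking: from Flink 1.15 onward the Kafka connector artifact no longer carries a Scala suffix like _2.11, and for the SQL client you generally want the fat flink-sql-connector-kafka JAR, which bundles kafka-clients, rather than the thin flink-connector-kafka JAR.

```shell
# Sketch: pin every downloaded artifact to one Flink minor version.
# 1.16.3 is an assumption here -- flink-protobuf only exists from 1.16 on.
FLINK_VERSION="1.16.3"
FLINK_IMAGE="flink:$FLINK_VERSION"

# The SQL client needs the fat "sql-connector" JAR (bundles kafka-clients).
# Since Flink 1.15 this artifact has no Scala suffix (_2.11/_2.12).
KAFKA_CONNECTOR_JAR_URL="https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/$FLINK_VERSION/flink-sql-connector-kafka-$FLINK_VERSION.jar"

# Keep the protobuf format in lockstep with the cluster version.
PROTOBUF_JAR_URL="https://repo1.maven.org/maven2/org/apache/flink/flink-protobuf/$FLINK_VERSION/flink-protobuf-$FLINK_VERSION.jar"

echo "$KAFKA_CONNECTOR_JAR_URL"
echo "$PROTOBUF_JAR_URL"
```

With every artifact derived from the single $FLINK_VERSION variable, the cluster image, connector, and format can no longer drift apart the way 1.14.4 and 1.20.0 did in the original script.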
