
I'm new to PyFlink. I'm trying to write a Python program that reads data from a Kafka topic and prints it to stdout. I followed the post "Flink Python Datastream API Kafka Producer Sink Serialization", but I keep seeing a NoSuchMethodError caused by a version mismatch. I have added the flink-sql-connector-kafka jar available at https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.13.0/flink-sql-connector-kafka_2.11-1.13.0.jar. Can someone help me with a proper example of how to do this? Following is my code:

import json
import os

from pyflink.common import SimpleStringSchema
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types


def my_map(obj):
    json_obj = json.loads(json.loads(obj))
    return json.dumps(json_obj["name"])


def kafkaread():
    env = StreamExecutionEnvironment.get_execution_environment()

    env.add_jars("file:///automation/flink/flink-sql-connector-kafka_2.11-1.10.1.jar")

    deserialization_schema = SimpleStringSchema()

    kafkaSource = FlinkKafkaConsumer(
        topics='test',
        deserialization_schema=deserialization_schema,
        properties={'bootstrap.servers': '10.234.175.22:9092', 'group.id': 'test'}
    )

    ds = env.add_source(kafkaSource).print()
    env.execute('kafkaread')


if __name__ == '__main__':
    kafkaread()

But Python doesn't recognise the jar file and throws the following error:

Traceback (most recent call last):
  File "flinkKafka.py", line 31, in <module>
    kafkaread()
  File "flinkKafka.py", line 20, in kafkaread
    kafkaSource = FlinkKafkaConsumer(
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/datastream/connectors.py", line 186, in __init__
    j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/datastream/connectors.py", line 336, in _get_kafka_consumer
    j_flink_kafka_consumer = j_consumer_clz(topics,
  File "/automation/flink/venv/lib/python3.8/site-packages/pyflink/util/exceptions.py", line 185, in wrapped_call
    raise TypeError(
TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'. The Java dependencies could be specified via command line argument '--jarfile' or the config option 'pipeline.jars'

What is the correct location to add the jar file?

• Are you using a Maven build, or some other build tool? (Commented Feb 8, 2022 at 12:04)

3 Answers

Answer 1 (score 1)

I see that you downloaded flink-sql-connector-kafka_2.11-1.13.0.jar, but the code loads flink-sql-connector-kafka_2.11-1.10.1.jar. Maybe check that first: the connector jar has to match the Flink version that your PyFlink installation uses.
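One way to catch this kind of mismatch before Flink starts is to compare the version embedded in the jar's file name with the installed PyFlink version. The sketch below assumes the Maven naming scheme `flink-sql-connector-kafka_<scala>-<flink>.jar` (as in the download URL from the question) and that PyFlink was installed from the `apache-flink` pip package; the helper names are hypothetical, and it compares the full version string exactly.

```python
import re
from importlib import metadata
from pathlib import Path
from typing import Optional, Tuple

# Assumed naming scheme, e.g. flink-sql-connector-kafka_2.11-1.13.0.jar
_JAR_RE = re.compile(r"^flink-sql-connector-kafka_(\d+\.\d+)-(\d+\.\d+\.\d+)\.jar$")


def parse_connector_jar(path: str) -> Tuple[str, str]:
    """Hypothetical helper: return (scala_version, flink_version) from the jar name."""
    match = _JAR_RE.match(Path(path).name)
    if match is None:
        raise ValueError(f"unexpected connector jar name: {path}")
    return match.group(1), match.group(2)


def installed_pyflink_version() -> Optional[str]:
    """Version of the pip package 'apache-flink', or None if it is not installed."""
    try:
        return metadata.version("apache-flink")
    except metadata.PackageNotFoundError:
        return None


def check_connector_version(jar_path: str, pyflink_version: str) -> None:
    """Raise RuntimeError if the jar targets a different Flink release than PyFlink."""
    _, jar_flink = parse_connector_jar(jar_path)
    if jar_flink != pyflink_version:
        raise RuntimeError(
            f"{Path(jar_path).name} targets Flink {jar_flink}, "
            f"but PyFlink {pyflink_version} is installed"
        )


if __name__ == "__main__":
    # The jar path from the question; replace it with your own.
    jar = "/automation/flink/flink-sql-connector-kafka_2.11-1.10.1.jar"
    version = installed_pyflink_version()
    if version is None:
        print("apache-flink is not installed in this environment")
    else:
        try:
            check_connector_version(jar, version)
            print("connector jar matches PyFlink", version)
        except RuntimeError as err:
            print(err)
```

With the path from the question and PyFlink 1.13.0 installed, this would report that the 1.10.1 jar targets the wrong release.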


Answer 2 (score 0)

You just need to check the path to the flink-sql-connector-kafka jar.
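`add_jars` takes URLs (the question passes `file:///...`), so a wrong path is not reported where it is written; it likely surfaces later as the "Could not found the Java class" error. A minimal sketch, assuming a jar on the local filesystem, that resolves the path, fails early if the file is missing, and builds the `file://` URI with `pathlib` instead of by hand. The helper name `local_jar_uri` is hypothetical.

```python
from pathlib import Path


def local_jar_uri(jar_path: str) -> str:
    """Return a file:// URI for an existing local jar, or raise FileNotFoundError."""
    path = Path(jar_path).expanduser().resolve()
    if not path.is_file():
        raise FileNotFoundError(f"connector jar not found: {path}")
    # as_uri() requires an absolute path, which resolve() guarantees
    return path.as_uri()


if __name__ == "__main__":
    try:
        uri = local_jar_uri("/automation/flink/flink-sql-connector-kafka_2.11-1.13.0.jar")
    except FileNotFoundError as err:
        print(err)
    else:
        print(uri)  # this is the value to pass to env.add_jars(uri)
```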


Answer 3 (score 0)

You need to add the flink-sql-connector-kafka jar that matches your PyFlink version, and its Scala suffix (for example _2.11) has to match your Flink build as well. If the versions are correct, check that the path you pass to the add_jars function actually points to the jar file.
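Putting the answers together, here is a minimal sketch of the question's program with the jar version aligned to PyFlink 1.13.0. It assumes `pip install apache-flink==1.13.0`, the Scala 2.11 connector from the question's URL downloaded into `/automation/flink/`, and the broker and topic values from the question. The `connector_jar` helper and the constants are hypothetical; PyFlink is imported inside `kafkaread()` so the helper can be checked without a Flink installation.

```python
import importlib.util
from pathlib import Path

# Assumptions: PyFlink 1.13.0 is installed and the Scala 2.11 build of the
# connector jar (see the URL in the question) was downloaded into JAR_DIR.
FLINK_VERSION = "1.13.0"
SCALA_VERSION = "2.11"
JAR_DIR = Path("/automation/flink")


def connector_jar(jar_dir: Path, scala_version: str, flink_version: str) -> Path:
    """Hypothetical helper: path of the Kafka SQL connector jar for one Flink release."""
    return jar_dir / f"flink-sql-connector-kafka_{scala_version}-{flink_version}.jar"


def kafkaread() -> None:
    # Imported here so the helper above can be used without PyFlink installed.
    from pyflink.common.serialization import SimpleStringSchema
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors import FlinkKafkaConsumer

    jar = connector_jar(JAR_DIR, SCALA_VERSION, FLINK_VERSION)
    if not jar.is_file():
        raise FileNotFoundError(f"download {jar.name} into {JAR_DIR} first")

    env = StreamExecutionEnvironment.get_execution_environment()
    # add_jars takes URLs, so convert the absolute path to file:///...
    env.add_jars(jar.resolve().as_uri())

    consumer = FlinkKafkaConsumer(
        topics="test",
        deserialization_schema=SimpleStringSchema(),
        properties={"bootstrap.servers": "10.234.175.22:9092", "group.id": "test"},
    )

    # Decode each Kafka record as a string and print it to stdout.
    env.add_source(consumer).print()
    env.execute("kafkaread")


if __name__ == "__main__":
    if importlib.util.find_spec("pyflink") is None:
        print("PyFlink is not installed: pip install apache-flink==1.13.0")
    else:
        kafkaread()
```

Deriving the file name from the same version constants keeps the jar and the PyFlink release in one place, which is the mismatch Answer 1 points out.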

