Tags: python, java, apache-kafka, apache-flink, flink-streaming

TypeError: Could not found the Java class 'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer'


I am using PyFlink. I thought all the Java dependencies were installed along with pip install apache-flink.

The above error occurs on this line:

kafka_consumer = FlinkKafkaConsumer(
    topics='mytopic',
    deserialization_schema=deserialization_schema,
    properties={
        'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS})

As per the error, do I need to specify the Kafka consumer dependency manually in the PyFlink environment by downloading and adding the jar?

Any guidance would be appreciated.

Python Version: 3.8.2
Java Version: java 11.0.11

Solution

  • Since Flink is a Java/Scala-based project, the implementations of both connectors and formats are shipped as jars, and they are not bundled with the pip package

    FlinkKafkaConsumer in PyFlink is a thin wrapper that relies on the Java FlinkKafkaConsumer implementation, loaded through the JVM gateway

    You need to download the Kafka connector jar and place it in the lib directory of PyFlink. Generally, the path of the lib directory is: /usr/local/lib/python3.8.2/site-packages/pyflink/lib
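    The exact site-packages location varies between installations. A quick way to find pyflink's lib directory for the interpreter you are actually running (a sketch, assuming pyflink was pip-installed into site-packages):

    ```python
    import os
    import sysconfig

    # site-packages of the current interpreter; the connector jar
    # goes into the "pyflink/lib" directory underneath it.
    site_packages = sysconfig.get_paths()["purelib"]
    pyflink_lib = os.path.join(site_packages, "pyflink", "lib")
    print(pyflink_lib)
    ```

    Copy the downloaded connector jar into the printed directory and re-run the job.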

    class FlinkKafkaConsumer(FlinkKafkaConsumerBase):
        """
        The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
        Apache Kafka. The consumer can run in multiple parallel instances, each of which will
        pull data from one or more Kafka partitions.
    
        The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
        during a failure, and that the computation processes elements exactly once. (These guarantees
        naturally assume that Kafka itself does not lose any data.)
    
        Please note that Flink snapshots the offsets internally as part of its distributed checkpoints.
        The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in
        sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of
        how far the Flink Kafka consumer has consumed a topic.
    
        Please refer to Kafka's documentation for the available configuration properties:
        http://kafka.apache.org/documentation.html#newconsumerconfigs
        """
    
        def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema,
                     properties: Dict):
            """
            Creates a new Kafka streaming source consumer for Kafka 0.10.x.
    
            This constructor allows passing multiple topics to the consumer.
    
            :param topics: The Kafka topics to read from.
            :param deserialization_schema: The de-/serializer used to convert between Kafka's byte
                                           messages and Flink's objects.
            :param properties: The properties that are used to configure both the fetcher and the offset
                               handler.
            """
    
            JFlinkKafkaConsumer = get_gateway().jvm \
                .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
            j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
                                                         JFlinkKafkaConsumer)
            super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer)
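
    As an alternative to copying the jar into pyflink/lib, PyFlink also lets you register connector jars per job via env.add_jars. A minimal sketch; the jar path and version, the topic, and the bootstrap servers below are placeholders, and the import path shown is the one used by Flink 1.12-1.16:

    ```python
    from pathlib import Path

    # Hypothetical jar location/version -- download the connector jar
    # that matches your installed Flink version.
    JAR = Path("/opt/jars/flink-connector-kafka_2.12-1.14.4.jar")
    jar_url = JAR.as_uri()  # add_jars expects file:// URLs


    def build_kafka_source():
        """Sketch: register the connector jar, then create the consumer."""
        from pyflink.common.serialization import SimpleStringSchema
        from pyflink.datastream import StreamExecutionEnvironment
        from pyflink.datastream.connectors import FlinkKafkaConsumer

        env = StreamExecutionEnvironment.get_execution_environment()
        env.add_jars(jar_url)  # alternative to copying into pyflink/lib
        consumer = FlinkKafkaConsumer(
            topics='mytopic',
            deserialization_schema=SimpleStringSchema(),
            properties={'bootstrap.servers': 'localhost:9092'})
        return env, consumer
    ```

    Note that the connector jar version must match the installed Flink version, otherwise the Java class still won't be found.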