I am using PyFlink. I thought all the Java dependencies were installed along with pip install apache-flink.
The above error occurs on this line:
kafka_consumer = FlinkKafkaConsumer(
    topics='mytopic',
    deserialization_schema=deserialization_schema,
    properties={
        'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS})
As per the error, do I need to specify the Kafka consumer dependency manually in the PyFlink environment by downloading and adding the jar?
Any guidance would be appreciated.
Python Version: 3.8.2
Java Version: java 11.0.11
Since Flink is a Java/Scala-based project, the implementations of both connectors and formats are shipped as jars.
FlinkKafkaConsumer in PyFlink is only a wrapper: it relies on Java's FlinkKafkaConsumer implementation, which is not bundled with pip install apache-flink.
You need to download the Kafka connector jar into the lib directory of PyFlink. The lib directory usually sits under site-packages, for example: /usr/local/lib/python3.8/site-packages/pyflink/lib
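If you are unsure where the lib directory is on your machine, a small script can locate it and tell you whether a Kafka connector jar is already there. This is only a sketch: the helper names pyflink_lib_dir and kafka_connector_jars are made up for illustration, and the file-name check assumes the jar name contains both "connector" and "kafka" (as in flink-connector-kafka or flink-sql-connector-kafka).

```python
import importlib.util
from pathlib import Path
from typing import List, Optional


def pyflink_lib_dir() -> Optional[Path]:
    """Return <site-packages>/pyflink/lib, or None if pyflink is not installed.

    find_spec only locates the package; it does not import it.
    """
    spec = importlib.util.find_spec("pyflink")
    if spec is None or not spec.submodule_search_locations:
        return None
    return Path(list(spec.submodule_search_locations)[0]) / "lib"


def kafka_connector_jars(lib_dir: Path) -> List[Path]:
    """List jars in lib_dir whose name mentions both 'connector' and 'kafka'.

    Assumption: the connector jar keeps its usual file name.
    """
    return sorted(
        p for p in lib_dir.glob("*.jar")
        if "connector" in p.name and "kafka" in p.name
    )


if __name__ == "__main__":
    lib_dir = pyflink_lib_dir()
    if lib_dir is None:
        print("pyflink is not installed in this interpreter")
    else:
        jars = kafka_connector_jars(lib_dir)
        print("lib directory:", lib_dir)
        print("Kafka connector jars:", [j.name for j in jars] or "none found")
```

If the script reports that none were found, copy a connector jar that matches your Flink version into the printed directory and run the job again.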
class FlinkKafkaConsumer(FlinkKafkaConsumerBase):
    """
    The Flink Kafka Consumer is a streaming data source that pulls a parallel data stream from
    Apache Kafka. The consumer can run in multiple parallel instances, each of which will
    pull data from one or more Kafka partitions.

    The Flink Kafka Consumer participates in checkpointing and guarantees that no data is lost
    during a failure, and that the computation processes elements 'exactly once'. (These guarantees
    naturally assume that Kafka itself does not lose any data.)

    Please note that Flink snapshots the offsets internally as part of its distributed checkpoints.
    The offsets committed to Kafka / Zookeeper are only to bring the outside view of progress in
    sync with Flink's view of the progress. That way, monitoring and other jobs can get a view of
    how far the Flink Kafka consumer has consumed a topic.

    Please refer to Kafka's documentation for the available configuration properties:
    http://kafka.apache.org/documentation.html#newconsumerconfigs
    """

    def __init__(self, topics: Union[str, List[str]], deserialization_schema: DeserializationSchema,
                 properties: Dict):
        """
        Creates a new Kafka streaming source consumer for Kafka 0.10.x.

        This constructor allows passing multiple topics to the consumer.

        :param topics: The Kafka topics to read from.
        :param deserialization_schema: The de-/serializer used to convert between Kafka's byte
                                       messages and Flink's objects.
        :param properties: The properties that are used to configure both the fetcher and the offset
                           handler.
        """
        # The Python class only looks up the Java class through the Py4J gateway,
        # so the Kafka connector jar must be on the JVM classpath.
        JFlinkKafkaConsumer = get_gateway().jvm \
            .org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
        j_flink_kafka_consumer = _get_kafka_consumer(topics, properties, deserialization_schema,
                                                     JFlinkKafkaConsumer)
        super(FlinkKafkaConsumer, self).__init__(j_flink_kafka_consumer=j_flink_kafka_consumer)
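Because the wrapper above resolves the Java class through the gateway JVM, an alternative to copying the jar into lib is to register it on the execution environment before creating the consumer. The sketch below assumes a PyFlink version whose StreamExecutionEnvironment provides add_jars, which takes file:// URLs; jar_url and register_jar are hypothetical helper names, and the jar path in the usage comment is a placeholder, not a real location.

```python
from pathlib import Path


def jar_url(jar_path: str) -> str:
    """Convert a local jar path into an absolute file:// URL."""
    return Path(jar_path).resolve().as_uri()


def register_jar(env, jar_path: str) -> str:
    """Register a connector jar on a StreamExecutionEnvironment.

    `env` is expected to be a pyflink.datastream.StreamExecutionEnvironment;
    only its add_jars method is used here.
    """
    url = jar_url(jar_path)
    env.add_jars(url)
    return url


# Usage (requires pyflink; the path is a placeholder for your downloaded jar):
#   from pyflink.datastream import StreamExecutionEnvironment
#   env = StreamExecutionEnvironment.get_execution_environment()
#   register_jar(env, "/path/to/flink-sql-connector-kafka.jar")
```

Either approach should work; registering the jar in code keeps the dependency visible in the job script, while copying it into lib makes it available to every job run from that Python environment.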