Search code examples
pythonapache-kafkaspring-cloud-dataflowpolyglot

Polyglot Processor with Local Dataflow Server


I have been trying to work with polyglot and build a simple python processor. I followed the polyglot recipe and I could not get the stream to deploy. I originally deployed the same processor that is used in the example and got the following errors:

Unknown command line arg requested: spring.cloud.stream.bindings.input.destination Unknown environment variable requested: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS

Traceback (most recent call last):
File "/processor/python_processor.py", line 10, in
consumer = KafkaConsumer(get_input_channel(), bootstrap_servers=[get_kafka_binder_brokers()])
File "/usr/local/lib/python2.7/dist-packages/kafka/consumer/group.py", line 353, in init
self._client = KafkaClient(metrics=self._metrics, **self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/client_async.py", line 203, in init
self.cluster = ClusterMetadata(**self.config)
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 67, in init
self._bootstrap_brokers = self._generate_bootstrap_brokers()
File "/usr/local/lib/python2.7/dist-packages/kafka/cluster.py", line 71, in _generate_bootstrap_brokers
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1336, in collect_hosts
host, port, afi = get_ip_port_afi(host_port)
File "/usr/local/lib/python2.7/dist-packages/kafka/conn.py", line 1289, in get_ip_port_afi
host_and_port_str = host_and_port_str.strip()
AttributeError: 'NoneType' object has no attribute 'strip'
Exception AttributeError: "'KafkaClient' object has no attribute '_closed'" in <bound method KafkaClient.del of <kafka.client_async.KafkaClient object at 0x7f8b7024cf10>> ignored

I then attempted to pass the environment and binding arguments through the deployment stream but that did not work. When I manually inserted the SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS and spring.cloud.stream.bindings.input.destination parameter into Kafka's consumer I was able to deploy the stream as a workaround. I am not entirely sure what is causing the issue, would deploying this on Kubernetes be any different or is this an issue with Polyglot and Dataflow? Any help with this would be appreciated.

Steps to reproduce: Attempt to deploy polyglot-processor stream from polyglot recipe on local dataflow server. I am also using the same stream definition as in the example: http --server.port=32123 | python-processor --reversestring=true | log.

Additional context: I am attempting to deploy the stream on a local installation of SPDF and Kafka since I had some issues deploying custom python applications with Docker.


Solution

  • The recipe you have posted above expects the SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS environment variable present as part of the server configuration (since the streams are managed via Skipper server, you would need to set this environment variable in your Skipper server configuration).

    You can check this documentation on how you can set SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS as environment property in Skipper server deployment.

    You can also pass this property as a deployer property when deploying the python-processor stream app. You can refer this documentation on how you can pass deployment property to set the Spring Cloud Stream properties (here the binder configuration property) at the time of stream deployment.