Search code examples
python-3.xapache-kafkafaust

Running Faust with kafka crashes with ConsumerStoppedError


I freshly installed Faust and ran a basic program to send and receive messages over Kafka.I used the sample code mentioned in (Faust example of publishing to a kafka topic) While running the program initially it connects to Kafka(which is also running on my machine). But then while trying to consume Kafka gets disconnected and the app crashes with the below exception

[2020-11-11 07:08:26,623] [76392] [ERROR] [^---Fetcher]: Crashed reason=ConsumerStoppedError() 
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/services.py", line 779, in _execute_task
    await task
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 176, in _fetcher
    await self._drainer
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 1039, in _drain_messages
    async for tp, message in ait:
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 640, in getmany
    records, active_partitions = await self._wait_next_records(timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 676, in _wait_next_records
    records = await self._getmany(
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/consumer.py", line 1269, in _getmany
    return await self._thread.getmany(active_partitions, timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 805, in getmany
    return await self.call_thread(
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/threads.py", line 436, in call_thread
    result = await promise
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/threads.py", line 383, in _process_enqueued
    result = await maybe_async(method(*args, **kwargs))
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/mode/utils/futures.py", line 134, in maybe_async
    return await res
  File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 822, in _fetch_records
    raise ConsumerStoppedError()

On debugging the reason for the consumer getting disconnected I see that in fetcher.py of alokafka consumer is the connection getting closed due to the below exception

    Unable to display children:Error resolving variables Traceback (most recent call last):
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_resolver.py", line 205, in resolve
    def resolve(self, dict, key):
KeyError: 'Exception'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_comm.py", line 1227, in do_it
    def do_it(self, dbg):
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_vars.py", line 262, in resolve_compound_variable_fields
    def resolve_compound_variable_fields(thread_id, frame_id, scope, attrs):
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_vars.py", line 169, in getVariable
    def getVariable(thread_id, frame_id, scope, attrs):
  File "/Applications/PyCharm CE.app/Contents/plugins/python-ce/helpers/pydev/_pydevd_bundle/pydevd_resolver.py", line 205, in resolve
    def resolve(self, dict, key):
AttributeError: 'dict' object has no attribute 'Exception'

The software versions are as given below

  • Mac OS : 10.15.4
  • Kafka : 2_12.2.1.1
  • Aiokafka: 1.1.6
  • Python : 3.9.0
  • Faust : 1.10.4

Please help here.


Solution

  • I think your problem is the same mine I was with python 3.9 and I changed to 3.8 now It works.