I am creating Kafka consumers dynamically, based on the number of partitions a topic has (using KafkaConsumer from kafka-python).
Once the consumers are created, I use ThreadPoolExecutor to start threads that listen to specific topics on these consumer partitions.
NOTE: This whole code sits inside a Flask API endpoint. The goal is to receive a REST call, send a message on the producer, listen for the response on a consumer, and then return that response to the REST caller.
# function that listens to consumer messages in each thread
def _get_me_response(consumer_id, consumer):
    for message in consumer:
        message = message.value
        break
    consumer.commit()
    return consumer_id, message
# ThreadPoolExecutor to start the threads
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=len(consumers)) as executor:
    futures = []
    for consumer_id, consumer in consumers.items():
        futures.append(executor.submit(_get_me_response,
                                       consumer=consumer,
                                       consumer_id=consumer_id))
Once I've submitted the threads, I can successfully receive messages on each thread.
My issue lies with collecting the results of the futures and using a result to respond to the REST endpoint.
Most examples online show how to get results from futures with a "print" statement on whatever result is obtained. Here is my code to get the response from the futures:
# code for gathering results from the futures in whichever order they arrive
for future in as_completed(futures):
    resp_cid, response = future.result()
    print(json.dumps(response))
    if response['match_status'] == 1:
        break
return response
What I want is this: as soon as I receive a matching response on any one of the futures, go straight to the return statement instead of waiting for the other futures to complete (they never will, since each runs a for loop on a consumer that never ends). The print statement prints successfully in the console, but I cannot exit the loop.
I have tried using a global variable as a stop flag, but this doesn't seem to work as expected.
Code for the global variable usage:
RETURN_CONSUMER_FLAG = False  # <-- defining the global variable

# function that listens to consumer messages in each thread, with global var check
def _get_me_response(consumer_id, consumer):
    while not RETURN_CONSUMER_FLAG:  # <-- looping on the global variable
        for message in consumer:
            message = message.value
            consumer.commit()
            return consumer_id, message
...
...
# code for gathering results from the futures in whichever order they arrive
for future in as_completed(futures):
    resp_cid, response = future.result()
    print(json.dumps(response))
    if response['match_status'] == 1:
        global RETURN_CONSUMER_FLAG  # <-- setting the global variable to True
        RETURN_CONSUMER_FLAG = True
        return response
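One likely reason the flag has no effect: with the default settings, `for message in consumer` blocks inside the iterator until a message arrives, so the `while` condition is never re-evaluated. A possible alternative, sketched below under assumptions, is to call `poll()` with a short timeout and check a `threading.Event` between polls. The names `wait_for_message`, `stop_event`, `poll_timeout_ms`, `Record` and `FakeConsumer` are hypothetical; `FakeConsumer` only imitates the `poll`/`commit` subset of `KafkaConsumer` so the sketch runs without a broker.

```python
import threading
from collections import namedtuple

Record = namedtuple("Record", "value")  # hypothetical stand-in for a Kafka record


def wait_for_message(consumer_id, consumer, stop_event, poll_timeout_ms=500):
    """Poll until a record arrives or stop_event is set.

    Returns (consumer_id, value), or (consumer_id, None) if stopped first.
    """
    while not stop_event.is_set():
        # poll() hands control back after poll_timeout_ms even when nothing
        # arrived, so the stop flag is checked regularly.
        batches = consumer.poll(timeout_ms=poll_timeout_ms, max_records=1)
        for records in batches.values():
            for record in records:
                consumer.commit()
                return consumer_id, record.value
    return consumer_id, None


class FakeConsumer:
    """Hypothetical test double: replays prepared poll() results."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.commits = 0

    def poll(self, timeout_ms=0, max_records=None):
        # An empty dict mimics a poll that timed out without records.
        return self._batches.pop(0) if self._batches else {}

    def commit(self):
        self.commits += 1
```

With a real `KafkaConsumer`, the thread collecting the futures would call `stop_event.set()` once it has a matching response, and each worker would return within roughly one poll interval.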
I checked the code for as_completed, and it seems this might be because it is a generator (it uses yield rather than return), so it keeps waiting for all futures to finish their work before the loop can exit.
Any idea on how to get around this issue?
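A point worth checking here: `as_completed` yields each future as soon as it finishes, so the loop itself can be left early. If the loop runs inside the `with ThreadPoolExecutor(...)` block, though, leaving that block calls `shutdown(wait=True)`, which blocks until every worker has returned. The sketch below assumes that is the cause. It creates the executor without `with`, signals the remaining workers through a `threading.Event`, and shuts down without waiting. `first_matching_result`, `fast_worker` and `slow_worker` are hypothetical names; the workers stand in for the consumer threads.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def first_matching_result(workers, is_match):
    """Return the first worker result accepted by is_match, or None.

    workers: dict of worker_id -> callable(stop_event) returning a result.
    """
    stop_event = threading.Event()
    # No `with` block: leaving one would wait for every worker to finish.
    executor = ThreadPoolExecutor(max_workers=max(1, len(workers)))
    try:
        futures = [executor.submit(worker, stop_event) for worker in workers.values()]
        for future in as_completed(futures):
            result = future.result()
            if is_match(result):
                return result
        return None
    finally:
        stop_event.set()               # ask the remaining workers to stop
        executor.shutdown(wait=False)  # return without blocking on them


def fast_worker(stop_event):
    # Stand-in for a consumer that receives a matching message right away.
    return {"match_status": 1, "source": "fast"}


def slow_worker(stop_event):
    # Stand-in for a consumer that never receives a message; it re-checks
    # the stop flag every 50 ms instead of blocking forever.
    while not stop_event.wait(timeout=0.05):
        pass
    return {"match_status": 0, "source": "slow"}
```

For example, `first_matching_result({"a": slow_worker, "b": fast_worker}, lambda r: r["match_status"] == 1)` returns the fast worker's result while the slow worker is still running. Note that `shutdown(wait=False)` does not kill running threads, so the workers still need a way to notice the stop signal.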
Since the suggestion by @Louis Lac did not fit my issue, I ended up giving the Kafka consumer a timeout, like this:
consumer = KafkaConsumer(CONSUMER_TOPIC, group_id='ME2',
                         bootstrap_servers=[f"{KAFKA_SERVER_HOST}:{KAFKA_SERVER_PORT}"],
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')),
                         enable_auto_commit=True,
                         auto_offset_reset='latest',
                         max_poll_records=1,
                         max_poll_interval_ms=300000,
                         consumer_timeout_ms=300000)
This does not do exactly what I wanted, but it works. This part of my code is not critical to the app, so I was okay with this compromise.
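One consequence of `consumer_timeout_ms`: once the timeout elapses without a message, iteration over the consumer simply ends. In the first `_get_me_response` shown earlier, `message` would then be unassigned at the `return`. A minimal sketch of a worker and a collection loop that tolerate this, assuming the same `match_status` field, could look as follows. `get_response_or_none`, `collect_first_match` and `Record` are hypothetical names, and plain iterators stand in for the consumers so the sketch runs without a broker. The explicit commit is left out because the configuration above enables auto commit.

```python
from collections import namedtuple
from concurrent.futures import ThreadPoolExecutor, as_completed

Record = namedtuple("Record", "value")  # hypothetical stand-in for a Kafka record


def get_response_or_none(consumer_id, consumer):
    """Return the first message value, or None if iteration ends first."""
    for record in consumer:
        return consumer_id, record.value
    # Reached when the iterator ends, e.g. after consumer_timeout_ms elapses.
    return consumer_id, None


def collect_first_match(consumers):
    """Return the first response with match_status == 1, else None."""
    with ThreadPoolExecutor(max_workers=max(1, len(consumers))) as executor:
        futures = [
            executor.submit(get_response_or_none, consumer_id, consumer)
            for consumer_id, consumer in consumers.items()
        ]
        for future in as_completed(futures):
            _, response = future.result()
            # Skip workers that timed out without receiving anything.
            if response is not None and response.get("match_status") == 1:
                return response
    return None
```

Because the `with` block is kept here, the return still waits for the other workers, but each of them now ends after at most `consumer_timeout_ms`, which matches the compromise described above.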