Search code examples
pythonapache-kafkakafka-consumer-apigeventkafka-python

Does Kafka to support bidirectional?


I have two servers where bidirectional communication needs to be happened.

For kafka I have 2 topics 2 producers and 2 consumers. Trying to communicate server 1 to 2 with 1topic and 1 consumer and 1 producer and vice versa for the 2nd server.

producer.py

from kafka import KafkaProducer, KafkaConsumer
import gevent
import time
producer = KafkaProducer(bootstrap_servers='localhost:9092')
consumer = KafkaConsumer('game_events', bootstrap_servers='localhost:9092')

def prod():
    while True:
        time.sleep(2)
        producer.send('sockets', b'Hello  world') 
        producer.flush()
    

def cons():
    while True:
        game_server = consumer.__next__()
        print(game_server)


gevent.spawn(prod())
gevent.spawn(cons())

Consumer.py

from kafka import KafkaConsumer, KafkaProducer
import gevent
import time
consumer = KafkaConsumer('sockets', bootstrap_servers=['localhost:9092'])
producer = KafkaProducer(bootstrap_servers='localhost:9092')

def cons():
    while(True):
        game_server = consumer.__next__()
        print(game_server)


def prod():
    while True:
        time.sleep(2)
        print("game_server")
        producer.send('game_events', b'Hello  Game Event') 
        producer.flush()

#for message in consumer:
#    print(message)



gevent.spawn(cons())
gevent.spawn(prod())

Here the consumer is just receiving the sockets topic message but not the topic game_events. Is there any way to achieve this ? or Kafka doesn't support it ?


Solution

  • producer.py
    consumer.py

    Well, that's confusing. Both modules have producers and consumers....


    You call cons(), which starts an infinite loop. If the loop ever ended, that result is passed into gevent.spawn(), and only then would prod() function run.

    If you meant to start a Greenlet thread, you need to pass the function handle itself, which internally starts threads for those functions.

    gevent.spawn(cons)
    gevent.spawn(prod)
    

    Also, rather than a while loop for the consumer, you should be able to have for msg in consumer, then you need to get msg.value()


    You don't really need gevent.

    Your producer could be a Timer and the consumer just a Thread. https://docs.python.org/3/tutorial/stdlib2.html#multi-threading