I have two servers that need to communicate bidirectionally.
For Kafka I have 2 topics, 2 producers, and 2 consumers. I am trying to send from server 1 to server 2 using one topic, one producer, and one consumer, and vice versa for the second server.
producer.py

    from kafka import KafkaProducer, KafkaConsumer
    import gevent
    import time

    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    consumer = KafkaConsumer('game_events', bootstrap_servers='localhost:9092')

    def prod():
        while True:
            time.sleep(2)
            producer.send('sockets', b'Hello world')
            producer.flush()

    def cons():
        while True:
            game_server = consumer.__next__()
            print(game_server)

    gevent.spawn(prod())
    gevent.spawn(cons())

consumer.py

    from kafka import KafkaConsumer, KafkaProducer
    import gevent
    import time

    consumer = KafkaConsumer('sockets', bootstrap_servers=['localhost:9092'])
    producer = KafkaProducer(bootstrap_servers='localhost:9092')

    def cons():
        while(True):
            game_server = consumer.__next__()
            print(game_server)

    def prod():
        while True:
            time.sleep(2)
            print("game_server")
            producer.send('game_events', b'Hello Game Event')
            producer.flush()

    #for message in consumer:
    #    print(message)
    gevent.spawn(cons())
    gevent.spawn(prod())

Here the consumer only receives messages on the sockets topic; nothing ever arrives on the game_events topic. Is there any way to achieve this, or does Kafka not support it?
producer.py
consumer.py
Well, that's confusing. Both modules have producers and consumers....
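In consumer.py you call cons() directly, which starts an infinite loop. Only if that loop ever ended would its return value be passed into gevent.spawn(), and only then would the prod() function run. In producer.py the order is reversed: prod() loops forever and cons() never runs, which is why nothing is ever produced to or read from game_events.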
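If you meant to start a greenlet, you need to pass the function object itself (no parentheses); gevent then runs that function in a new greenlet.

    greenlets = [gevent.spawn(cons), gevent.spawn(prod)]
    # wait for the greenlets; otherwise the script exits before they run
    gevent.joinall(greenlets)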
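
The difference between passing cons() and cons can be seen without Kafka or gevent at all. The sketch below uses a hypothetical stand-in named spawn (it is not gevent's function, it only records what it was given) to show that Python evaluates the argument before the call happens:

```python
calls = []

def spawn(fn_or_result):
    """Hypothetical stand-in for gevent.spawn: it only records its argument."""
    calls.append(("spawn", fn_or_result))
    return fn_or_result

def cons():
    """Stand-in for the consumer loop; returns instead of looping forever."""
    calls.append(("cons ran", None))
    return "cons result"

spawn(cons())  # cons runs to completion first; spawn receives its return value
spawn(cons)    # spawn receives the function object; cons is not run here
```

With the real cons(), the first form never reaches spawn because the loop never returns.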
Also, rather than a while loop that calls consumer.__next__(), you should be able to write for msg in consumer and then read msg.value (in kafka-python this is an attribute of the record, not a method).
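You don't really need gevent. Unless you monkey-patch the standard library, time.sleep() and the blocking Kafka client calls never yield to the other greenlet anyway. Your producer could be a Timer and the consumer just a Thread: https://docs.python.org/3/tutorial/stdlib2.html#multi-threading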
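
A minimal sketch of that threading approach follows. The helper names (produce_every, consume, run_server) and their parameters are made up for illustration, and a plain Thread with an Event stands in for a repeating Timer. It assumes kafka-python is installed and a broker is listening on localhost:9092; the Kafka import is kept inside run_server so the two loop helpers can be tried without a broker.

```python
import threading

def produce_every(send, topic, payload, interval, stop):
    """Call send(topic, payload) every `interval` seconds until `stop` is set."""
    # Event.wait returns False on timeout and True once the event is set.
    while not stop.wait(interval):
        send(topic, payload)

def consume(records, handle):
    """Iterate over records and pass each record's value to `handle`."""
    for record in records:
        handle(record.value)

def run_server(send_topic, receive_topic, payload, bootstrap="localhost:9092"):
    """Wire both loops to kafka-python (assumes a reachable broker)."""
    from kafka import KafkaConsumer, KafkaProducer  # imported lazily

    producer = KafkaProducer(bootstrap_servers=bootstrap)
    consumer = KafkaConsumer(receive_topic, bootstrap_servers=bootstrap)
    stop = threading.Event()
    sender = threading.Thread(
        target=produce_every,
        args=(producer.send, send_topic, payload, 2.0, stop),
        daemon=True,
    )
    sender.start()
    try:
        consume(consumer, print)  # blocks, printing each message value
    finally:
        stop.set()  # ask the producer thread to finish
```

One server would then call run_server('sockets', 'game_events', b'Hello world') and the other run_server('game_events', 'sockets', b'Hello Game Event'), so each side produces to the topic the other side consumes.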