I have a problem setting up a data flow between a class called AD_Drone and another called AD_Engine. The Engine has to publish the final position of every connected drone; after that, each Drone calculates its next position and sends its movement (N, S, W, E, ...). This has to continue until every drone arrives at its final position. The problem I've encountered is that, after the Engine has published every final position and sends the next current position, the drone consumer stays "stuck" and no messages from AD_Drone are received. Here is my Engine code:
# Coordinate change (dx, dy) for each compass direction a drone may report.
# North decreases Y and east increases X; the single-axis entries use the
# same convention as the diagonal ones.
_DESPLAZAMIENTOS = {
    "N": (0, -1),
    "S": (0, 1),
    "E": (1, 0),
    "O": (-1, 0),
    "NE": (1, -1),
    "NO": (-1, -1),
    "SE": (1, 1),
    "SO": (-1, 1),
}


def _aplicar_movimiento(dron, direccion):
    """Move ``dron`` one cell in ``direccion``; unknown directions are ignored."""
    delta = _DESPLAZAMIENTOS.get(direccion)
    if delta is None:
        return
    dx, dy = delta
    dron.X_ACTUAL += dx
    dron.Y_ACTUAL += dy


def empezar_espectaculo():
    """Run the show: publish final positions, then relay moves until done.

    First sends every drone in ``LISTA_DRONES`` its final position on the
    "dron" topic as ``"ID,X_FIN,Y_FIN"``. Then, in a loop, it reads the moves
    reported by the drones (``consume_pos_act``), applies each move to the
    matching drone, and publishes the resulting ``"ID,X_ACTUAL,Y_ACTUAL"``
    strings (``produce_pos_act``). The loop ends once every drone has
    ``COMPLETADO`` set.

    NOTE(review): nothing in this function sets ``COMPLETADO``; confirm it is
    updated elsewhere, otherwise the loop never terminates.
    """
    print("############## ESPECTÁCULO INICIADO ################")
    productor = KafkaProducer(value_serializer=lambda v: pickle.dumps(v),
                              bootstrap_servers=ip_kafka + ":" + puerto_kafka)
    try:
        # Send the definitive final positions.
        for dron in LISTA_DRONES:
            productor.send("dron", value=str(dron.ID) + "," + str(dron.X_FIN) + ',' + str(dron.Y_FIN))
        productor.flush()
    finally:
        # This producer is not used again, so release its connections.
        productor.close()

    terminado = False
    while not terminado:
        # Read the moves the drones have reported for this round.
        pos = consume_pos_act()
        print("Posiciones consumidas", pos)
        NUEVO_MAPA = []
        for dron in LISTA_DRONES:
            for p in pos:
                dron_id, direccion = p[0], p[1]
                if dron.ID == int(dron_id):
                    _aplicar_movimiento(dron, direccion)
            # Every drone gets its (possibly unchanged) position published.
            nueva_pos = str(dron.ID) + ',' + str(dron.X_ACTUAL) + ',' + str(dron.Y_ACTUAL)
            NUEVO_MAPA.append(nueva_pos)
        print("nuevo mapa:", NUEVO_MAPA)
        sleep(5)
        produce_pos_act(NUEVO_MAPA)
        if all(dron.COMPLETADO == True for dron in LISTA_DRONES):
            terminado = True
        sleep(1)
Here are the producers and consumers for the Engine:
def consume_pos_act(consumidor=None):
    """Collect one reported move per drone from the "movimiento" topic.

    Args:
        consumidor: Optional iterable of messages (normally a long-lived
            ``KafkaConsumer``) to read from. When omitted, a consumer is
            created for this call and closed before returning.

    Returns:
        A list with one entry per message read, each being the message value
        split on commas. Reading stops once there are as many entries as
        drones in ``LISTA_DRONES``, or when the consumer stops yielding
        (the consumer created here times out after 10 s without messages).
    """
    propio = consumidor is None
    if propio:
        # NOTE(review): pickle.loads executes arbitrary code on untrusted
        # input; only safe if every producer on this topic is trusted.
        consumidor = KafkaConsumer("movimiento", value_deserializer=lambda v: pickle.loads(v),
                                   bootstrap_servers=ip_kafka + ':' + puerto_kafka,
                                   group_id="engine", auto_offset_reset="latest",
                                   enable_auto_commit=True, consumer_timeout_ms=10000)
    NUEVAS_POSICIONES = []
    try:
        for mensaje in consumidor:
            info = mensaje.value.split(',')
            NUEVAS_POSICIONES.append(info)
            print("Me han pasado:", info)
            if len(NUEVAS_POSICIONES) >= len(LISTA_DRONES):
                break
    finally:
        if propio:
            # Closing commits offsets and leaves the group, so the consumer
            # created by the next call is not competing with this one.
            consumidor.close()
    return NUEVAS_POSICIONES
def produce_pos_act(nuevas_posiciones, productor=None):
    """Publish each updated drone position on the "dron" topic.

    Args:
        nuevas_posiciones: Iterable of position values to send, one message
            per element.
        productor: Optional producer to send with (normally a long-lived
            ``KafkaProducer``). When omitted, a producer is created for this
            call and closed before returning.
    """
    propio = productor is None
    if propio:
        productor = KafkaProducer(value_serializer=lambda v: pickle.dumps(v),
                                  bootstrap_servers=ip_kafka + ":" + puerto_kafka)
    try:
        for pos in nuevas_posiciones:
            print("Enviando a dron", pos)
            productor.send("dron", value=pos)
        # send() only queues the message; flush so it is actually delivered
        # before this function returns.
        productor.flush()
    finally:
        if propio:
            productor.close()
Here is my AD_Drone consumer-producer logic:
# Main movement loop of the drone: report the next move, pause, then wait for
# the updated current position ('a' mode of consume_pos).
# NOTE(review): nothing in the visible code sets self.COMPLETADO; confirm it
# is updated elsewhere, otherwise this loop never ends.
while True:
    if self.COMPLETADO:
        break
    print("Pasando a producir pos actual")
    self.produce_pos_act()
    print("Pasando a consumir pos actual")
    sleep(2)
    self.consume_pos('a')
def produce_pos_act(self):
    """Compute this drone's next move and publish it on the "movimiento" topic.

    The move comes from ``self.calcula_pos_actual`` called with the final and
    current coordinates converted to ``int``. The message sent is
    ``"<ID>,<move>"``.
    """
    pos_actual = self.calcula_pos_actual(int(self.X_FIN), int(self.Y_FIN),
                                         int(self.X_ACTUAL), int(self.Y_ACTUAL))
    dato = str(self.ID) + ',' + str(pos_actual)
    print("Envío a Engine", dato)
    self.PRODUCER.send("movimiento", value=dato)
    # send() only queues the message; flush so the move has been delivered
    # before the drone starts waiting for the reply.
    self.PRODUCER.flush()
def consume_pos(self, pos, consumidor=None):
    """Wait for this drone's next position message on the "dron" topic.

    Messages are ``"id,x,y"`` strings. The first one whose id matches this
    drone is stored and ends the wait; messages for other drones, or with
    fewer than three fields, are skipped.

    Args:
        pos: ``'a'`` to store the coordinates as the current position
            (``X_ACTUAL``/``Y_ACTUAL``), ``'f'`` to store them as the final
            position (``X_FIN``/``Y_FIN``). Any other value stores nothing.
        consumidor: Optional iterable of messages (normally a long-lived
            ``KafkaConsumer``) to read from. When omitted, a consumer is
            created for this call and closed before returning.

    The coordinates are stored as the strings received, not converted.
    """
    propio = consumidor is None
    if propio:
        # NOTE(review): pickle.loads executes arbitrary code on untrusted
        # input; only safe if every producer on this topic is trusted.
        consumidor = KafkaConsumer("dron", value_deserializer=lambda v: pickle.loads(v),
                                   bootstrap_servers=self.IP_KAFKA + ":" + self.PORT_KAFKA,
                                   group_id=self.ALIAS, auto_offset_reset="latest",
                                   consumer_timeout_ms=10000)
    # Message fields are always strings, so compare against the ID as text;
    # comparing with an integer ID would never match.
    mi_id = str(self.ID)
    try:
        for mensaje in consumidor:
            print("Entro en consumición")
            datos = mensaje.value.split(',')
            print("Recibo", datos)
            if len(datos) < 3 or datos[0] != mi_id:
                continue
            if pos == 'a':
                self.X_ACTUAL = datos[1]
                self.Y_ACTUAL = datos[2]
                print("Soy " + self.ALIAS + " mi posición ACTUAL es (" + self.X_ACTUAL + ',' + self.Y_ACTUAL + ')')
                break
            elif pos == 'f':
                self.X_FIN = datos[1]
                self.Y_FIN = datos[2]
                print("Soy " + self.ALIAS + " mi posición FINAL es (" + self.X_FIN + ',' + self.Y_FIN + ')')
                break
    finally:
        if propio:
            # Closing commits offsets and leaves the group, so the consumer
            # created by the next call is not competing with this one.
            consumidor.close()
Thank you all. Sorry, this is my first post here and I don't know many of the rules.
I've tried adding sleeps and running parallel threads, but it still gets stuck.
If a consumer is "stuck", then either the producer isn't sending data to the topic, or you've broken out of the consumer's processing loop and the consumer has then been closed. Consumer instances are not thread safe, so parallel threads probably won't help. You should create one KafkaConsumer and pass it into consume_pos_act() as a parameter instead of creating one per function call, and do the same with the producer for produce_pos_act(); you only need one of each across the whole app.
You also have auto_offset_reset="latest" and enable_auto_commit=True, so if your consumers are constantly being created and restarted, they keep moving to the end of the topic, where there may be no actively running producer sending new data, as mentioned above.