Search code examples
pythonmultithreadingapache-kafkakafka-python

Kafka consumer not receiving messages after first consumption


I have a problem while trying to set up a data flow between a class called AD_Drone and another called AD_Engine. Engine has to produce, for every connected drone, its final position; after that, each Drone calculates its next position and sends its movement (N, S, W, E...). This has to continue until every drone arrives at its final position. The problem I've encountered is that, after Engine has produced every final position and sends the next current position, the drone consumer stays "stuck" and receives no messages from AD_Drone. Here is my Engine code:

def empezar_espectaculo():
    """Run the show: publish final positions, then relay drone moves.

    One ``"ID,X_FIN,Y_FIN"`` message per drone in ``LISTA_DRONES`` is sent to
    the ``dron`` topic.  Afterwards, until ``COMPLETADO`` is true for every
    drone, each round reads the moves returned by ``consume_pos_act()``,
    applies them to ``X_ACTUAL``/``Y_ACTUAL`` and publishes the resulting
    ``"ID,X_ACTUAL,Y_ACTUAL"`` strings through ``produce_pos_act()``.
    """
    print("############## ESPECTÁCULO INICIADO ################")

    # (dx, dy) for each direction code ("O" = oeste/west).  Y shrinks towards
    # the north and X grows towards the east; "E" and "O" use the same X
    # convention as the diagonal moves so that e.g. "NE" == "N" followed by "E".
    desplazamientos = {
        "N": (0, -1),
        "S": (0, 1),
        "E": (1, 0),
        "O": (-1, 0),
        "NE": (1, -1),
        "NO": (-1, -1),
        "SE": (1, 1),
        "SO": (-1, 1),
    }

    productor = KafkaProducer(value_serializer=lambda v: pickle.dumps(v),
                              bootstrap_servers=ip_kafka + ":" + puerto_kafka)

    # Send the definitive final positions.
    for dron in LISTA_DRONES:
        destino = ",".join((str(dron.ID), str(dron.X_FIN), str(dron.Y_FIN)))
        productor.send("dron", value=destino)
    # send() only enqueues; one flush after the loop delivers the whole batch.
    productor.flush()
    # This producer is not used again below, so release its connections.
    productor.close()

    terminado = False
    while not terminado:
        # Read the moves the drones published for this round.
        pos = consume_pos_act()
        print("Posiciones consumidas", pos)

        NUEVO_MAPA = []

        for dron in LISTA_DRONES:
            for p in pos:
                dron_id, direccion = p[0], p[1]
                if dron.ID != int(dron_id):
                    continue
                # Unknown direction codes leave the drone where it is.
                dx, dy = desplazamientos.get(direccion, (0, 0))
                dron.X_ACTUAL += dx
                dron.Y_ACTUAL += dy

            NUEVO_MAPA.append(
                ",".join((str(dron.ID), str(dron.X_ACTUAL), str(dron.Y_ACTUAL)))
            )

        print("nuevo mapa:", NUEVO_MAPA)

        sleep(5)

        produce_pos_act(NUEVO_MAPA)

        terminado = all(dron.COMPLETADO for dron in LISTA_DRONES)

        sleep(1)

Here are the producers and consumers for Engine

def consume_pos_act(consumidor=None):
    """Collect one batch of drone moves from the ``movimiento`` topic.

    Args:
        consumidor: Optional iterable of messages, normally a long-lived
            ``KafkaConsumer`` subscribed to ``movimiento`` that the caller
            reuses between rounds.  When omitted, a consumer is created for
            this call only and closed before returning.

    Returns:
        A list with one entry per message read; each entry is the message
        value split on commas.  Reading stops once ``len(LISTA_DRONES)``
        messages have arrived or the message iterator ends (the consumer
        created here is configured with ``consumer_timeout_ms=10000``).
    """
    propio = consumidor is None
    if propio:
        # NOTE(security): pickle.loads can execute arbitrary code; this is only
        # acceptable if every producer on this topic is trusted.
        consumidor = KafkaConsumer(
            "movimiento",
            value_deserializer=lambda v: pickle.loads(v),
            bootstrap_servers=ip_kafka + ':' + puerto_kafka,
            group_id="engine",
            auto_offset_reset="latest",
            enable_auto_commit=True,
            consumer_timeout_ms=10000,
        )

    NUEVAS_POSICIONES = []
    try:
        for mensaje in consumidor:
            info = mensaje.value.split(',')
            NUEVAS_POSICIONES.append(info)
            print("Me han pasado:", info)

            if len(NUEVAS_POSICIONES) == len(LISTA_DRONES):
                break
    finally:
        # Only close what was opened here; closing lets the consumer leave the
        # group cleanly instead of leaking one connection per call.
        if propio:
            consumidor.close()

    return NUEVAS_POSICIONES


def produce_pos_act(nuevas_posiciones, productor=None):
    """Publish every current drone position to the ``dron`` topic.

    Args:
        nuevas_posiciones: Iterable of message values to send, one per drone.
        productor: Optional producer to reuse (any object with ``send`` and
            ``flush``).  When omitted, a ``KafkaProducer`` is created for this
            call only and closed before returning.
    """
    propio = productor is None
    if propio:
        productor = KafkaProducer(value_serializer=lambda v: pickle.dumps(v),
                                  bootstrap_servers=ip_kafka + ":" + puerto_kafka)

    try:
        for pos in nuevas_posiciones:
            print("Enviando a dron", pos)
            productor.send("dron", value=pos)
        # send() only queues the record; without a flush the messages can be
        # lost when a short-lived producer goes out of scope.
        productor.flush()
    finally:
        if propio:
            productor.close()

Here is my AD_Drone consumer-producer logic

 # Drone main loop: publish the next move, wait two seconds, then read back
 # the current position, repeating until COMPLETADO is set.
 # NOTE(review): consume_pos() builds a new KafkaConsumer with
 # auto_offset_reset="latest" on every call, so a position published on
 # "dron" before that consumer is ready may be skipped — confirm.
 while not self.COMPLETADO:
                    print("Pasando a producir pos actual")
                    self.produce_pos_act()
                    print("Pasando a consumir pos actual")
                    sleep(2)
                    # 'a' selects the X_ACTUAL/Y_ACTUAL update in consume_pos().
                    self.consume_pos('a')

def produce_pos_act(self):
        """Send this drone's next step to the ``movimiento`` topic.

        The message value is ``"<ID>,<result>"``, where ``<result>`` is whatever
        ``calcula_pos_actual`` returns for the integer target and current
        coordinates.
        """
        objetivo = (int(self.X_FIN), int(self.Y_FIN))
        actual = (int(self.X_ACTUAL), int(self.Y_ACTUAL))
        paso = self.calcula_pos_actual(*objetivo, *actual)
        dato = ",".join((str(self.ID), str(paso)))

        print("Envío a Engine", dato)
        # PRODUCER is an instance attribute and is deliberately left open here.
        self.PRODUCER.send("movimiento", value=dato)

    def consume_pos(self, pos, consumidor=None):
        """Read the ``dron`` topic until a position for this drone arrives.

        Args:
            pos: ``'a'`` to store the received coordinates in
                ``X_ACTUAL``/``Y_ACTUAL``, ``'f'`` to store them in
                ``X_FIN``/``Y_FIN``.  With any other value, matching messages
                are read but nothing is stored.
            consumidor: Optional iterable of messages, normally a long-lived
                ``KafkaConsumer`` subscribed to ``dron``.  When omitted, a
                consumer is created for this call only and closed before
                returning.

        Message values are ``"id,x,y"`` strings; the coordinates are stored as
        the strings received.  Returns ``None``; if the iterator ends first
        (the consumer created here uses ``consumer_timeout_ms=10000``) the
        attributes are left untouched.
        """
        propio = consumidor is None
        if propio:
            # NOTE(security): pickle.loads can execute arbitrary code; this is
            # only acceptable if every producer on this topic is trusted.
            consumidor = KafkaConsumer(
                "dron",
                value_deserializer=lambda v: pickle.loads(v),
                bootstrap_servers=self.IP_KAFKA + ":" + self.PORT_KAFKA,
                group_id=self.ALIAS,
                auto_offset_reset="latest",
                consumer_timeout_ms=10000,
            )

        # The id field of a message is always text, so compare against the
        # textual form of our ID; an int ID would otherwise never match.
        mi_id = str(self.ID)

        try:
            for mensaje in consumidor:
                print("Entro en consumición")
                datos = mensaje.value.split(',')
                print("Recibo", datos)

                if datos[0] != mi_id:
                    continue

                if pos == 'a':
                    self.X_ACTUAL = datos[1]
                    self.Y_ACTUAL = datos[2]
                    print("Soy " + self.ALIAS + " mi posición ACTUAL es (" + self.X_ACTUAL + ',' + self.Y_ACTUAL + ')')
                    break

                elif pos == 'f':
                    self.X_FIN = datos[1]
                    self.Y_FIN = datos[2]
                    print("Soy " + self.ALIAS + " mi posición FINAL es (" + self.X_FIN + ',' + self.Y_FIN + ')')
                    break
        finally:
            # Only close what was opened here, so repeated calls do not leak
            # one consumer (and one group membership) each.
            if propio:
                consumidor.close()

Thank you all, sorry, this is my first post here and I don't know many rules.

I've tried adding sleeps and running parallel threads, but it stays stuck.


Solution

  • If a consumer is "stuck", then either the producer isn't sending data to the topic, or you've broken out of the consumer's processing loop and the consumer has then been closed. (Consumer instances are not thread safe, so parallel threads probably won't help.) You should create one KafkaConsumer and pass it into consume_pos_act() as a parameter rather than creating one per function call; the same goes for the producer in produce_pos_act(). You only need one of each across the whole app.

    You also have auto_offset_reset="latest" and enable_auto_commit=True, so if your consumers are constantly being created and restarted, they keep moving towards the end of the topic, where there may be no actively running producer sending new data, as mentioned above.