Search code examples
apache-kafkakafka-producer-api

Kafka messages not accepted by server (only from remote IP?)


My Kafka server works fine, if I produce and consume from a C++ application, or from the command line, from the local machine.

But it does not work well from an external IP address: the topic gets created, I see network traffic using tcpdump with ACKs (meaning kafka is answering), but I find no messages on the queue, and java gives no error. Can't find nothing on the logs. Or google. This is my app:

public class KafkaPBJProducer {
    private static String KAFKA_TOPIC = "test";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.131:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        properties.put("kafka.topic", KAFKA_TOPIC);
        runMainLoop(properties);
    }

    static void runMainLoop(Properties properties) {
        @SuppressWarnings("resource")
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int count = 0; count < 10; count++) {
            String msg = getMsg(String.valueOf(count));
            System.out.println("Producing message: " + msg);
            producer.send(new ProducerRecord<String, String>(KAFKA_TOPIC, 0, "dev-" + count, msg));
            producer.flush();
        }
    }

    public static String getMsg(String id) {
        JsonObject obj = new JsonObject();
        try {
            obj.addProperty("id", id);
            obj.addProperty("timestamp", new Timestamp(System.currentTimeMillis()).toString());
            obj.addProperty("data", Base64.getEncoder().encodeToString("Hello, World!".getBytes("utf-8")));
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new Gson().toJson(obj);
    }
}

When it runs, it gets stucked like a minute on each message (no errors are displayed, they just take a lot of time to get sent):

Producing message: {"id":"0","timestamp":"2018-08-21 13:49:10.697","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}
Producing message: {"id":"1","timestamp":"2018-08-21 13:50:10.794","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}
Producing message: {"id":"2","timestamp":"2018-08-21 13:51:10.797","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}
Producing message: {"id":"3","timestamp":"2018-08-21 13:52:10.813","data":"SGVsbG8sIFdvcmxkIQ\u003d\u003d"}

I can see the topic was created:

> bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test

I can manually add content to the topic using the command line:

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>one
>two
>four

But if I check the topic, only the manually sent messages are listed:

$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.131:9092 --topic test --from-beginning
one
two
four

(after several minutes, I press CTRL-C)
^CProcessed a total of 3 messages

What is happening here? why are the topics created but no messages are accepted on the server? Why doesn't kafka report any errors?

TIA!


Solution

  • Found. Based on Kafka - Unable to send a message to a remote server using Java, the problem was on the config file conf/servers.properties; it requires uncommenting this line:

    advertised.listeners=PLAINTEXT://192.168.1.131:9092
    

    Thanks anyway.