c++ apache-kafka kafka-consumer-api kafka-producer-api

C++ with Kafka - consumer just receives some producer messages


I'm using the following C++ code to produce messages to Kafka:

#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <cppkafka/producer.h>

using namespace cppkafka;

int main()
{
    for(int i = 0 ; i < 100 ; i++)
    {
        std::cout << "sending msg number: " << i << std::endl;
        std::string str("{'msg number' : " + std::to_string(i) + "}");

        // Create a message builder for this topic
        MessageBuilder builder("test");

        // Construct the configuration
        Configuration config =
        {
            { "metadata.broker.list", "192.168.1.100:9092"}
        };

        // Create the producer
        Producer producer(config);

        builder.payload(str);

        producer.produce(builder); //Only a few messages are received!

        std::this_thread::sleep_for(std::chrono::milliseconds(50));//If I remove this, no message is received!
    }
}

On the machine running ZooKeeper and the Kafka server, I run a console consumer to display the messages it receives:

bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.100:9092 --topic test

My C++ program prints the following:

sending msg number: 0
sending msg number: 1
sending msg number: 2
sending msg number: 3
(...) //from 0 to 99...all the messages are sent!
sending msg number: 98
sending msg number: 99

I was expecting the consumer to receive all of these messages, but I see only a few:

{'msg number' : 40}
{'msg number' : 58}
{'msg number' : 70}
{'msg number' : 75}
{'msg number' : 91}
{'msg number' : 96}

No further messages are received.

If I remove the line:

std::this_thread::sleep_for(std::chrono::milliseconds(50));

I do not receive any messages at all. Why is my Kafka server not receiving all of my messages?


Solution

  • Original Comment:

    It is probably some sort of spam prevention: the messages are being sent too quickly and the server is simply dropping them.

    Web server providers have developed many anti-spam techniques to stop someone from flooding a server with traffic. I think this is likely what is happening here.

    https://en.wikipedia.org/wiki/Anti-spam_techniques
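
The comment above blames server-side throttling. Another explanation worth checking, assuming cppkafka's Producer is a thin wrapper over librdkafka's asynchronous producer, is that produce() only queues the message and returns, with the actual delivery happening later on a background thread. In the question's code the Producer is created inside the loop and destroyed at the end of every iteration, so most messages would be discarded before they are sent, and the 50 ms sleep merely gives a few of them time to get out. If that is the cause, the usual remedy is to create the Producer once before the loop and call producer.flush() before it goes out of scope. The sketch below does not use cppkafka at all: ToyBroker, ToyProducer, and both helper functions are hypothetical names, and the 20 ms delay is an arbitrary stand-in for network latency. It only models the mechanism with the standard library.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a broker: it only records delivered payloads.
struct ToyBroker {
    std::mutex mutex;
    std::vector<std::string> received;
};

// Hypothetical asynchronous producer (not the cppkafka API). produce() only
// queues a message; a background thread delivers it after a simulated network
// delay. Destroying the producer discards anything not yet delivered.
class ToyProducer {
public:
    explicit ToyProducer(ToyBroker& broker)
        : broker_(broker), worker_([this] { run(); }) {}

    ~ToyProducer() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        changed_.notify_all();
        worker_.join();
    }

    // Returns immediately; the message is only queued.
    void produce(std::string payload) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.push_back(std::move(payload));
        }
        changed_.notify_all();
    }

    // Blocks until every queued message has been delivered.
    void flush() {
        std::unique_lock<std::mutex> lock(mutex_);
        changed_.wait(lock, [this] { return pending_.empty(); });
    }

private:
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            changed_.wait(lock, [this] { return stopping_ || !pending_.empty(); });
            if (stopping_) return;  // anything still queued is dropped
            // Simulated network delay; ends early if the producer is destroyed.
            changed_.wait_for(lock, std::chrono::milliseconds(20),
                              [this] { return stopping_; });
            if (stopping_) return;
            {
                std::lock_guard<std::mutex> broker_lock(broker_.mutex);
                broker_.received.push_back(std::move(pending_.front()));
            }
            pending_.pop_front();
            changed_.notify_all();  // lets flush() re-check the queue
        }
    }

    ToyBroker& broker_;
    std::mutex mutex_;
    std::condition_variable changed_;
    std::deque<std::string> pending_;
    bool stopping_ = false;
    std::thread worker_;  // declared last so it starts after the other members
};

// Mirrors the question: a new producer per message, destroyed right after
// produce(). Returns how many messages the toy broker received.
std::size_t send_with_producer_per_message(int count) {
    ToyBroker broker;
    for (int i = 0; i < count; ++i) {
        ToyProducer producer(broker);
        producer.produce("{'msg number' : " + std::to_string(i) + "}");
    }  // producer destroyed here, normally before the delivery happens
    return broker.received.size();
}

// One long-lived producer, flushed before it goes out of scope.
std::size_t send_with_single_flushed_producer(int count) {
    ToyBroker broker;
    {
        ToyProducer producer(broker);
        for (int i = 0; i < count; ++i) {
            producer.produce("{'msg number' : " + std::to_string(i) + "}");
        }
        producer.flush();
    }
    return broker.received.size();
}
```

In this model, send_with_single_flushed_producer(100) returns 100, while send_with_producer_per_message(100) normally returns 0, because each producer is torn down well within the simulated delay. Whether the real program behaves the same way depends on the assumption about cppkafka stated above, so it is worth confirming against the library's documentation.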