node.js apache-kafka kafka-producer-api kafka-rest

Processing a huge number of records gives OutOfMemoryError - Kafka REST proxy


I'm calling Kafka through the Confluent REST Proxy. I read a CSV file, create an object out of all the records in it (about 4 million records), and send requests to the REST proxy. I keep getting an OutOfMemoryError.

The exact exception message is:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-81"

I have just one instance of the REST proxy server, hosted as a Docker container. The environment variable is set to:

JAVA_OPTIONS=-Xmx1g

Other configs:

CPU: 1, Memory: 1024

It processes about 100,000 records before crashing. I've also tried scaling it to 4 instances, with CPU increased to 3 and memory to 2046 MB. It then processes about 500,000 records.

After reading the CSV, I call the Kafka endpoint in batches of 5k records. That part is written in Node. Here's the Node code:

fs.createReadStream(inputFile)
    .pipe(parser({skip_lines_with_error: true}))
    .on('data', (records) => {
        country.push({
            'value': {
                country: records[0],
                capital: records[1]
            }
        });

        if (country.length > 5000) {
            batch++;
            callKafkaProxy(country).then((rec) => {
                console.log(`'Batch done!'`);
            }).catch((reason) => {
                console.log(reason);
            });
            country = [];
        }
    })
    .on('end', () => {
        console.log('All done!');
    });
function callKafkaProxy(records) {
    const urlAndRequestOptions = {
        url: 'http://kafka-rest-proxy.com/topics/test-topic',
        headers: {
            'content-type' : 'application/vnd.kafka.json.v2+json',
            'Accept' : 'application/vnd.kafka.v2+json'
        }
    };
    const recordsObject = {records: records};
    // request here is a wrapper on the http package.
    return request.post(urlAndRequestOptions, recordsObject);
}

I feel like I'm missing some configuration that would help solve this without increasing the number of instances beyond 1.

Any help will be appreciated.


Solution

    1. The .on('data', () => {}) handler does not handle backpressure. Create a writable stream that handles your batching process, then just use pipe:

    inputStream
        .pipe(parser)
        .pipe(kafka)
    

    Then analysing these lines:

    if (country.length > 5000) {
        batch++;
        callKafkaProxy(country).then((rec) => {
            console.log(`'Batch done!'`);
        }).catch((reason) => {
            console.log(reason);
        });
        country = [];
    }
    
    2. Your callKafkaProxy is asynchronous, so the data handler never waits for it. The country array keeps filling and new requests keep being made, no matter what the result of the previous callKafkaProxy call is or whether it has finished at all. You can confirm this by logging to the console right after batch++. You will see that you are initiating lots of requests, and that Kafka responds much more slowly than you are making them.
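To make the difference concrete, here is a small self-contained sketch that compares the two calling patterns against a simulated slow endpoint. Nothing here talks to Kafka: makeSlowSender, fireAndForget, oneAtATime and demo are hypothetical names invented for this illustration, and the timer delay simply stands in for the REST proxy's response time.

```javascript
// Hypothetical stand-in for the REST proxy: each call takes `delayMs` to
// finish, and `stats` records how many calls were pending at the same time.
function makeSlowSender(delayMs) {
  const stats = { inFlight: 0, maxInFlight: 0 };
  const send = (batch) => {
    stats.inFlight += 1;
    stats.maxInFlight = Math.max(stats.maxInFlight, stats.inFlight);
    return new Promise((resolve) => {
      setTimeout(() => {
        stats.inFlight -= 1;
        resolve(batch.length);
      }, delayMs);
    });
  };
  return { send, stats };
}

// Same shape as the handler in the question: start every request right away.
async function fireAndForget(batches, send) {
  const pending = batches.map((batch) => send(batch));
  await Promise.all(pending); // only here so the demo can finish cleanly
}

// Wait for each response before starting the next request.
async function oneAtATime(batches, send) {
  for (const batch of batches) {
    await send(batch);
  }
}

async function demo() {
  const batches = Array.from({ length: 10 }, (_, i) => [i]);

  const unbounded = makeSlowSender(20);
  await fireAndForget(batches, unbounded.send);

  const awaited = makeSlowSender(5);
  await oneAtATime(batches, awaited.send);

  return {
    unbounded: unbounded.stats.maxInFlight,
    awaited: awaited.stats.maxInFlight,
  };
}
```

Calling demo() resolves to { unbounded: 10, awaited: 1 }: with the first pattern all ten batches are pending at once, while with the second there is never more than one request in flight.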

    Solution:

    1. Create a writable stream.
    2. Pipe data to it from your parser: input.pipe(parser).pipe(yourJustCreatedKafkaWritableStream)
    3. Have your writable stream push countries onto an array and invoke the callback when it is ready to receive the next record. When you reach your limit (countries.length > 5000), make the request to Kafka, wait for the response, and only then invoke the callback. This way your stream adapts to how fast Kafka responds. You should read more about Node streams and their power. But remember, with great power comes great responsibility: you have to design your code carefully to avoid such memory leaks.
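The three steps above could look roughly like the following. This is a minimal sketch, not a drop-in implementation: the class name BatchingWritable and its batchSize and send options are made up for this example, and send is assumed to be a function that takes an array of records and returns a promise, as callKafkaProxy in the question does.

```javascript
const { Writable } = require('stream');

// Hypothetical writable stream that collects records into batches and does
// not accept more input until the current batch has been sent.
class BatchingWritable extends Writable {
  constructor({ batchSize, send }) {
    super({ objectMode: true }); // records are objects, not bytes
    this.batchSize = batchSize;
    this.send = send; // assumed: (records) => Promise
    this.batch = [];
  }

  // Send the current batch and invoke the callback only after the response.
  sendBatch(callback) {
    const batch = this.batch;
    this.batch = [];
    this.send(batch).then(() => callback(), (err) => callback(err));
  }

  _write(record, _encoding, callback) {
    this.batch.push(record);
    if (this.batch.length >= this.batchSize) {
      // Holding the callback back is what creates the backpressure:
      // the stream will not deliver the next record until it is called.
      this.sendBatch(callback);
    } else {
      callback();
    }
  }

  // Called once the input has ended; sends the last, partially filled batch.
  _final(callback) {
    if (this.batch.length > 0) {
      this.sendBatch(callback);
    } else {
      callback();
    }
  }
}
```

With the code from the question, this would be wired up as something like input.pipe(parser).pipe(new BatchingWritable({ batchSize: 5000, send: callKafkaProxy })), with the mapping of each parsed row to { value: { country, capital } } done either in _write or in a small Transform stream placed before it. Because pipe pauses the parser while the writable's internal buffer is full, at most one request is in flight at a time and only a bounded number of records is held in memory.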