I'm calling Kafka through the Confluent REST Proxy. I'm reading a CSV file, creating an object out of all the records in it (about 4 million records), and sending requests to the REST proxy. I keep getting an OutOfMemory exception.
The exact exception message is:
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-81"
I have just one instance of the REST proxy, hosted as a Docker container. The heap size is set via an environment variable:
JAVA_OPTIONS=-Xmx1g
Other configs:
CPU - 1
Memory - 1024 MB
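For reference, this is roughly how the container is launched (the image name and port here are assumptions on my part; only the heap, CPU, and memory values match my actual settings):

# rough sketch of the container launch, not the exact command
docker run -d \
  -e JAVA_OPTIONS="-Xmx1g" \
  --cpus=1 \
  --memory=1024m \
  -p 8082:8082 \
  confluentinc/cp-kafka-rest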
It processes about 100,000 records before crashing. I've also tried scaling to 4 instances, increasing CPU to 3 and memory to 2046 MB; it then processes about 500,000 records before crashing.
After reading the CSV, I call the Kafka REST endpoint in batches of 5,000 records. The client is written in Node. Here's the code:
const fs = require('fs');
// assuming csv-parse (or a similar library) provides the parser used below
const parser = require('csv-parse');

let country = [];   // records accumulated for the current batch
let batch = 0;      // number of batches sent so far

fs.createReadStream(inputFile)   // inputFile is the path to the 4M-record CSV
    .pipe(parser({skip_lines_with_error: true}))
    .on('data', (records) => {
        country.push({ 'value' : {
            country: records[0],
            capital: records[1]
        }});
        if (country.length > 5000) {
            batch++;
            // send the batch; the returned promise is not awaited
            callKafkaProxy(country).then((rec) => {
                console.log('Batch done!');
            }).catch((reason) => {
                console.log(reason);
            });
            country = [];
        }
    })
    .on('end', () => {
        console.log('All done!');
    });
function callKafkaProxy(records) {
    const urlAndRequestOptions = {
        url: 'http://kafka-rest-proxy.com/topics/test-topic',
        headers: {
            'content-type': 'application/vnd.kafka.json.v2+json',
            'Accept': 'application/vnd.kafka.v2+json'
        }
    };
    let recordsObject = { records: records };
    // request here is a wrapper on the http package.
    return request.post(urlAndRequestOptions, recordsObject);
}
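That wrapper isn't shown above; a minimal sketch of it, assuming it just POSTs the JSON body and resolves with the response (illustrative only, not my actual code):

const http = require('http');

const request = {
    post: (urlAndRequestOptions, body) => new Promise((resolve, reject) => {
        const payload = JSON.stringify(body);
        const req = http.request(urlAndRequestOptions.url, {
            method: 'POST',
            headers: {
                ...urlAndRequestOptions.headers,
                'Content-Length': Buffer.byteLength(payload)
            }
        }, (res) => {
            let data = '';
            res.on('data', (chunk) => data += chunk);
            res.on('end', () => resolve({ status: res.statusCode, body: data }));
        });
        req.on('error', reject);
        req.write(payload);
        req.end();
    })
};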
I feel like I'm missing some configuration that would solve this without increasing the number of instances beyond 1. Any help would be appreciated.
Your code

.on('data', () => {}); ...

has a problem:

1. It does not handle backpressure. Create a writable stream that will handle your batching process, then just use pipe:
inputStream
    .pipe(parser)
    .pipe(kafka)
Then, analysing these lines:

if (country.length > 5000) {
    batch++;
    callKafkaProxy(country).then((rec) => {
        console.log('Batch done!');
    }).catch((reason) => {
        console.log(reason);
    });
    country = [];
}
callKafkaProxy returns a promise, but nothing waits for it: the parser keeps emitting 'data' events as fast as it can read the file, so new 5,000-record batches are fired off while earlier requests are still in flight, and the records pile up in the REST proxy's heap until it runs out of memory.

Solution:
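Here is a minimal sketch of such a writable stream (the class name and batch size are illustrative; callKafkaProxy is your existing function):

const { Writable } = require('stream');

// Object-mode writable that buffers records and only asks for more input
// once the current batch has been accepted by the REST proxy.
class KafkaProxyWriter extends Writable {
    constructor(batchSize = 5000) {
        super({ objectMode: true });
        this.batchSize = batchSize;
        this.records = [];
    }

    _write(record, encoding, callback) {
        this.records.push({ value: { country: record[0], capital: record[1] } });
        if (this.records.length >= this.batchSize) {
            const batch = this.records;
            this.records = [];
            // Only signal readiness after the batch is sent; pipe will pause
            // the parser in the meantime instead of flooding memory.
            callKafkaProxy(batch)
                .then(() => callback())
                .catch(callback);
        } else {
            callback();
        }
    }

    // Flush the remaining records when the input ends.
    _final(callback) {
        if (this.records.length === 0) return callback();
        callKafkaProxy(this.records)
            .then(() => callback())
            .catch(callback);
    }
}

fs.createReadStream(inputFile)
    .pipe(parser({skip_lines_with_error: true}))
    .on('error', console.error)
    .pipe(new KafkaProxyWriter(5000))
    .on('error', console.error)
    .on('finish', () => console.log('All done!'));

With this in place the stream machinery applies backpressure for you: the CSV is only read as fast as the REST proxy can accept batches, so memory usage stays bounded no matter how many records the file contains.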