I am receiving messages of about 3 MB from a Kafka topic, but the default maximum message size is 1 MB. To raise the limit from 1 MB to 3 MB, I added the lines below to Kafka's consumer.properties and server.properties files.
fetch.message.max.bytes=2048576 ( consumer.properties )
message.max.bytes=2048576 ( server.properties )
replica.fetch.max.bytes=2048576 ( server.properties )
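A quick sanity check on the numbers above, as a minimal sketch: 2048576 bytes is roughly 2 MB, so if the messages really are 3 MB, this limit is smaller than the message under either the decimal or the binary reading of "3 MB". The class and method names here are hypothetical and only illustrate the arithmetic.

```java
// Sketch: compare the configured Kafka size limit with a 3 MB message.
// KafkaSizeLimits and fits() are illustrative names, not part of Kafka.
final class KafkaSizeLimits {
    // Value used in the properties files above.
    static final long CONFIGURED_LIMIT = 2_048_576L;
    // "3 MB" read as decimal megabytes.
    static final long THREE_MB_DECIMAL = 3L * 1000 * 1000;
    // "3 MB" read as binary mebibytes.
    static final long THREE_MIB = 3L * 1024 * 1024;

    // A message fits when it is no larger than the limit.
    static boolean fits(long messageBytes, long limitBytes) {
        return messageBytes <= limitBytes;
    }

    public static void main(String[] args) {
        System.out.println(fits(THREE_MB_DECIMAL, CONFIGURED_LIMIT)); // false
        System.out.println(fits(THREE_MIB, CONFIGURED_LIMIT));        // false
        System.out.println(THREE_MIB);                                // 3145728
    }
}
```

If the largest message is a full 3 MiB, a value of at least 3145728 would be needed in all three properties.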
After adding the above lines, the 3 MB messages are written to the Kafka data logs. However, Storm is unable to process the 3 MB messages; it can read only messages up to the default size of 1 MB.
How should I change the configuration in order to read and process the 3 MB messages? Here is my topology class.
String argument = args[0];
Config conf = new Config();
conf.put(JDBC_CONF, map);
conf.setDebug(true);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
//set the number of workers
conf.setNumWorkers(3);
TopologyBuilder builder = new TopologyBuilder();
//Setup Kafka spout
BrokerHosts hosts = new ZkHosts("localhost:2181");
String topic = "year1234";
String zkRoot = "";
String consumerGroupId = "group1";
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("KafkaSpout", kafkaSpout,1);
builder.setBolt("user_details", new Parserspout(),1).shuffleGrouping("KafkaSpout");
builder.setBolt("bolts_user", new bolts_user(cp),1).shuffleGrouping("user_details");
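The Kafka spout has its own fetch and buffer sizes, which default to 1 MB, so they must be raised on the SpoutConfig as well. Add the following two settings right after the line that creates the SpoutConfig: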
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, consumerGroupId);
spoutConfig.fetchSizeBytes = 3048576;
spoutConfig.bufferSizeBytes = 3048576;
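Note that 3048576 is above 3,000,000 bytes but below 3 MiB (3145728 bytes), so it only works if the largest message stays under that value. A minimal sketch of deriving the size from the largest expected message instead of hard-coding it follows. The class name, the helper, and the 64 KiB headroom are assumptions for illustration, not Kafka or Storm constants.

```java
// Sketch: derive a spout fetch size from the largest expected message.
// SpoutFetchSizing and HEADROOM_BYTES are hypothetical, chosen for illustration.
final class SpoutFetchSizing {
    // Assumed allowance for per-message framing overhead.
    static final int HEADROOM_BYTES = 64 * 1024;

    // Returns an int because fetchSizeBytes and bufferSizeBytes are int fields.
    static int fetchSizeFor(long maxMessageBytes) {
        if (maxMessageBytes <= 0) {
            throw new IllegalArgumentException("maxMessageBytes must be positive");
        }
        // toIntExact throws ArithmeticException instead of silently overflowing.
        return Math.toIntExact(maxMessageBytes + HEADROOM_BYTES);
    }

    public static void main(String[] args) {
        int size = fetchSizeFor(3L * 1024 * 1024);
        System.out.println(size); // 3211264
        // In the topology this value would be assigned to both fields:
        //   spoutConfig.fetchSizeBytes = size;
        //   spoutConfig.bufferSizeBytes = size;
    }
}
```

Whatever value is chosen, it should be no smaller than the broker's message.max.bytes, otherwise the spout can stall on a message it cannot fetch in full.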