I have a Kafka consumer with 10 consumer threads reading from 10 partitions and inserting the messages into MQ through a JmsTemplate backed by a CachingConnectionFactory (setSessionCacheSize(10)). The MQ insertion is asynchronous (@Async), so the Kafka consumption and the MQ insertion run independently of each other. The problem is that my Kafka consumer layer can consume 100k events within 1 minute, but only 8 threads (tasks) are inserting into MQ at any given point in time. How can I increase the number of threads or instances inserting into MQ?
Below is my JmsTemplate and CachingConnectionFactory configuration, which is used by another method to insert into MQ:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
import org.springframework.jms.core.JmsTemplate;

import com.ibm.mq.jms.MQConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;

@Configuration
public class MQutil {

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        try {
            System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");
            // Create an MQConnectionFactory with the MQ properties
            MQConnectionFactory mqConnectionFactory = new MQConnectionFactory();
            mqConnectionFactory.setHostName("ca");
            mqConnectionFactory.setPort(1414);
            mqConnectionFactory.setQueueManager("QMGR");
            mqConnectionFactory.setChannel("CHANNEL");
            mqConnectionFactory.setSSLCipherSuite("TLS_RSA_WITH_AES_128_CBC_SHA256");
            mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            // Wrap the MQ factory so every connection is created with these credentials
            UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
            connectionFactoryAdapter.setTargetConnectionFactory(mqConnectionFactory);
            connectionFactoryAdapter.setUsername("dmin");
            connectionFactoryAdapter.setPassword("21ff");
            cachingConnectionFactory.setTargetConnectionFactory(connectionFactoryAdapter);
            cachingConnectionFactory.setSessionCacheSize(10);
        } catch (Exception e) {
            System.out.println("Exception creating MQ connection factory " + e);
        }
        return cachingConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setDefaultDestinationName("TEST.TOPIC.QUEUE");
        return jmsTemplate;
    }
}
The method below calls the JmsTemplate:
@Async
public void insertIntoMQ(String kafkaMessage) {
    //log.info("Inside insertIntoMQ function");
    try {
        // convertAndSend(Object) sends to the default destination name
        jmsTemplate.convertAndSend(kafkaMessage);
        log.info("Message Inserted Successfully :\n" + kafkaMessage);
    } catch (Exception e) {
        log.error("Exception Inserting Message Into MQ ", e);
    }
}
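One likely explanation for the cap of 8, assuming this is a Spring Boot application that relies on the auto-configured executor for @Async: that executor defaults to a core pool size of 8 with an unbounded queue, and a Java thread pool only grows past its core size once its queue is full. The sketch below reproduces that behaviour with plain java.util.concurrent classes. PoolSizeDemo and peakThreads are illustrative names, not part of the code above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative only: mimics an executor with a fixed core size and an unbounded queue.
class PoolSizeDemo {

    /** Submits `tasks` blocking tasks and returns the largest number of threads the pool ever had. */
    static int peakThreads(int coreSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                coreSize, Integer.MAX_VALUE,      // max size is effectively unlimited...
                60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());     // ...but the unbounded queue never fills up
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();              // keep the worker busy, like a slow MQ send
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int largest = pool.getLargestPoolSize();  // extra tasks were queued, not given new threads
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return largest;
    }

    public static void main(String[] args) {
        System.out.println("core=8,  100 tasks -> threads: " + peakThreads(8, 100));
        System.out.println("core=10, 100 tasks -> threads: " + peakThreads(10, 100));
    }
}
```

If this is what is happening, setSessionCacheSize(10) is not the limiting factor; the number of concurrent sends is bounded by the executor that runs insertIntoMQ.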
Below is my Kafka consumer configuration:
Properties config = new Properties();
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokersUrl);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
config.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);
I need help increasing the number of threads/tasks inserting into MQ.
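One way to raise the number of threads, sketched here under the assumption that the application uses Spring's @Async support, is to define a dedicated executor and reference it by name. The class name MqAsyncConfig, the bean name mqTaskExecutor, the thread name prefix, and the pool and queue sizes are all hypothetical values chosen for illustration and would need tuning.

```java
import java.util.concurrent.ThreadPoolExecutor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class MqAsyncConfig {              // hypothetical class name

    @Bean(name = "mqTaskExecutor")        // hypothetical bean name
    public ThreadPoolTaskExecutor mqTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);     // assumed: one sender thread per Kafka partition
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(1000);  // assumed bound, so a backlog cannot grow without limit
        executor.setThreadNamePrefix("mq-insert-");
        // When the queue is full, run the send on the calling thread,
        // which slows the Kafka side down instead of dropping messages.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}
```

The sending method would then be annotated with @Async("mqTaskExecutor") so that it runs on this pool rather than on the default one.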
Update: I changed the code to insert synchronously instead of using @Async. That raised the number of tasks inserting into MQ from 8 to 10, which matches my number of consumer stream threads. In addition, a few configuration changes (shown below) improved performance, and I was able to achieve 850 TPS.
@Configuration
public class MQutil {

    @Bean
    public QosSettings qos_Settings() {
        QosSettings qosSettings = new QosSettings();
        qosSettings.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        return qosSettings;
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        try {
            System.setProperty("com.ibm.mq.cfg.useIBMCipherMappings", "false");
            // Create an MQConnectionFactory with the MQ properties
            MQConnectionFactory mqConnectionFactory = new MQConnectionFactory();
            mqConnectionFactory.setHostName("cdopla-854.bell.corp.bce.ca");
            mqConnectionFactory.setPort(1414);
            mqConnectionFactory.setQueueManager("ESBUATQMGR");
            mqConnectionFactory.setChannel("ESIDE.CHANNEL");
            mqConnectionFactory.setSSLCipherSuite("TLS_RSA_WITH_AES_128_CBC_SHA256");
            mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            // Wrap the MQ factory so every connection is created with these credentials
            UserCredentialsConnectionFactoryAdapter connectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
            connectionFactoryAdapter.setTargetConnectionFactory(mqConnectionFactory);
            connectionFactoryAdapter.setUsername("esideadmin");
            connectionFactoryAdapter.setPassword("esbcan21ff");
            cachingConnectionFactory.setTargetConnectionFactory(connectionFactoryAdapter);
            cachingConnectionFactory.setSessionCacheSize(10);
        } catch (Exception e) {
            System.out.println("Exception creating MQ connection factory " + e);
        }
        return cachingConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory());
        jmsTemplate.setDefaultDestinationName("TEST.TOPIC.QUEUE2");
        jmsTemplate.setExplicitQosEnabled(true);
        //jmsTemplate.setQosSettings(qos_Settings()); -- doubles performance: the send is acknowledged once the message is in main memory instead of after it is persisted to disk
        jmsTemplate.setSessionTransacted(false);
        jmsTemplate.setSessionAcknowledgeMode(JMSContext.AUTO_ACKNOWLEDGE);
        return jmsTemplate;
    }
}
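To put the 850 TPS figure next to the consumption rate mentioned earlier, here is a small back-of-the-envelope calculation using only the numbers from this post (100k events consumed in 1 minute, 850 inserts per second). ThroughputMath and its method names are illustrative.

```java
// Illustrative arithmetic only; the inputs are the figures quoted in this post.
class ThroughputMath {

    /** Average rate in events per second for `events` handled in `seconds`. */
    static double eventsPerSecond(long events, double seconds) {
        return events / seconds;
    }

    /** Seconds needed to push `events` messages at a steady `tps` rate. */
    static double secondsToDrain(long events, double tps) {
        return events / tps;
    }

    public static void main(String[] args) {
        double consumeRate = eventsPerSecond(100_000, 60);   // Kafka side
        double drainTime = secondsToDrain(100_000, 850);     // MQ side at 850 TPS
        System.out.printf("Kafka consume rate: %.0f events/s%n", consumeRate);
        System.out.printf("Time to insert 100k at 850 TPS: %.0f s%n", drainTime);
    }
}
```

The consumer side averages about 1,667 events per second, so at 850 TPS the MQ side needs roughly 118 seconds for the same 100k events, about twice as long as consuming them.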