@Configuration
@Component
public class GatewayAqrConfig {
@Autowired
ConnectorService connectorService;
@Autowired
MasterService masterService;
private HashMap<ConnectorPK, GatewayAqr> connectorMap;
@Bean
@Scope(value = "prototype")
public AbstractClientConnectionFactory clientCF(Connector connector , Master master) {
TcpNetClientConnectionFactory clientConnectionFactory = new TcpNetClientConnectionFactory(connector.getAqrIpAddr(), connector.getAqrIpPortNo());
clientConnectionFactory.setSingleUse(false);
MyByteArraySerializer obj = new MyByteArraySerializer(master.getAqrMsgHeaderLength(), master.getAqrId());
clientConnectionFactory.setSerializer(obj);
clientConnectionFactory.setDeserializer(obj);
clientConnectionFactory.setSoKeepAlive(true);
TcpMessageMapper tcpMessageMapper = new TcpMessageMapper();
tcpMessageMapper.setCharset("ISO-8859-1");
clientConnectionFactory.setMapper(tcpMessageMapper);
clientConnectionFactory.setBeanName(connector.getAqrIpAddr() + ":" + connector.getAqrIpPortNo());
clientConnectionFactory.afterPropertiesSet();
clientConnectionFactory.start();
return clientConnectionFactory;
}
@Bean
@Scope(value = "prototype")
public TcpSendingMessageHandler tcpOutGateway(AbstractClientConnectionFactory connectionFactory) {
TcpSendingMessageHandler messageHandler = new TcpSendingMessageHandler();
messageHandler.setConnectionFactory(connectionFactory);
messageHandler.setClientMode(true);
messageHandler.setTaskScheduler(getTaskScheduler());
messageHandler.setStatsEnabled(true);
messageHandler.afterPropertiesSet();
messageHandler.start();
return messageHandler;
}
@Bean
@Scope(value = "prototype")
public TcpReceivingChannelAdapter tcpInGateway(AbstractClientConnectionFactory connectionFactory) {
TcpReceivingChannelAdapter messageHandler = new TcpReceivingChannelAdapter();
messageHandler.setConnectionFactory(connectionFactory);
messageHandler.setClientMode(true);
messageHandler.setOutputChannel(receive());
messageHandler.setAutoStartup(true);
messageHandler.setTaskScheduler(getTaskScheduler());
messageHandler.afterPropertiesSet();
messageHandler.start();
return messageHandler;
}
@Bean
@Scope(value = "prototype")
public TaskScheduler getTaskScheduler() {
TaskScheduler ts = new ThreadPoolTaskScheduler();
return ts;
}
@Bean
public MessageChannel receive() {
QueueChannel channel = new QueueChannel();
return channel;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return new PollerMetadata();
}
@Bean
@Transactional
public HashMap<ConnectorPK, GatewayAqr> gatewayAqr() throws Exception {
connectorMap = new HashMap();
Connector connector = null;
ConnectorPK connectorPK = null;
Master master = null;
TcpConnectionSupport connectionSupport = null;
// 1. Get List of Connections configured in Database
List<Connector> connectors = connectorService.getConnections();
if (connectors.size() > 0) {
for (int i = 0; i < connectors.size(); i++) {
// 2. Get the connection details
connector = connectors.get(i);
connectorPK = aqrConnector.getConnectorpk();
master = masterService.findById(connectorPK.getAcuirerId());
try {
// 3. Create object of TcpNetClientConnectionFactory for each Acquirer connection
AbstractClientConnectionFactory clientConnectionFactory = clientCF(aqrConnector, aqrMaster);
// 4. Create TcpSendingMessageHandler for the Connection
TcpSendingMessageHandler outHandler = tcpOutGateway(clientConnectionFactory);
// 5. Create TcpReceivingChannelAdapter object for the Connection and assign it to receive channel
TcpReceivingChannelAdapter inHandler = tcpInGateway(clientConnectionFactory);
// 6. Generate the GatewayAqr object
GatewayAqr gatewayAqr = new GatewayAqr(clientConnectionFactory, outHandler, inHandler);
// 7. Put in the MAP acuirerPK and Send MessageHandler object
connectorMap.put(aqrConnectorPK, gatewayAquirer);
} catch (Exception e) {
}
} // for
} // if
return connectorMap;
}
}
*********************************************************************************************************************************
@EnableIntegration
@IntegrationComponentScan(basePackageClasses = {GatewayEventConfig.class,GatewayAqrConfig.class })
@Configuration
@ComponentScan(basePackages = {"com.iz.zw.gateway.impl", "com.iz.zw.configuration"})
@Import({GatewayEventConfig.class,GatewayAquirerConfig.class})
public class GatewayConfig {
@Autowired
private GatewayAsyncReply<Object, Message<?>> gatewayAsyncReply;
@Autowired
private GatewayCorrelationStrategy gatewayCorrelationStrategy;
@Autowired
private HashMap<ConnectorPK, GatewayAqr> gatewayAqrs;
@Autowired
ConnectorService connectorService;
@Autowired
GatewayResponseDeserializer gatewayResponseDeserializer;
@MessagingGateway(defaultRequestChannel = "send")
public interface Gateway {
void waitForResponse(TransactionMessage transaction);
}
@Bean
public MessageChannel send() {
DirectChannel channel = new DirectChannel();
return channel;
}
@Bean
@ServiceActivator(inputChannel = "send")
public BarrierMessageHandlerWithLateGoodResponse barrier() {
BarrierMessageHandlerWithLateGoodResponse barrier = new BarrierMessageHandlerWithLateGoodResponse(25000, this.gatewayCorrelationStrategy);
barrier.setAsync(true);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateGoodresponseChannel());
return barrier;
}
@ServiceActivator(inputChannel = "out")
public void printMessage(Message<?> message) {
System.out.println("in out channel");
}
@Transformer(inputChannel = "receive", outputChannel = "process")
public TransactionMessage convert(byte[] response) {
logger.debug("Response Received", Arrays.toString(response));
TransactionMessage transactionMessage = gatewayResponseDeserializer.deserializeResponse(response);
System.out.println("Response : " + response);
return transactionMessage;
}
@ServiceActivator(inputChannel = "process")
@Bean
public MessageHandler releaser() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {
gatewayAsyncReply.put(message);
barrier().trigger(message);
} catch (GatewayLateGoodMessageException exception) {
System.out.println("Late good response..!");
gatewayAsyncReply.get(message);
lateGoodresponseChannel().send(message);
}
}
};
}
@Bean
public MessageChannel process() {
QueueChannel channel = new QueueChannel();
return channel;
}
@Bean
public MessageChannel out() {
DirectChannel channel = new DirectChannel();
return channel;
}
@Bean
public MessageChannel lateGoodresponseChannel() {
QueueChannel channel = new QueueChannel();
return channel;
}
@ServiceActivator(inputChannel="lateGoodresponseChannel")
public void handleLateGoodResponse(Message<?> message) {
String strSTAN = null;
String strResponse = null;
Message<?> respMessage = null;
if(message instanceof TransactionMessage){
strSTAN = ((TransactionMessage)message).getStan();
respMessage = gatewayAsyncReply.get(strSTAN);
if (null != respMessage) {
strResponse = (String) message.getPayload();
}
}
logger.info("Late Good Response: " + strResponse);
}
}
*********************************************************************************************************************************
@Configuration
public class GatewayEventConfig {
private static final Logger logger = LoggerFactory.getLogger(GatewayEventConfig.class);
@Bean
public ApplicationEventListeningMessageProducer tcpEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(new Class[] {TcpConnectionOpenEvent.class, TcpConnectionCloseEvent.class, TcpConnectionExceptionEvent.class});
producer.setOutputChannel(tcpEventChannel());
producer.setAutoStartup(true);
producer.setTaskScheduler(getEventTaskScheduler());
producer.start();
return producer;
}
@Bean
public TaskScheduler getEventTaskScheduler() {
TaskScheduler ts = new ThreadPoolTaskScheduler();
return ts;
}
@Bean
public MessageChannel tcpEventChannel() {
return new QueueChannel();
}
@Transactional
@ServiceActivator(inputChannel = "tcpEventChannel")
public void tcpConnectionEvent(TcpConnectionEvent event) {
System.out.println("In publishing" + event.toString());
String strConnectionFactory = event.getConnectionFactoryName();
if (strConnectionFactory.equals("connection1")) {
//send some message to connector
} else {
// send message to another connector
}
}
}
this is my configuration files, my application tries to connect to 2 servers as soon as it starts. I have made 2 configurations for 2 servers as above class GatewayAqrConfig1 and GatewayConfig1 classes are used for first server connection GatewayAqrConfig2 and GatewayConfig2 classes are used for second server connection Using event I am connecting to server and sending a connection set up message, if server is already started and If I have started my application, it gets the event, connects and sends the message but I am not getting the response instead I am getting the WARNING as below
**WARN TcpNetConnection:186 - Unexpected message - no endpoint registered with connection interceptor:**
i.e connection does not registers the listener properly
but if I am starting my application first and then servers I am getting responses perfectly, As I am connecting to servers I could not restart it ? My application should connect to server which is already started ? what could be the problem ?
Version used:
Spring integration Version : 4.3.1
Spring version : 4.3.2
JDK 1.8 on JBOSS EAP 7
That WARN message means that, somehow, an inbound message was received without a TcpReceivingChannelAdapter
having been registered with the connection factory. Client mode should make no difference.
Having looked at your code a little more, the prototype beans should be ok, as long as you use those objects (especially TcpMessageHandler
directly rather than via the framework).
It's not obvious to me how that can happen, given your configuration; the listener is registered when you call setConnectionFactory
on the receiving adapter.
If you can reproduce it with a trimmed-down project and post it someplace, I will take a look.