I have a usecase where i need to listen for available members of a uniform ditributed queue hosted in weblogic 12c. I read and found that DestinationAvailabilityListener interface have methods which might suit the need. Following is my code:
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.naming.Context;
import org.apache.kafka.connect.connector.ConnectorContext;
import weblogic.jms.extensions.DestinationAvailabilityListener;
import weblogic.jms.extensions.DestinationDetail;
import weblogic.jms.extensions.JMSDestinationAvailabilityHelper;
import weblogic.jms.extensions.RegistrationHandle;
public class QueueMonitor extends Thread implements DestinationAvailabilityListener,WebLogicJmsTask {
private Hashtable<String, String> wlsEnvParamHashTbl = null;
private final Object containerLock = new Object();
private final CountDownLatch startLatch ;
private RegistrationHandle registrationHandle;
private ArrayList<String> containerMap;
boolean shutdown =false,changeflg=false;
private final ConnectorContext context;
public QueueMonitor(Map<String, String> props,ConnectorContext context) {
super();
wlsEnvParamHashTbl = new Hashtable<String, String>();
wlsEnvParamHashTbl.put(Context.PROVIDER_URL, props.get(WEBLOGIC_T3_URL_DESTINATION_CONFIG)); // set Weblogic JMS URL
wlsEnvParamHashTbl.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); // set Weblogic JNDI
wlsEnvParamHashTbl.put(Context.SECURITY_PRINCIPAL, props.get(WEBLOGIC_USERNAME_CONFIG)); // set Weblogic UserName
wlsEnvParamHashTbl.put(Context.SECURITY_CREDENTIALS, props.get(WEBLOGIC_PASSWORD_CONFIG)); // set Weblogic PassWord
for (Map.Entry<String,String> entry : wlsEnvParamHashTbl.entrySet())
System.out.println("Key = " + entry.getKey() +
", Value = " + entry.getValue());
System.out.println(props.get(WEBLOGIC_JMS_DESTINATION_CONFIG));
this.context=context;
this.startLatch = new CountDownLatch(1);
JMSDestinationAvailabilityHelper dah = JMSDestinationAvailabilityHelper.getInstance();
try {this.registrationHandle= dah.register(wlsEnvParamHashTbl, props.get(WEBLOGIC_JMS_DESTINATION_CONFIG), this);
startLatch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
catch(Exception e)
{
System.out.println(e);
}
}
public void run()
{
while(!shutdown)
{
if(changeflg)
{
context.requestTaskReconfiguration();
changeflg=false;
}
}
if(shutdown)
{
System.out.println("Called shutdown, returning");
return;
}
}
public synchronized String getContainers()
{
String list=null;
synchronized (containerLock) {
list=String.join(",", containerMap);
}
return list;
}
@Override
public void onDestinationsAvailable(String destJNDIName, List<DestinationDetail> physicalAvailableMembers) {
synchronized (containerLock) {
System.out.println("destJNDIName is :"+destJNDIName);
// For all Physical destinations, start a container
for (DestinationDetail detail : physicalAvailableMembers) {
System.out.println("member is :"+detail.getJNDIName());
containerMap.add(detail.getJNDIName());
//containerMap.put(detail.getJNDIName(), detail.getJNDIName());
}
if(startLatch.getCount()==0)
{
changeflg=true;
}
}
startLatch.countDown();
}
@Override
public void onDestinationsUnavailable(String destJNDIName, List<DestinationDetail> physicalUnavailableMembers) {
// TODO Auto-generated method stub
synchronized (containerLock) {
// Shutdown all containers whose physical members are no longer available
for (DestinationDetail detail : physicalUnavailableMembers) {
containerMap.remove(detail.getJNDIName());
// maybe i will need to do somethinh here
}
changeflg=true;
}
}
@Override
public void onFailure(String destJndiName, Exception exception) {
// Looks like a cluster wide failure
System.out.println("inside on failure");
shutdown();
System.out.println(exception);
}
public void shutdown() {
// Unregister for events about destination availability
registrationHandle.unregister();
// Shut down containers
synchronized (containerLock) {
containerMap.removeAll(containerMap);
}
shutdown=true;
}
}
On running its going to onFailure with the following error:
<Error> <Kernel> <WL-000802> <ExecuteRequest failed
java.lang.NullPointerException.
at com.bt.connect.QueueMonitor.shutdown(QueueMonitor.java:128)
at com.bt.connect.QueueMonitor.onFailure(QueueMonitor.java:118)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper$3.run(JMSDestinationAvailabilityHelper.java:490)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.callOutListener(JMSDestinationAvailabilityHelper.java:451)
at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.onFailure(JMSDestinationAvailabilityHelper.java:487)
at weblogic.jms.common.CDS$DD2Listener.reportException(CDS.java:1145)
at weblogic.jms.common.CDS.lookupDDAndCalloutListenerSingle(CDS.java:459)
at weblogic.jms.common.CDS.lookupDDAndCalloutListener(CDS.java:412)
at weblogic.jms.common.CDS.access$400(CDS.java:52)
at weblogic.jms.common.CDS$DDListenerRegistrationTimerListener.timerExpired(CDS.java:255)
at weblogic.timers.internal.TimerImpl.run(TimerImpl.java:301)
at weblogic.work.ExecuteRequestAdapter.execute(ExecuteRequestAdapter.java:21)
at weblogic.kernel.ExecuteThread.execute(ExecuteThread.java:147)
at weblogic.kernel.ExecuteThread.run(ExecuteThread.java:119)
i am using java 8. Can anyone please help me ? Thanks in advance.
i had resolved the issues in the code earlier , sharing the same for others who might need help , the primary problem was wlthint3client.jar was needed. The final code git is below.
The code is kafkaconnect api which reads from Weblogic Uniform distributed Queue and writes to kafka queue.