Search code examples
javajmsapache-kafka-connectweblogic12c

Getting error while implementing DestinationAvailabilityListener in java


I have a usecase where i need to listen for available members of a uniform ditributed queue hosted in weblogic 12c. I read and found that DestinationAvailabilityListener interface have methods which might suit the need. Following is my code:

import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import javax.naming.Context;

import org.apache.kafka.connect.connector.ConnectorContext;

import weblogic.jms.extensions.DestinationAvailabilityListener;
import weblogic.jms.extensions.DestinationDetail;
import weblogic.jms.extensions.JMSDestinationAvailabilityHelper;
import weblogic.jms.extensions.RegistrationHandle;

public class QueueMonitor extends Thread implements DestinationAvailabilityListener,WebLogicJmsTask {
        private Hashtable<String, String> wlsEnvParamHashTbl = null;
        private final Object containerLock = new Object();
        private final  CountDownLatch startLatch ;
        private RegistrationHandle registrationHandle;
        private ArrayList<String> containerMap;
        boolean shutdown =false,changeflg=false;
        private final ConnectorContext context;
        
    public QueueMonitor(Map<String, String> props,ConnectorContext context) {
            super();
            wlsEnvParamHashTbl = new Hashtable<String, String>();
            wlsEnvParamHashTbl.put(Context.PROVIDER_URL, props.get(WEBLOGIC_T3_URL_DESTINATION_CONFIG)); // set Weblogic JMS URL
            wlsEnvParamHashTbl.put(Context.INITIAL_CONTEXT_FACTORY, "weblogic.jndi.WLInitialContextFactory"); // set Weblogic JNDI
            wlsEnvParamHashTbl.put(Context.SECURITY_PRINCIPAL, props.get(WEBLOGIC_USERNAME_CONFIG)); // set Weblogic UserName
            wlsEnvParamHashTbl.put(Context.SECURITY_CREDENTIALS, props.get(WEBLOGIC_PASSWORD_CONFIG)); // set Weblogic PassWord
            for (Map.Entry<String,String> entry : wlsEnvParamHashTbl.entrySet())  
                System.out.println("Key = " + entry.getKey() + 
                                 ", Value = " + entry.getValue()); 
        System.out.println(props.get(WEBLOGIC_JMS_DESTINATION_CONFIG));
            this.context=context;
            this.startLatch = new CountDownLatch(1);
            JMSDestinationAvailabilityHelper dah = JMSDestinationAvailabilityHelper.getInstance();
            try {this.registrationHandle= dah.register(wlsEnvParamHashTbl, props.get(WEBLOGIC_JMS_DESTINATION_CONFIG), this);
            
                startLatch.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            catch(Exception e)
            {
                System.out.println(e);
            }
        }

    public void run()
    {
        while(!shutdown)
        {
            if(changeflg)
            {
                context.requestTaskReconfiguration();
                changeflg=false;
            }
            
        }
        if(shutdown)
        {
            System.out.println("Called shutdown, returning");
            return;
        }
        
    }
    
    public synchronized String getContainers()
    {
        String list=null;
        synchronized (containerLock) {
            list=String.join(",", containerMap);
        }
        return list;
    }

      @Override
         public void onDestinationsAvailable(String destJNDIName, List<DestinationDetail> physicalAvailableMembers) {
            synchronized (containerLock) {
                System.out.println("destJNDIName is :"+destJNDIName);
              // For all Physical destinations, start a container
              for (DestinationDetail detail : physicalAvailableMembers) {
                  System.out.println("member is :"+detail.getJNDIName());
                  containerMap.add(detail.getJNDIName());
                //containerMap.put(detail.getJNDIName(), detail.getJNDIName());
              }
              if(startLatch.getCount()==0)
              {
                  changeflg=true;
              }
            }
            startLatch.countDown();       
          }

        @Override
        public void onDestinationsUnavailable(String destJNDIName, List<DestinationDetail> physicalUnavailableMembers) {
            // TODO Auto-generated method stub
            synchronized (containerLock) {
                  // Shutdown all containers whose physical members are no longer available
                  for (DestinationDetail detail : physicalUnavailableMembers) {
                    containerMap.remove(detail.getJNDIName());
                    // maybe i will need to do somethinh here
                  }
                  changeflg=true;
                }
            
        }

        @Override
          public void onFailure(String destJndiName, Exception exception) {
            // Looks like a cluster wide failure
             System.out.println("inside on failure");
            shutdown();
            System.out.println(exception);
            
          }
        public void shutdown() {
            // Unregister for events about destination availability
              registrationHandle.unregister();

            // Shut down containers
            synchronized (containerLock) {
                containerMap.removeAll(containerMap);
            }
            
            shutdown=true;
          }
}

On running its going to onFailure with the following error:

<Error> <Kernel> <WL-000802> <ExecuteRequest failed
 java.lang.NullPointerException.
        at com.bt.connect.QueueMonitor.shutdown(QueueMonitor.java:128)
        at com.bt.connect.QueueMonitor.onFailure(QueueMonitor.java:118)
        at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper$3.run(JMSDestinationAvailabilityHelper.java:490)
        at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.callOutListener(JMSDestinationAvailabilityHelper.java:451)
        at weblogic.jms.extensions.JMSDestinationAvailabilityHelper$DestinationAvailabilityListenerWrapper.onFailure(JMSDestinationAvailabilityHelper.java:487)
        at weblogic.jms.common.CDS$DD2Listener.reportException(CDS.java:1145)
        at weblogic.jms.common.CDS.lookupDDAndCalloutListenerSingle(CDS.java:459)
        at weblogic.jms.common.CDS.lookupDDAndCalloutListener(CDS.java:412)
        at weblogic.jms.common.CDS.access$400(CDS.java:52)
        at weblogic.jms.common.CDS$DDListenerRegistrationTimerListener.timerExpired(CDS.java:255)
        at weblogic.timers.internal.TimerImpl.run(TimerImpl.java:301)
        at weblogic.work.ExecuteRequestAdapter.execute(ExecuteRequestAdapter.java:21)
        at weblogic.kernel.ExecuteThread.execute(ExecuteThread.java:147)
        at weblogic.kernel.ExecuteThread.run(ExecuteThread.java:119)

i am using java 8. Can anyone please help me ? Thanks in advance.


Solution

  • i had resolved the issues in the code earlier , sharing the same for others who might need help , the primary problem was wlthint3client.jar was needed. The final code git is below.

    MyJMSKafkaConnect

    The code is kafkaconnect api which reads from Weblogic Uniform distributed Queue and writes to kafka queue.