Search code examples
groovyapache-nificloudera

Nifi execute script extrenal library not working


I have written a jar which has jedis connection pool feature, by using which I have written the groovy script in nifi for redis location search. But it is behaving stangely, sometimes it is working and sometimes not.

Redis.java

public class Redis {
    private static Object staticLock = new Object();
    private static JedisPool pool;
    private static String host;
    private static int port; 
    private static int connectTimeout; 
    private static int operationTimeout; 
    private static String password;
    private static JedisPoolConfig config;

    public static void initializeSettings(String host, int port, String password, int connectTimeout, int operationTimeout) {
        Redis.host = host;
        Redis.port = port;
        Redis.password = password;
        Redis.connectTimeout = connectTimeout;
        Redis.operationTimeout = operationTimeout;
    }
    
    
   

    public static JedisPool getPoolInstance() {
        
        if (pool == null) { // avoid synchronization lock if initialization has already happened
            synchronized(staticLock) {
                if (pool == null) { // don't re-initialize if another thread beat us to it.
                    JedisPoolConfig poolConfig = getPoolConfig();
                    boolean useSsl = port == 6380 ? true : false;
                    int db = 0;
                    String clientName = "MyClientName"; // null means use default
                    SSLSocketFactory sslSocketFactory = null; // null means use default
                    SSLParameters sslParameters = null; // null means use default
                    HostnameVerifier hostnameVerifier = new SimpleHostNameVerifier(host);
                    pool = new JedisPool(poolConfig, host, port);
                            
                            //(poolConfig, host, port, connectTimeout,operationTimeout,password, db,
//                            clientName, useSsl, sslSocketFactory, sslParameters, hostnameVerifier);
                }
            }
        }
        return pool;
    }

    public static JedisPoolConfig getPoolConfig() {
        if (config == null) {
            JedisPoolConfig poolConfig = new JedisPoolConfig();
            int maxConnections = 200;
            poolConfig.setMaxTotal(maxConnections);
            poolConfig.setMaxIdle(maxConnections);
            poolConfig.setBlockWhenExhausted(true);
            poolConfig.setMaxWaitMillis(operationTimeout);
            poolConfig.setMinIdle(50);
            Redis.config = poolConfig;
        }

        return config;
    }

    public static String getPoolCurrentUsage()
    {
        JedisPool jedisPool = getPoolInstance();
        JedisPoolConfig poolConfig = getPoolConfig();

        int active = jedisPool.getNumActive();
        int idle = jedisPool.getNumIdle();
        int total = active + idle;
        String log = String.format(
                "JedisPool: Active=%d, Idle=%d, Waiters=%d, total=%d, maxTotal=%d, minIdle=%d, maxIdle=%d",
                active,
                idle,
                jedisPool.getNumWaiters(),
                total,
                poolConfig.getMaxTotal(),
                poolConfig.getMinIdle(),
                poolConfig.getMaxIdle()
        );

        return log;
    }

    private static class SimpleHostNameVerifier implements HostnameVerifier {

        private String exactCN;
        private String wildCardCN;
        public SimpleHostNameVerifier(String cacheHostname)
        {
            exactCN = "CN=" + cacheHostname;
            wildCardCN = "CN=*" + cacheHostname.substring(cacheHostname.indexOf('.'));
        }

        public boolean verify(String s, SSLSession sslSession) {
            try {
                String cn = sslSession.getPeerPrincipal().getName();
                return cn.equalsIgnoreCase(wildCardCN) || cn.equalsIgnoreCase(exactCN);
            } catch (SSLPeerUnverifiedException ex) {
                return false;
            }
        }
    }
}

CustomFunction:

public class Functions {

    SecureRandom rand = new SecureRandom(); 
    private static final String UTF8= "UTF-8";


    public static JedisPool jedisPool=null;

public static String searchPlace(double lattitude,double longitude) {

try(Jedis jedis = jedisPool.getResource()) {
}
catch(Exception e){
log.error('execption',e);
}
}

}

Groovyscript:

    import org.apache.nifi.processor.ProcessContext;
    import com.customlib.functions.*;
    
    def flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    def flowFiles = [] as List<FlowFile>
    def failflowFiles = [] as List<FlowFile>
    def input=null;
    def data=null;
    
    
    
     static onStart(ProcessContext context){
        Redis.initializeSettings("host", 6379, null,0,0);
         Functions.jedisPool= Redis.getPoolInstance();
        
      }
    
      static onStop(ProcessContext context){
       Functions.jedisPool.destroy();
      }
    
          try{
    log.warn('is jedispool connected::::'+Functions.jedisPool.isClosed());
            def inputStream = session.read(flowFile)
            def writer = new StringWriter();
            IOUtils.copy(inputStream, writer, "UTF-8");
            data=writer.toString();
            input = new JsonSlurper().parseText( data );
    log.warn('place is::::'+Functions.getLocationByLatLong(input["data"]["lat"],  input["data"]["longi"]);
             .......
             ...........
          }
          catch(Exception e){
        }
        newFlowFile = session.write(newFlowFile, { outputStream -> 
                         outputStream.write( data.getBytes(StandardCharsets.UTF_8) )
                } as OutputStreamCallback)
                failflowFiles<< newFlowFile;
                    }
        
    session.transfer(flowFiles, REL_SUCCESS)
    session.transfer(failflowFiles, REL_FAILURE)
    session.remove(flowFile)

The nifi is in 3 node cluster. The function lib is configured in groovyscript module directory.In the above groovy script processor, the log statement is jedispool connected:::: is sometimes printing false,sometimes true but after deploying for the first time jar every time works. But later it is unpredictable, I am not getting what is wrong in the code. How the groovyscript will load the jar. How can I acheive the lib based search using groovy script.


Solution

  • Redis.pool never gets null after initialization. You are calling pool.destroy() but not setting it to null.

    getPoolInstance() checks if pool is null only then it creates a new pool.

    I don't see any reason to have 2 variables to hold reference to the same pool: in Redis and in Functions class.