Search code examples
javac++boostjava-native-interfaceboost-thread

How can I make JNI RegisterNatives callack Java functions have C++ instance scope?


I have a C++ application that triggers an Akka Actor-based MapReduce system. This I do via a C++ JNI wrapper class MapReduceBridge and it works fine taking into account that the Akka Actor and the C++ application threads need to synchronize (shown below). The problem is that with this design I'm forced to have my C++ MapReduceBridge class be a singleton and use static variables because the callback methods registered via RegisterNatives are global. My question is what's the best way to move into an class instance scope so multiple instances of this C++ MapReduceBridge can coexist?

The two Java to C++ callbacks accomplish the following:

  • the C++ thread has to wait for the first callback until the Akka-Actor has deployed/installed in all machines the jar containing the MapExecutor and ReduceExecutor.
  • the C++ thread has to wait for the second callback until the Akka-Actor completes running the MapReduce so it can access the results.

I use here two callbacks (function callback from Java to C++) using RegisterNatives, this is the C++ side:

// allow the C++ MapReduceBridge's application thread to wait for the MapReduce system
static boost::mutex mutex_;
static boost::mutex::scoped_lock lock_(mutex_);
static boost::condition_variable callback_condition_;

// callback notification that installation has completed    
JNIEXPORT jobject JNICALL com_sfoam_hpcmom_mapreduce_bridge_BridgeClient_startupCompletedCallback(JNIEnv* env, jobject obj) {
    log_info << "callback 'startup completed' received";
    callback_condition_.notify_one();
}

// callback notification that MapReduce run has completed   
JNIEXPORT jobject JNICALL com_sfoam_hpcmom_mapreduce_bridge_BridgeClient_runCompletedCallback(JNIEnv* env, jobject obj) {
    log_info << "callback 'run completed' received";
    callback_condition_.notify_one();
}

static const JNINativeMethod kCallbackMethods[] = {
      { "startupCompletedCallback", "()V", (void*)&com_sfoam_hpcmom_mapreduce_bridge_BridgeClient_startupCompletedCallback },
      { "runCompletedCallback"    , "()V", (void*)&com_sfoam_hpcmom_mapreduce_bridge_BridgeClient_runCompletedCallback     }
};

MapReduceBridge::MapReduceBridge(std::string env_jar_path) {
    // ... 
    // snippet where the callbacks are registered
    const int methods_size = sizeof(kCallbackMethods) / sizeof(kCallbackMethods[0]);
    env_->RegisterNatives(bridge_class_, kCallbackMethods, methods_size);
    // ... 
}

void MapReduceBridge::run() {
    // get BridgeClient's method run and invoke it
    env_->CallObjectMethod(bridge_instance_, bridge_run_);
    log_info << "run method launched";

    // wait for the callback that 'run' has completed
    callback_condition_.wait(lock_);
}

And on the Java side:

//---------------------------------------------------------------
/**
 * {@inheritDoc}
 */
@Override
public void onReceiveResult(double[] results) {
    this.results = results;
    runCompletedCallback();
}

Following this example my goal is to have the synchronization variables mutex_, lock_ and callback_condition_ not static but a member of my MapReduceBridge C++ class and therefore be able to have multiple instances of MapReduceBridge i.e. the callbacks should execute not statically but in the scope of my current MapReduceBridge instance.

e.g. one possible solution would be to save in Java a pointer to the C++ this instance and pass that pointer during the callbacks. How can I do that in JNI?


Solution

  • You can store a c/c++ object's address(i.e. pointer) in your Java class as a long. When you return the long to your JNI native layer( as a jlong ), just cast it back to your object pointer.

    C++

    // callback notification that MapReduce run has completed   
    JNIEXPORT jobject JNICALL com_sfoam_hpcmom_mapreduce_bridge_BridgeClient_runCompletedCallback(JNIEnv* env, jobject obj, jlong ptr) {
        log_info << "callback 'run completed' received";
        MapReduceBridge * ptr = (MapReduceBridge *)ptr;
        //Use ptr now to reference your instance variables
        //callback_condition_.notify_one();
    }
    void MapReduceBridge::run() {
        // get BridgeClient's method run and invoke it
        env_->CallObjectMethod(bridge_instance_, bridge_run_, (long)this );
        log_info << "run method launched";
    
        // wait for the callback that 'run' has completed
        callback_condition_.wait(lock_);
    }
    

    Java

    public void run(long ptr)
    {
        this.ptrToMapReduceBridge = ptr;
        //do stuff
    }
    @Override
    public void onReceiveResult(double[] results) {
       this.results = results;
       runCompletedCallback(this.ptrToMapReduceBridge);
    

    }