
How to use read-only borrowed Rust data from multiple Java threads?


I have a struct Foo and a struct FooRef that holds references to data in Foo:

struct Foo { /* ... */ }

struct FooRef<'foo> { /* ... */ }

impl Foo {
    pub fn create_ref<'a>(&'a self) -> FooRef<'a> { /* ... */ }
}

Foo cannot be used directly in the logic; I need FooRef. Creating a FooRef requires a lot of computation, so I do it once, just after creating the Foo instance. FooRef is immutable; it is only used for reading data.

Multiple threads need to access this FooRef instance. How can I implement this? The calling threads are Java threads, and this will be used via JNI. That prevents using a scoped thread pool, for example.

Another complication is that I sometimes have to refresh the Foo instance to load new data into it, and then I also need to recreate the FooRef instance.

How can this be achieved thread-safely and memory-safely? I tried messing around with pointers and RwLock, but that resulted in a memory leak (the memory usage kept growing on each reload). I am a Java developer and a newbie with pointers.

The data in Foo is mostly text, about 250 MB in total. FooRef is mostly strs and structs of strs borrowed from Foo.

My Java usage explanation

I use two long variables in a Java class to store pointers to Foo and FooRef. I use a static ReentrantReadWriteLock to guard these pointers.

If the data in Foo needs to be updated, I acquire the write lock, drop FooRef, update Foo, create a new FooRef, and update the ref pointer in Java.

If I need to read the data (i.e. when I am not updating Foo), I acquire a read lock and use the FooRef.

The memory leak is visible only when multiple Java threads are calling this code.

Rust:

use jni::objects::{JClass, JString};
use jni::sys::{jlong, jstring};
use jni::JNIEnv;

use std::collections::HashMap;

// Reads the `ptr` field (a jlong) from the Java object and interprets it
// as a *mut Foo; yields None when the stored pointer is null (0).
macro_rules! foo_mut_ptr {
    ($env: expr, $class: expr) => {
        $env.get_field(*$class, "ptr", "J")
            .ok()
            .and_then(|j| j.j().ok())
            .and_then(|ptr| {
                if ptr == 0 {
                    None
                } else {
                    Some(ptr as *mut Foo)
                }
            })
    };
}

// Same as foo_mut_ptr!, but for the `ptrRef` field holding the FooRef pointer.
macro_rules! foo_ref_mut_ptr {
    ($env: expr, $class: expr) => {
        $env.get_field(*$class, "ptrRef", "J")
            .ok()
            .and_then(|j| j.j().ok())
            .and_then(|ptr| {
                if ptr == 0 {
                    None
                } else {
                    Some(ptr as *mut FooRef)
                }
            })
    };
}

// Convenience wrappers that turn the raw pointers into references.
macro_rules! foo_mut {
    ($env: expr, $class: expr) => {
        foo_mut_ptr!($env, $class).map(|ptr| &mut *ptr)
    };
}

macro_rules! foo_ref {
    ($env: expr, $class: expr) => {
        foo_ref_mut_ptr!($env, $class).map(|ptr| &*ptr)
    };
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_create(_env: JNIEnv, _class: JClass) -> jlong {
    Box::into_raw(Box::new(Foo::default())) as jlong
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_createRef(env: JNIEnv, class: JClass) -> jlong {
    let foo = foo_mut!(env, class).expect("createRef was called on uninitialized Data");
    let foo_ref = foo.create_ref();
    Box::into_raw(Box::new(foo_ref)) as jlong
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_reload(env: JNIEnv, class: JClass) {
    let foo = foo_mut!(env, class).expect("foo must be initialized");
    *foo = Foo {
        data: vec!["hello".to_owned(); 1024 * 1024],
    };
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroy(env: JNIEnv, class: JClass) {
    drop_ptr(foo_ref_mut_ptr!(env, class));
    drop_ptr(foo_mut_ptr!(env, class));
}

#[allow(non_snake_case)]
#[no_mangle]
pub unsafe extern "system" fn Java_test_App_destroyRef(env: JNIEnv, class: JClass) {
    drop_ptr(foo_ref_mut_ptr!(env, class));
}

unsafe fn drop_ptr<T>(ptr: Option<*mut T>) {
    if let Some(ptr) = ptr {
        let _foo = Box::from_raw(ptr);
        // foo drops here
    }
}

#[derive(Default)]
struct Foo {
    data: Vec<String>,
}

#[derive(Default)]
struct FooRef<'a> {
    data: HashMap<&'a str, Vec<&'a str>>,
}

impl Foo {
    fn create_ref(&self) -> FooRef {
        let mut data = HashMap::new();
        for s in &self.data {
            let s = &s[..];
            data.insert(s, vec![s]);
        }
        FooRef { data }
    }
}

Java:

package test;

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

public class App implements AutoCloseable {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReadLock readLock = lock.readLock();
    private final WriteLock writeLock = lock.writeLock();

    private volatile long ptr;
    private volatile long ptrRef;
    private volatile boolean reload;

    static {
        System.loadLibrary("foo");
    }

    public static void main(String[] args) throws InterruptedException {
        try (App app = new App()) {
            for (int i = 0; i < 20; i++) {
                new Thread(() -> {
                    while (true) {
                        app.tryReload();
                    }
                }).start();
            }

            while (true) {
                app.setReload();
            }
        }
    }

    public App() {
        this.ptr = this.create();
    }

    public void setReload() {
        writeLock.lock();
        try {
            reload = true;
        } finally {
            writeLock.unlock();
        }
    }

    public void tryReload() {
        readLock.lock();
        debug("Got read lock");

        if (reload) {
            debug("Cache is expired");

            readLock.unlock();
            debug("Released read lock coz expired");

            writeLock.lock();
            debug("Got write lock");

            try {
                if (reload) {
                    fullReload();
                }

                readLock.lock();
                debug("Got read lock inside write");
            } finally {
                writeLock.unlock();
                debug("Released write lock");
            }
        }

        readLock.unlock();
        debug("Released read lock");
    }

    private void fullReload() {
        destroyRef();
        debug("Dropped ref");

        debug("Reloading");
        reload();
        debug("Reloading completed");

        updateRef();
        debug("Created ref");
        reload = false;
    }

    private void updateRef() {
        this.ptrRef = this.createRef();
    }

    private native void reload();

    private native long create();

    private native long createRef();

    private native void destroy();

    private native void destroyRef();

    @Override
    public void close() {
        writeLock.lock();
        try {
            this.destroy();
            this.ptrRef = 0;
            this.ptr = 0;
        } finally {
            writeLock.unlock();
        }
    }

    private static void debug(String s) {
        System.out.printf("%10s : %s%n", Thread.currentThread().getName(), s);
    }

}

Solution

  • What I thought was a memory leak wasn't actually a memory leak. The issue was that the allocator was using thread-local arenas, so whichever thread reloaded the 250 MB of data kept the freed space assigned to its own arena instead of returning it to the system. This issue was not specific to JNI; it also happened in pure, safe Rust code. See Why multiple threads using too much memory when holding Mutex

    The number of arenas defaults to 8 * CPU count, which is 64 in my case. This can be overridden by setting the MALLOC_ARENA_MAX environment variable.

    I resolved the issue by setting MALLOC_ARENA_MAX to 1, so the approach I took is fine; it was just a platform-specific allocator issue.

    The issue occurred only on Ubuntu under WSL. I also ran the same code without any tweaking on Windows 10, and it worked without any issues.