Search code examples
javaandroidmultithreadingrx-java2

How does the clear (CompositeDisposable) method of RxJava work internally?


Anonymous class holds a reference to the enclosing class.

In the following example, I created a small Activity. In the onCreate method, I just add a timer on another Thread, add a CompositeDisposable and clear it in the onDestroy.

Obviously without the CompositeDisposable, it will create a memory leak. With the CompositeDisposable it doesn't create any memory leak, but how is it even working?

Does RxJava just interrupt the Thread and put null on every callback? Can you provide some line that does this work in RxJava source code? I suppose it’s somewhere near the dispose method.

public class MainActivity extends AppCompatActivity {

private String TAG = "MainActivity";

private CompositeDisposable composite = new CompositeDisposable();

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    composite.add(Flowable
            .just(1)
            .timer(90, TimeUnit.SECONDS)
            .subscribeOn(Schedulers.io())
            .subscribeWith(new DisposableSubscriber<Long>() {

                @Override
                public void onNext(Long aLong) { sayHello(); }

                @Override
                public void onError(Throwable t) { sayHello(); }

                @Override
                public void onComplete() { sayHello(); }
            }));
}

@Override
protected void onDestroy() {
    super.onDestroy();

    composite.clear();
}

public void sayHello () { Log.w(TAG, "Hello everyone"); }

Solution

  • It is precisely in the source of the dispose method. You can probably jump into the source of methods in your libraries within your IDE as well, in IntelliJ it's Ctrl+B on Windows or ⌘B on Mac, and in Eclipse it's F3.

    Anyhow, here's the source of the dispose method (comments mine):

    @Override
    public void dispose() {
        if (disposed) { // nothing to do
            return;
        }
        OpenHashSet<Disposable> set; // this is the same type as our field that holds the Disposables
        synchronized (this) {
            if (disposed) { 
                return; // another thread did it while we got our lock, so nothing to do
            }
            disposed = true; // setting this flag is safe now, we're the only ones disposing
            set = resources; // the references are now in this local variable
            resources = null; // our field no longer has the references
        }
    
        dispose(set); // from here on out, only this method has the references to the Disposables
    }
    

    And then the complete code of the dispose(OpenHashSet<Disposable>) method that we called above on the last line (mostly just error handling which I believe is self-explainatory):

    /**
     * Dispose the contents of the OpenHashSet by suppressing non-fatal
     * Throwables till the end.
     * @param set the OpenHashSet to dispose elements of
     */
    void dispose(OpenHashSet<Disposable> set) {
        if (set == null) {
            return;
        }
        List<Throwable> errors = null;
        Object[] array = set.keys();
        for (Object o : array) {
            if (o instanceof Disposable) {
                try {
                    ((Disposable) o).dispose();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    if (errors == null) {
                        errors = new ArrayList<Throwable>();
                    }
                    errors.add(ex);
                }
            }
        }
        if (errors != null) {
            if (errors.size() == 1) {
                throw ExceptionHelper.wrapOrThrow(errors.get(0));
            }
            throw new CompositeException(errors);
        }
    }
    

    As you can see, at the end of that method, set can now be garbage collected, as nobody is holding a reference to it.