Search code examples
javarx-javareactive-programmingjava-6

Cleaning up an Iterable when not all elements are read


Getting my feet wet on RxJava. I have a class that implements Iterable I want to convert to an Observable. Using Observable.from() seems easy. However I need to setup and tear-down the code that provides me the individual entries (the next() in the iterator.

When I run through the entire sequence, that's easy. I added the call to the hasNext() function and when there is no next I run the teardown. However one of the very promising operators I want to use is take(someNumber). If the taking stops before the Iterator runs out of items, the cleanup code never runs.

What can I do to get my cleanup running? If using something else than from(Iterable), I'm OK with that. I'm stuck on Java6 for now. To illustrate my predicament I created a minimal sample:

Update: Based on feedback not to mix Iterator and Iterable together, I updated the code below. To understand the original answers, the original code is in that gist.

Updated Test code (still bad):

import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;

/**
* @author stw
*
*/
public class RXTest {

/**
 * @param args
 */
public static void main(String[] args) {
  ComplicatedObject co = new ComplicatedObject();
  Observable<FancyObject> fancy = Observable.from(co);
  // if the take is less than the elements cleanup never
  // runs. If you take the take out, cleanup runs
  fancy.take(3).subscribe(
      new Action1<FancyObject>() {

        public void call(FancyObject item) {
            System.out.println(item.getName());
        }
    },
    new Action1<Throwable>() {

        public void call(Throwable error) {
            System.out.println("Error encountered: " + error.getMessage());
        }
    },
    new Action0() {

        public void call() {
            System.out.println("Sequence complete");
        }
    }

      );

}

}

The fancy object:

import java.util.Date;
import java.util.UUID;

/**
* @author stw
*
*/
public class FancyObject  {
private String name = UUID.randomUUID().toString();
private Date created = new Date();
public String getName() {
  return this.name;
}
public void setName(String name) {
  this.name = name;
}
public Date getCreated() {
  return this.created;
}
public void setCreated(Date created) {
  this.created = created;
}
}

The iterator:

import java.util.Iterator;

/**
 * @author stw
 *
 */
public class FancyIterator implements Iterator<FancyObject> {

  private final ComplicatedObject theObject;
  private int fancyCount = 0;


  public FancyIterator(ComplicatedObject co) {
    this.theObject = co;
  }

  public boolean hasNext() {
    return this.theObject.hasObject(this.fancyCount);
   }


   public FancyObject next() {
     FancyObject result = this.theObject.getOne(this.fancyCount);
     this.fancyCount++;  
     return result;
   }

}

The Iterable:

import java.util.Iterator;
import java.util.Vector;

/**
 * @author stw
 *
 */
public class ComplicatedObject implements Iterable<FancyObject> {

  private boolean isInitialized = false;

  Vector<FancyObject> allOfThem = new Vector<FancyObject>();


  public Iterator<FancyObject> iterator() {
   return new FancyIterator(this);
  }

  public boolean hasObject(int whichone) {
    if (!this.isInitialized) {
      this.setupAccesstoFancyObject(); 
    }
    return (whichone < this.allOfThem.size());
  }
  public FancyObject getOne(int whichone) {
      if (!this.isInitialized) {
        this.setupAccesstoFancyObject();
      }
      if (whichone < this.allOfThem.size()) {
        return this.allOfThem.get(whichone);
      }
      // If we ask bejond...
      this.isInitialized = false;
      this.teardownAccessToFancyObjects();
      return null;
  }

  private void setupAccesstoFancyObject() {
    System.out.println("Initializing fancy objects");
    for (int i = 0; i < 20; i++) {
      this.allOfThem.addElement(new FancyObject());
    }
    this.isInitialized = true;
  }

  private void teardownAccessToFancyObjects() {
    System.out.println("I'm doing proper cleanup here");

  }

}

But the real question (thx @Andreas) seem to be:

What construct can I use to create an Observable when the underlying code need setup/teardown, especially when one expects that not all elements are pulled. The Iterable just was my first idea

Update 2: Based on Dave's answer I created a gist with my working solution. The iterator isn't perfect, but it's a start.


Solution

  • Observable.using is used for tearing down on termination (completion or error) or unsubscription. To use it you need to make the tear-down code accessible so that your source observable can look like this:

    source = Observable.using(
        resourceFactory, 
        observableFactory, 
        resourceDisposer);
    

    With your code it might look like this:

    source = Observable.using(
        () -> new ComplicatedObject(),
        co -> Observable.from(co), 
        co -> co.tearDown());