Search code examples
Tags: java, rx-java, reactivex

Joining two large datasets in RxJava


I am using RxJava to process two large datasets (millions of records) that need to be joined by ID. The two datasets don't necessarily contain the same records, but both are sorted by ID.

I figured out that the `join` method can be used for this: the experiment below does a "full join" (every left record is paired with every right record) and then filters the pairs down to the records whose IDs match.

  public class BatchTest
  {
     /**
      * Naive "full join" of two sorted ID streams: every left value is paired with every
      * right value, and only the pairs whose two IDs are equal are printed.
      */
     public static void main (String[] args)
     {
        final Observable<Integer> leftIds  = Observable.just (1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        final Observable<Integer> rightIds = Observable.just (1, 3, 5, 7, 9);

        // A duration that never ends keeps every value's window open, so each value is
        // joinable with everything emitted on the other side (a cartesian product).
        final Func1<Integer, Observable<Integer>> openForever = new Func1<Integer, Observable<Integer>>()
        {
           public Observable<Integer> call (Integer aValue)
           {
              return Observable.never ();
           }
        };

        // Packs one left value and one right value into a two-element array.
        final Func2<Integer, Integer, Integer[]> pairUp = new Func2<Integer, Integer, Integer[]>()
        {
           public Integer[] call (Integer aLeft, Integer aRight)
           {
              return new Integer[] {aLeft, aRight};
           }
        };

        // Keeps only the pairs where both sides carry the same ID.
        final Func1<Integer[], Boolean> sameId = new Func1<Integer[], Boolean> ()
        {
           public Boolean call (Integer[] aPair)
           {
              return aPair[0].equals (aPair[1]);
           }
        };

        final Action1<Integer[]> printPair = new Action1<Integer[]> ()
        {
           public void call (Integer[] aPair)
           {
              System.out.printf ("%d, %d\n", aPair[0], aPair[1]);
           }
        };

        final Observable<Integer[]> allPairs =
           leftIds.join (rightIds, openForever, openForever, pairUp);

        allPairs
           .filter (sameId)
           .subscribe (printPair);
     }
  }

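This works fine for small examples, but it is very inefficient for large ones. Because both duration selectors return `Observable.never()`, no window ever closes. As a result, `join` pairs every left record with every right record (n × m pairs), and the filter then discards almost all of them.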

So my question is: given that both sets are sorted by the key, is there a way to use these selector/windowing functions to limit the join, so that I don't have to join 3 million records to 3 million records?

Or am I doing this the wrong way all together?


Solution

  • So, basically, what I wound up doing was to implement a custom Operator that takes in the second Observable and subscribes to it on a new thread. The custom subscriber reads in that data and puts it into a BlockingQueue, from which it is then picked up and merged with the data from the original Observable.

    In case anyone runs into the same scenario, here it is:

    import java.util.Comparator;
    import java.util.Objects;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    import rx.Observable;
    import rx.Scheduler;
    import rx.Subscriber;
    import rx.functions.Action1;
    import rx.functions.Func2;
    
    /**
     * An operator that merges two {@link Observable} streams which are both sorted by the
     * same key, pairing up the elements that the {@link Comparator} reports as equal.
     * <p>
     * The "left" stream is the one this operator is lifted onto. The "right" stream is
     * passed to the constructor, subscribed to on the given {@link Scheduler}, and buffered
     * in a bounded queue (capacity 1000). Both streams are walked in lock-step, like the
     * merge step of a merge sort. Memory use is therefore bounded by that buffer rather
     * than by the size of either stream. The right-hand producer is held back while the
     * buffer is full.
     * <p>
     * The result selector is called as follows:
     * <ul>
     * <li>{@code (left, matchingRight)} for a left element that has a match;</li>
     * <li>{@code (left, null)} for a left element without a match;</li>
     * <li>{@code (null, right)} for every right element that has no match.</li>
     * </ul>
     * If the left stream is empty, every right element is passed as {@code (null, right)}
     * when the left stream completes.
     * <p>
     * Each right element is matched at most once, so keys are expected to be unique within
     * each stream. A key duplicated on one side pairs with only one element of the other
     * side; the remaining duplicates are reported as unmatched.
     * <p>
     * The right stream is subscribed to eagerly, in the constructor. An instance should
     * therefore be lifted onto exactly one subscription of the left stream.
     *
     * @param <I> the element type of both input streams
     * @param <R> the element type of the joined output stream
     */
    public class JoinByComparisonOperator<I, R> implements Observable.Operator<R, I>
    {
       /** Capacity of the buffer holding right-hand values not yet consumed by the left side. */
       private static final int BUFFER_SIZE = 1000;

       /** How long the left side waits on an empty buffer before re-checking the right's state. */
       private static final long POLL_TIMEOUT_SECONDS = 10;

       /** How long the right side waits on a full buffer before checking for unsubscription. */
       private static final long OFFER_RETRY_MILLIS = 100;

       /** Marker queued when the right stream terminates, so that a waiting reader wakes up. */
       private static final Object END_OF_STREAM = new Object();

       private final RightSubscriber<I> subscriberRight;

       private final Comparator<I> comparator;

       private final Func2<I, I, Observable<R>> resultSelector;

       /**
        * Creates the operator and immediately starts consuming the right stream.
        *
        * @param aRight
        *     The observable that is joined on the "right". It must be sorted consistently
        *     with {@code aComparator}.
        * @param aScheduler
        *     The scheduler used to run the "right" observable. It must not run it on the
        *     thread that emits the left stream: the left side blocks while waiting for
        *     right-hand values, and the right side blocks while the buffer is full, so
        *     sharing a thread deadlocks.
        * @param aComparator
        *     The comparator used to compare two input values. It must follow the same
        *     ordering by which both input streams are sorted.
        * @param aResultSelector
        *     Function that receives a left and a right value and produces the output.
        *     One of the two inputs is null when there was no match. The returned observable
        *     should emit synchronously (e.g. {@code Observable.just}); its values are
        *     forwarded downstream from whatever thread it emits on.
        * @throws NullPointerException if any argument is null
        */
       public JoinByComparisonOperator(
          final Observable<I>              aRight,
          final Scheduler                  aScheduler,
          final Comparator<I>              aComparator,
          final Func2<I, I, Observable<R>> aResultSelector
       )
       {
          Objects.requireNonNull (aRight, "aRight");
          Objects.requireNonNull (aScheduler, "aScheduler");

          subscriberRight   = new RightSubscriber<> ();
          comparator        = Objects.requireNonNull (aComparator, "aComparator");
          resultSelector    = Objects.requireNonNull (aResultSelector, "aResultSelector");

          aRight
             .subscribeOn (aScheduler)
             .subscribe (subscriberRight);
       }

       /**
        * Wraps the downstream subscriber in a subscriber for the left stream that performs
        * the join.
        *
        * @param aSubscriber the downstream subscriber that receives the joined results
        * @return the subscriber to attach to the left stream
        */
       @Override
       public Subscriber<? super I> call (final Subscriber<? super R> aSubscriber)
       {
          return new LeftSubscriber (aSubscriber);
       }

       /**
        * Subscriber for the "left" stream (the stream this operator is lifted onto). It
        * pulls right-hand values from the buffer as it needs them. Per the Rx contract, its
        * callbacks are invoked serially, so its own fields need no synchronization.
        */
       private class LeftSubscriber extends Subscriber<I>
       {
          private final Subscriber<? super R> nextSubscriber;

          /** A right-hand value already read that was greater than the last left value. */
          private I nextRight;

          /** Set once a terminal event has been sent downstream. */
          private boolean done;

          public LeftSubscriber (final Subscriber<? super R> aNextSubscriber)
          {
             nextSubscriber = aNextSubscriber;
             // Link the chain: when downstream unsubscribes, the left stream stops. When
             // this subscriber is unsubscribed, the right-hand producer is released too.
             aNextSubscriber.add (this);
             add (subscriberRight);
          }

          @Override
          public void onNext (final I aInput)
          {
             if (isStopped ())
             {
                return;
             }

             I myMatch = null;

             try
             {
                // Right-hand values smaller than the left value are unmatched. Stop at the
                // first value that is equal (a match) or greater (kept for a later left value).
                I myRight;
                while ((myRight = takeRight ()) != null)
                {
                   final int myComparison = Objects.compare (myRight, aInput, comparator);

                   if (myComparison < 0)
                   {
                      selectResultInternal (null, myRight);
                      if (done)
                      {
                         return;
                      }
                   }
                   else
                   {
                      if (myComparison == 0)
                      {
                         myMatch = myRight;
                      }
                      else
                      {
                         nextRight = myRight;
                      }
                      break;
                   }
                }
             }
             catch (InterruptedException myException)
             {
                Thread.currentThread ().interrupt ();
                fail (myException);
                return;
             }

             if (isStopped ())
             {
                return;
             }

             selectResultInternal (aInput, myMatch);
          }

          @Override
          public void onCompleted ()
          {
             if (isStopped ())
             {
                return;
             }

             try
             {
                // The left stream is exhausted, so everything left on the right is unmatched.
                I myRight;
                while ((myRight = takeRight ()) != null)
                {
                   selectResultInternal (null, myRight);
                   if (done)
                   {
                      return;
                   }
                }
             }
             catch (InterruptedException myException)
             {
                Thread.currentThread ().interrupt ();
                fail (myException);
                return;
             }

             if (!isStopped ())
             {
                done = true;
                nextSubscriber.onCompleted ();
             }
          }

          @Override
          public void onError (final Throwable aE)
          {
             fail (aE);
          }

          /**
           * Returns the next right-hand value. A value held back from an earlier comparison
           * is returned first.
           *
           * @return the next value, or null once the right stream has completed and been
           *     drained, has failed, or downstream has unsubscribed
           */
          private I takeRight () throws InterruptedException
          {
             if (nextRight != null)
             {
                final I myHeld = nextRight;
                nextRight = null;
                return myHeld;
             }

             while (!subscriberRight.isComplete ()
                && subscriberRight.getError () == null
                && !nextSubscriber.isUnsubscribed ())
             {
                final I myNext = subscriberRight.takeNext ();
                if (myNext != null)
                {
                   return myNext;
                }
                // null: timed out or reached the end marker; re-check the right's state.
             }
             return null;
          }

          /**
           * Propagates a failure of the right stream downstream, if there is one.
           *
           * @return true if processing must stop because this subscriber has terminated or
           *     downstream has unsubscribed
           */
          private boolean isStopped ()
          {
             final Throwable myRightError = subscriberRight.getError ();
             if (myRightError != null)
             {
                fail (myRightError);
             }
             return done || nextSubscriber.isUnsubscribed ();
          }

          /** Sends {@code aE} downstream at most once and stops consuming the right stream. */
          private void fail (final Throwable aE)
          {
             if (done)
             {
                return;
             }
             done = true;
             subscriberRight.unsubscribe ();
             if (!nextSubscriber.isUnsubscribed ())
             {
                nextSubscriber.onError (aE);
             }
          }

          /** Runs the result selector and forwards its values (and any error) downstream. */
          private void selectResultInternal (final I aLeft, final I aRight)
          {
             resultSelector.call (aLeft, aRight).subscribe (
                new Action1<R>()
                {
                   public void call (final R aResult)
                   {
                      if (!done)
                      {
                         nextSubscriber.onNext (aResult);
                      }
                   }
                },
                new Action1<Throwable>()
                {
                   public void call (final Throwable aE)
                   {
                      fail (aE);
                   }
                });
          }
       }

       /**
        * Consumes the "right" input stream on its own thread and buffers its values in a
        * bounded queue until the left side retrieves them. While the queue is full, the
        * producer waits.
        */
       private static class RightSubscriber<T> extends Subscriber<T>
       {
          private final BlockingQueue<Object> buffer = new ArrayBlockingQueue<> (BUFFER_SIZE);

          // Written by the right-hand thread and read by the left-hand thread, hence volatile.
          private volatile boolean complete = false;

          private volatile Throwable error = null;

          @Override
          public void onCompleted ()
          {
             complete = true;
             // Non-blocking wake-up. If the buffer is full, the reader is busy draining it and
             // sees the completion flag once the buffer is empty.
             buffer.offer (END_OF_STREAM);
          }

          @Override
          public void onError (final Throwable aE)
          {
             error = aE;
             buffer.offer (END_OF_STREAM);
          }

          @Override
          public void onNext (final T aT)
          {
             try
             {
                // Wait in bounded slices so that an abandoned (unsubscribed) join releases
                // this thread instead of blocking it forever on a full buffer.
                while (!buffer.offer (aT, OFFER_RETRY_MILLIS, TimeUnit.MILLISECONDS))
                {
                   if (isUnsubscribed ())
                   {
                      return;
                   }
                }
             }
             catch (InterruptedException myException)
             {
                Thread.currentThread ().interrupt ();
                onError (myException);
                unsubscribe ();
             }
          }

          /**
           * Waits up to {@code POLL_TIMEOUT_SECONDS} for the next buffered value.
           *
           * @return the next value, or null if none arrived in time or the stream has ended
           */
          @SuppressWarnings ("unchecked") // the buffer only ever holds T values and END_OF_STREAM
          public T takeNext () throws InterruptedException
          {
             final Object myItem = buffer.poll (POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
             return myItem == END_OF_STREAM ? null : (T) myItem;
          }

          /** Returns true once the right stream has completed and its buffer has been drained. */
          public boolean isComplete ()
          {
             return complete && buffer.isEmpty ();
          }

          /** Returns the error the right stream failed with, or null if it has not failed. */
          public Throwable getError ()
          {
             return error;
          }
       }
    }
    

    And here is a usage example, which takes two streams of 10 million records each and matches them up.

    import java.util.Comparator;
    
    import org.csi.domain.core.batch.JoinByComparisonOperator;
    
    import rx.Observable;
    import rx.functions.Action1;
    import rx.functions.Func2;
    import rx.schedulers.Schedulers;
    
    public class JoinTest
    {
       /**
        * Joins two overlapping integer ranges of 10 million values each with
        * {@link JoinByComparisonOperator} and prints every resulting pair. The side without
        * a matching value is printed as {@code null}.
        */
       public static void main (String[] args)
       {
          final int recordCount = 10000000;

          final Observable<Integer> leftValues  = Observable.range (1, recordCount);
          final Observable<Integer> rightValues = Observable.range (-100, recordCount);

          // Natural integer ordering, which is the order both ranges are emitted in.
          final Comparator<Integer> byValue = new Comparator<Integer>()
          {
             public int compare (Integer aFirst, Integer aSecond)
             {
                return aFirst.compareTo (aSecond);
             }
          };

          // Wraps each (left, right) pair in a single-element stream of Integer[2].
          final Func2<Integer, Integer, Observable<Integer[]>> toPair =
             new Func2<Integer, Integer, Observable<Integer[]>>()
             {
                public Observable<Integer[]> call (Integer aLeft, Integer aRight)
                {
                   return Observable.just (new Integer[] {aLeft, aRight});
                }
             };

          // The right-hand stream runs on its own thread, as the operator requires.
          final JoinByComparisonOperator<Integer, Integer[]> joiner =
             new JoinByComparisonOperator <Integer, Integer[]> (
                rightValues, Schedulers.newThread (), byValue, toPair);

          final Action1<Integer[]> printPair = new Action1<Integer[]> ()
          {
             public void call (Integer[] aPair)
             {
                System.out.printf ("%d, %d\n", aPair[0], aPair[1]);
             }
          };

          leftValues.lift (joiner).subscribe (printPair);
       }
    }