
GroupBy immediately following keys RxJava


I have a simple problem: I acquire a bunch of rows from an SQL database, and I need to collapse those rows, reducing them into multiple JSON objects. Using RxJava, I have:

Promise<Flowable<Data>> p = Promise.promise();
p.complete(observable
             .groupBy(array -> array.getValue(0))
             .flatMap(g -> extractData(g)));

The thing is, when there is more than a certain amount of data, this code hangs. I think I understand why: flatMap caps its concurrency, and my groups never complete on their own; they wait for the main observable to end before returning (the reduce needs all the values in the group to work). However, the rows I process are guaranteed to be ordered by a specific key, the same one I'm grouping on.

I'm looking for a way to close the previous group when a new group is created. Is there a way to achieve that?

I thought that groupByUntil would allow that, but it seems it has now been merged into the groupBy method (in RxJava at least), and I can't find a way to do it using takeUntil/takeWhile.

EDIT:

I just realized that without the content of extractData it's quite hard to understand, so here it is:

return group.reduce(new Data(), <business logic>).toFlowable()

Solution

  • It seems I managed to do it using my own operator and the lift method. It's really far from being bulletproof, but it's working for now. It's quite hard to follow all the guidelines in the guide, and I hope I'm not messing up backpressure or something:

    private static final class ConvertSQLRowsToSchemaInstances implements FlowableOperator<SchemaInstance, JsonArray> {
      private final SQLRowStream rows;
      private final Loader loader;
      private final Table table;
      private final List<Map.Entry<String, TableField>> wanted;
      private final List<Map.Entry<String, TableField>> unwanted;
      private final TableField unicity;
    
      public ConvertSQLRowsToSchemaInstances(SQLRowStream rows, Loader loader, Table table,
                                             List<Map.Entry<String, TableField>> wanted,
                                             List<Map.Entry<String, TableField>> unwanted) {
        this.rows = rows;
        this.loader = loader;
        this.table = table;
        this.wanted = wanted;
        this.unwanted = unwanted;
        this.unicity = table.getUnicityFields().get(0);
      }
    
      @NonNull
      @Override
      public Subscriber<? super JsonArray> apply(@NonNull Subscriber<? super SchemaInstance> subscriber) throws Exception {
        return new Op(subscriber);
      }
    
      private final class Op implements FlowableSubscriber<JsonArray>, Subscription {
        final Subscriber<? super SchemaInstance> child;
    
        SchemaInstance si = null;
        Subscription s;
    
        public Op(Subscriber<? super SchemaInstance> child) {
          this.child = child;
        }
    
        @Override
        public void onSubscribe(Subscription s) {
          this.s = s;
          child.onSubscribe(this);
        }
    
        @Override
        public void onNext(JsonArray array) {
          try {
            if (si == null) si = loader.getEmptyInstance(table.getSchema());
            else if (!si.get(unicity.getName()).get().equals(array.getValue(rows.column(unicity.getName())))) {
              // New schema arrived
              child.onNext(si);
              si = loader.getEmptyInstance(table.getSchema());
            }
            extractData(si, array, rows, table, loader, wanted, unwanted);
            // request(1); Wrong ! Would destroy the backpressure ... 
          } catch (UnknownTypeException | IllegalAccessException | InstantiationException | NoSuchFieldException e) {
            // Stop the upstream before signalling the failure downstream
            s.cancel();
            onError(e);
          }
        }
    
        @Override
        public void onError(Throwable e) {
          child.onError(e);
        }
    
        @Override
        public void onComplete() {
          if (si != null) child.onNext(si);
          child.onComplete();
        }
    
        @Override
        public void cancel() {
          s.cancel();
        }
    
        @Override
        public void request(long n) {
          s.request(n);
        }
      }
    }
    

    Just trying to accumulate some lines from SQL to a single instance of a class. Any advice welcome.

    EDIT:

    Don't use this code as-is. Rows must be requested from upstream only when they are needed: rework the onNext() method so that it does not request rows when it is not already accumulating an object.
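
The "close the previous group when the key changes" rule can be separated from the reactive plumbing. Below is a minimal sketch in plain Java (no RxJava) of a pull-based version: rows are read from the source only when the consumer asks for the next group, which is one way to read "request rows only when needed". The class name `RunGroupingIterator`, its `rows` and `keyOf` parameters, and the choice of returning each group as a `List` are all hypothetical and not taken from the code above. It assumes the rows arrive sorted by the grouping key, as stated in the question.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Function;

/**
 * Hypothetical helper: groups consecutive elements that share a key.
 * Assumes the input is already sorted by that key.
 */
final class RunGroupingIterator<T, K> implements Iterator<List<T>> {
  private final Iterator<T> rows;
  private final Function<? super T, ? extends K> keyOf;
  private T lookahead;          // first row of the next group, already pulled
  private boolean hasLookahead; // true when 'lookahead' holds a pending row

  RunGroupingIterator(Iterator<T> rows, Function<? super T, ? extends K> keyOf) {
    this.rows = rows;
    this.keyOf = keyOf;
  }

  @Override
  public boolean hasNext() {
    return hasLookahead || rows.hasNext();
  }

  @Override
  public List<T> next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    // Start the group with the pending row, or pull a fresh one.
    T first = hasLookahead ? lookahead : rows.next();
    hasLookahead = false;
    lookahead = null;

    K key = keyOf.apply(first);
    List<T> group = new ArrayList<>();
    group.add(first);

    // Pull rows only until the key changes; the row that closes the
    // group is kept aside and becomes the start of the next group.
    while (rows.hasNext()) {
      T row = rows.next();
      if (!Objects.equals(key, keyOf.apply(row))) {
        lookahead = row;
        hasLookahead = true;
        break;
      }
      group.add(row);
    }
    return group;
  }
}
```

For example, iterating over the rows "a1", "a2", "b1" keyed by their first character yields the group [a1, a2] and then [b1]. Each call to next() reads at most one row beyond the group it returns, so the source is never drained ahead of the consumer. A reactive operator would need the same bookkeeping expressed in terms of request(n), which this sketch does not attempt.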