Search code examples
hibernatejpajava-streamfailover

Check JPA Query Result Stream before returning it to the caller


My program works with a faulty readonly PostgreSQL database. It uses Hibernate 6 to retrieve relatively big results from the DB. For that reason I call the Query method getResultStream() instead of getResultList() to get uncached forward-only results.

When a DB fails it's always the JDBC executeQuery() call and processing an already opened ResultSet never fails.

I wanted to add retries in my code, but the error happens only after I return the Result Stream to 3rd-party code because the Result Stream is deferred and only calls JDBC when somebody invokes a terminal operation on the stream:

org.hibernate.exception.LockAcquisitionException
caused by
org.postgresql.util.PSQLException
...
at org.postgresql.jdbc.PgPreparedStatement.executeQuery
...
at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
at java.base/java.util.Spliterators$IteratorSpliterator.forEachRemaining(Unknown Source)
...
at java.base/java.util.stream.ReferencePipeline.count(Unknown Source)
at com.thirdparty.Program.main

How can I force the early call to JDBC executeQuery() and return the JPA Result Stream with unconsumed results?

Here's my code:

        LOGGER.debug("JPQL (" + offset + "," + pagesize + "):\n" + jpqlQuery);


            TypedQuery<Map<String, Object>> builder = queryBuilder(jpqlQuery);
            return builder.setFirstResult(offset).setMaxResults(pagesize).getResultStream();


Solution

  • I created a custom Spliterator that reconnects and repeats the query when it fails to get the first result.

    <T> Stream<T> doIt(String jpqlQuery, int offset, int pagesize) {
        FailoverSpliterator<T> failoverSpliterator = new FailoverSpliterator<>(() -> {
                TypedQuery<T> qb = queryBuilder(jpqlQuery);
                return qb.setFirstResult(offset).setMaxResults(pagesize).getResultStream();
            });
    
        return StreamSupport
            .stream(failoverSpliterator, false)
            .onClose(failoverSpliterator::close);
    }
    
    
    public class FailoverSpliterator<T> implements Spliterator<T> {
    
        final Supplier<Stream<T>> delegateSupplier;
    
        Spliterator<T> confirmedDelegate;
    
        Stream<T> lastStream;
    
        public FailoverSpliterator(Supplier<Stream<T>> delegateSupplier) {
            this.delegateSupplier = delegateSupplier;
        }
    
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (confirmedDelegate != null) {
                return confirmedDelegate.tryAdvance(action);
            }
    
            if (action == null) {
                throw new NullPointerException();
            }
    
            @SuppressWarnings("unchecked")
            T[] holder = (T[]) new Object[1];
    
            for (int i = 0;; i++) {
                final Spliterator<T> tmp;
                final boolean res;
                try {
                    lastStream = delegateSupplier.get();
                    try {
                        tmp = lastStream.spliterator();
                        res = tmp.tryAdvance(x -> {
                            holder[0] = x;
                        });
                    } catch (Throwable /* NOSONAR */ e) {
                        lastStream = closeQuietly(lastStream, e);
                        throw e;
                    }
                } catch (Exception e) {
                    exceptionalReconnect(e);
                    if (i >= MAX_FAILOVER_RETRIES) {
                        throw e;
                    }
                    LOGGER.error("Failed to get the first element of JPA Result Stream: " + e, e);
                    LOGGER.warn("retrying...");
                    continue;
                }
                confirmedDelegate = tmp;
                if (res) {
                    action.accept(holder[0]);
                }
                return res;
            }
        }
    
        public void close() {
            close(null);
        }
    
        void close(Throwable consumer) {
            lastStream = closeQuietly(lastStream, consumer);
        }
    
        @Override
        public int characteristics() {
            return 0;
        }
    
        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }
    
        @Override
        public Spliterator<T> trySplit() {
            return null;
        }
    }