
Is there something like an Iterator, but with functions like Streams?


So basically what I am trying to do is the following:

  1. Load Batch of Data from the Database
  2. Map that data (Object[] query result) to a class representing the data in a readable format
  3. Write to File
  4. Repeat until query gets no more results

Below are the structures I am familiar with that seem to fit, and why each one falls short of what I need.

  • Iterator → Has no option to map and filter without calling next()
    • However, I need to define the map function in a subclass without having the data yet (similar to a stream), so that I can pass the "stream" up to a calling class and call next() only there, which then triggers all the map functions
  • Stream → All data needs to be available before mapping and filtering is possible
  • Observable → Sends data as soon as it becomes available. I need to process it synchronously, though

To give a better idea of what I am trying to do, I made a small example:

// Disclaimer: "Something" is the structure I am not sure of now. 
// Could be an Iterator or something else that fits (that's the question)
public class Orchestrator {
    @Inject
    private DataGetter dataGetter;

    public void doWork() {
        FileWriter writer = new FileWriter("filename");

        // Write the formatted data to the file
        dataGetter.getData()
            .forEach(data -> writer.writeToFile(data));
    }
}

public class FileWriter {
    public void writeToFile(List<Thing> data) {
        // Write to file
    }
}

public class DataGetter {
    @Inject
    private ThingDao thingDao;

    public Something<List<Thing>> getData() {

        // Map data to the correct format and return that
        return thingDao.getThings()
            .map(partialResult -> /* map to object */);
    }
}

public class ThingDao {

    public Something<List<Object[]>> getThings() {
        Query q = ...;
        // Don't know what to return
    }
}

What I have got so far:

I tried to go from the base of an Iterator, because it's the only one that really fulfills my memory requirements. Then I have added some methods to map and loop over the data. It's not really a robust design though and it's going to be harder than I thought, so I wanted to know if there is anything out there already that does what I need.

public class QIterator<E> implements Iterator<List<E>> {
    public static String QUERY_OFFSET = "queryOffset";
    public static String QUERY_LIMIT = "queryLimit";

    private Query query;

    private long lastResultIndex = 0;
    private long batchSize;

    private Function<List<Object>, List<E>> mapper;

    public QIterator(Query query, long batchSize) {
        this.query = query;
        this.batchSize = batchSize;
    }

    public QIterator(Query query, long batchSize, Function<List<Object>, List<E>> mapper) {
        this(query, batchSize);
        this.mapper = mapper;
    }

    @Override
    public boolean hasNext() {
        return lastResultIndex % batchSize == 0;
    }

    @Override
    public List<E> next() {
        query.setParameter(QIterator.QUERY_OFFSET, lastResultIndex);
        query.setParameter(QIterator.QUERY_LIMIT, batchSize);

        List<Object> result = (List<Object>) query.getResultList(); // unchecked
        lastResultIndex += result.size();

        List<E> mappedResult;
        if (mapper != null) {
            mappedResult = mapper.apply(result);
        } else {
            mappedResult = (List<E>) result; // unchecked
        }

        return mappedResult;
    }

    public <R> QIterator<R> map(Function<List<E>, List<R>> appendingMapper) {
        return new QIterator<>(query, batchSize, (data) -> {
            if (this.mapper != null) {
                return appendingMapper.apply(this.mapper.apply(data));
            } else {
                return appendingMapper.apply((List<E>) data);
            }
        });
    }

    public void forEach(BiConsumer<List<E>, Integer> consumer) {
        for (int i = 0; this.hasNext(); i++) {
            consumer.accept(this.next(), i);
        }
    }
}

This works so far, but it has some unchecked assignments that I do not really like. I would also like to be able to "append" one QIterator to another. That is not hard by itself, but the appended iterator should also pick up any maps that are applied after the append.


Solution

  • Assume you have a DAO that provides data in a paginated manner, e.g. by applying LIMIT and OFFSET clauses to the underlying SQL. Such a DAO class would have a method that takes those values as arguments, i.e. the method would conform to the following functional interface:

    import java.util.List;

    @FunctionalInterface
    public interface PagedDao<T> {
        List<T> getData(int offset, int limit);
    }
    

    E.g. calling getData(0, 20) would return the first 20 rows (page 1), and calling getData(60, 20) would return the 20 rows on page 4. If the method returns fewer than 20 rows, we have reached the last page. Asking for data after the last row returns an empty list.
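
To make that contract concrete, here is a minimal sketch in which an in-memory list stands in for the table. The names PagingContractDemo and inMemoryDao are made up for illustration and are not part of any library; a real DAO would run a LIMIT/OFFSET query instead of calling subList.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PagingContractDemo {
    @FunctionalInterface
    public interface PagedDao<T> {
        List<T> getData(int offset, int limit);
    }

    // Hypothetical stand-in for a table with rowCount rows ("Row #1", "Row #2", ...).
    public static PagedDao<String> inMemoryDao(int rowCount) {
        List<String> table = IntStream.rangeClosed(1, rowCount)
                .mapToObj(i -> "Row #" + i)
                .collect(Collectors.toList());
        return (offset, limit) -> {
            // Clamp both ends so that reading past the last row yields an empty list.
            int from = Math.min(offset, table.size());
            int to = Math.min(offset + limit, table.size());
            return table.subList(from, to);
        };
    }

    public static void main(String[] args) {
        PagedDao<String> dao = inMemoryDao(70);
        System.out.println(dao.getData(0, 20).size());     // full first page: 20
        System.out.println(dao.getData(60, 20).size());    // short last page: 10
        System.out.println(dao.getData(80, 20).isEmpty()); // past the end: true
    }
}
```

With 70 rows and a page size of 20, the fourth page is short, which is the signal that no further page needs to be requested.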

    For the demo below, we can mock such a DAO class:

    import java.util.ArrayList;
    import java.util.List;

    public class MockDao {
        private final int rowCount;
        public MockDao(int rowCount) {
            this.rowCount = rowCount;
        }
        public List<SimpleRow> getSimpleRows(int offset, int limit) {
            System.out.println("DEBUG: getData(" + offset + ", " + limit + ")");
            if (offset < 0 || limit <= 0)
                throw new IllegalArgumentException();
            List<SimpleRow> data = new ArrayList<>();
            for (int i = 0, rowNo = offset + 1; i < limit && rowNo <= this.rowCount; i++, rowNo++)
                data.add(new SimpleRow("Row #" + rowNo));
            System.out.println("DEBUG:   data = " + data);
            return data;
        }
    }
    
    public class SimpleRow {
        private final String data;
        public SimpleRow(String data) {
            this.data = data;
        }
        @Override
        public String toString() {
            return "Row[data=" + this.data + "]";
        }
    }
    

    To generate a Stream of rows from that method, fetching the rows in blocks of a certain size, we need a Spliterator, which can then be passed to StreamSupport.stream(Spliterator<T> spliterator, boolean parallel) to create the stream.

    Here is an implementation of such a Spliterator:

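    import java.util.List;
    import java.util.Objects;
    import java.util.Spliterator;
    import java.util.function.Consumer;

    public class PagedDaoSpliterator<T> implements Spliterator<T> {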
        private final PagedDao<T> dao;
        private final int blockSize;
        private int nextOffset;
        private List<T> data;
        private int dataIdx;
        public PagedDaoSpliterator(PagedDao<T> dao, int blockSize) {
            if (blockSize <= 0)
                throw new IllegalArgumentException();
            this.dao = Objects.requireNonNull(dao);
            this.blockSize = blockSize;
        }
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (this.data == null) {
                if (this.nextOffset == -1/*At end*/)
                    return false; // Already at end
                this.data = this.dao.getData(this.nextOffset, this.blockSize);
                this.dataIdx = 0;
                if (this.data.size() < this.blockSize)
                    this.nextOffset = -1/*At end, after this data*/;
                else
                    this.nextOffset += data.size();
                if (this.data.isEmpty()) {
                    this.data = null;
                    return false; // At end
                }
            }
            action.accept(this.data.get(this.dataIdx++));
            if (this.dataIdx == this.data.size())
                this.data = null;
            return true;
        }
        @Override
        public Spliterator<T> trySplit() {
            return null; // Parallel processing not supported
        }
        @Override
        public long estimateSize() {
            return Long.MAX_VALUE; // Unknown
        }
        @Override
        public int characteristics() {
            return ORDERED | NONNULL;
        }
    }
    

    We can now test that using the mock DAO above:

    MockDao dao = new MockDao(13);
    Stream<SimpleRow> stream = StreamSupport.stream(
            new PagedDaoSpliterator<>(dao::getSimpleRows, 5), /*parallel*/false);
    stream.forEach(System.out::println);
    

    Output

    DEBUG: getData(0, 5)
    DEBUG:   data = [Row[data=Row #1], Row[data=Row #2], Row[data=Row #3], Row[data=Row #4], Row[data=Row #5]]
    Row[data=Row #1]
    Row[data=Row #2]
    Row[data=Row #3]
    Row[data=Row #4]
    Row[data=Row #5]
    DEBUG: getData(5, 5)
    DEBUG:   data = [Row[data=Row #6], Row[data=Row #7], Row[data=Row #8], Row[data=Row #9], Row[data=Row #10]]
    Row[data=Row #6]
    Row[data=Row #7]
    Row[data=Row #8]
    Row[data=Row #9]
    Row[data=Row #10]
    DEBUG: getData(10, 5)
    DEBUG:   data = [Row[data=Row #11], Row[data=Row #12], Row[data=Row #13]]
    Row[data=Row #11]
    Row[data=Row #12]
    Row[data=Row #13]
    

    As can be seen, we get 13 rows of data, retrieved from the database in blocks of 5 rows.
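
The same block-wise loading also holds when map and a short-circuiting operation such as limit are added to the pipeline. The sketch below is self-contained, so it uses a compact stand-in for PagedDaoSpliterator built on Spliterators.AbstractSpliterator; the names LazyPagingDemo, pagedStream and loggingDao are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LazyPagingDemo {
    @FunctionalInterface
    public interface PagedDao<T> {
        List<T> getData(int offset, int limit);
    }

    /** Compact stand-in for PagedDaoSpliterator: loads one block at a time, on demand. */
    public static <T> Stream<T> pagedStream(PagedDao<T> dao, int blockSize) {
        Spliterator<T> spliterator =
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
            private int nextOffset = 0;
            private boolean lastPageSeen = false;
            private Iterator<T> block = Collections.emptyIterator();

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if (!block.hasNext()) {
                    if (lastPageSeen)
                        return false; // a short page was already consumed
                    List<T> data = dao.getData(nextOffset, blockSize);
                    nextOffset += data.size();
                    lastPageSeen = data.size() < blockSize;
                    block = data.iterator();
                    if (!block.hasNext())
                        return false; // empty page: nothing left
                }
                action.accept(block.next());
                return true;
            }
        };
        return StreamSupport.stream(spliterator, /*parallel*/ false);
    }

    /** Hypothetical mock DAO that records every call in the given log. */
    public static PagedDao<String> loggingDao(int rowCount, List<String> log) {
        return (offset, limit) -> {
            log.add("getData(" + offset + ", " + limit + ")");
            List<String> page = new ArrayList<>();
            for (int rowNo = offset + 1; rowNo <= rowCount && page.size() < limit; rowNo++)
                page.add("Row #" + rowNo);
            return page;
        };
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<String> result = pagedStream(loggingDao(13, log), 5)
                .map(String::toUpperCase) // applied per row, only as rows are pulled
                .limit(7)                 // short-circuits after seven rows
                .collect(Collectors.toList());
        System.out.println(result); // ROW #1 through ROW #7
        System.out.println(log);    // [getData(0, 5), getData(5, 5)]
    }
}
```

Only two of the three pages are requested here, because limit(7) stops pulling rows before the third block is needed.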

    The data is not retrieved from the database until it is needed, which keeps the memory footprint low: at most one block is held at a time, provided the downstream stream operations do not buffer the rows themselves (as sorted() would, for example).
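
The question's example passes whole batches around (Something<List<Thing>>) rather than single rows. If that shape is preferred, one possible sketch, which is a different technique from the Spliterator above, is to stream the pages themselves with Stream.iterate and takeWhile. This assumes Java 9 or later (for takeWhile); the names PagedBatchesDemo, batches and mockDao are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PagedBatchesDemo {
    @FunctionalInterface
    public interface PagedDao<T> {
        List<T> getData(int offset, int limit);
    }

    /** Lazily streams whole pages and stops at the first empty page. */
    public static <T> Stream<List<T>> batches(PagedDao<T> dao, int blockSize) {
        return Stream.iterate(0, offset -> offset + blockSize)
                .map(offset -> dao.getData(offset, blockSize))
                .takeWhile(page -> !page.isEmpty());
    }

    /** Hypothetical mock DAO producing "Row #1" .. "Row #rowCount". */
    public static PagedDao<String> mockDao(int rowCount) {
        return (offset, limit) -> {
            List<String> page = new ArrayList<>();
            for (int rowNo = offset + 1; rowNo <= rowCount && page.size() < limit; rowNo++)
                page.add("Row #" + rowNo);
            return page;
        };
    }

    public static void main(String[] args) {
        batches(mockDao(13), 5)
                // Map each batch as a whole, as in the question's DataGetter.
                .map(page -> page.stream().map(String::toUpperCase).collect(Collectors.toList()))
                // Stand-in for writer.writeToFile(batch).
                .forEach(page -> System.out.println(page.size() + " rows: " + page));
    }
}
```

Unlike the Spliterator, this variant only stops on an empty page, so it issues one extra DAO call after a short last page. For the "append" requirement, two such streams can be joined with Stream.concat(a, b), and any map calls that follow apply to the rows or batches of both.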