So basically what I am trying to do is the following: load the query results from the database in batches, map each batch (the Object[] query result) to a class representing the data in a readable format, and write the mapped data to a file. I went through the structures that I am familiar with that seem to fit the need, but none of them really fits: the mapping should be applied lazily, i.e. only when next() is called, which then calls all the map functions as a result.
To get more of a feeling what I am trying to do, I made a small example:
// Disclaimer: "Something" is the structure I am not sure of now.
// Could be an Iterator or something else that fits (that's the question)
public class Orchestrator {
    @Inject
    private DataGetter dataGetter;

    public void doWork() {
        FileWriter writer = new FileWriter("filename");

        // Write the formatted data to the file
        dataGetter.getData()
                  .forEach(data -> writer.writeToFile(data));
    }
}
public class FileWriter {
    private final String filename;

    public FileWriter(String filename) {
        this.filename = filename;
    }

    public void writeToFile(List<Thing> data) {
        // Write to file
    }
}
public class DataGetter {
    @Inject
    private ThingDao thingDao;

    public Something<List<Thing>> getData() {
        // Map data to the correct format and return that
        return thingDao.getThings()
                       .map(partialResult -> /* map to object */);
    }
}
public class ThingDao {
    public Something<List<Object[]>> getThings() {
        Query q = ...;
        // Don't know what to return
    }
}
What I have got so far:
I tried to go from the base of an Iterator, because it's the only one that really fulfills my memory requirements. Then I added some methods to map and loop over the data. It's not really a robust design though, and it's going to be harder than I thought, so I wanted to know if there is anything out there already that does what I need.
public class QIterator<E> implements Iterator<List<E>> {
    public static final String QUERY_OFFSET = "queryOffset";
    public static final String QUERY_LIMIT = "queryLimit";

    private Query query;
    private long lastResultIndex = 0;
    private long batchSize;
    private Function<List<Object>, List<E>> mapper;

    public QIterator(Query query, long batchSize) {
        this.query = query;
        this.batchSize = batchSize;
    }

    public QIterator(Query query, long batchSize, Function<List<Object>, List<E>> mapper) {
        this(query, batchSize);
        this.mapper = mapper;
    }
    @Override
    public boolean hasNext() {
        // True as long as the last fetched batch was full (or nothing has been fetched yet)
        return lastResultIndex % batchSize == 0;
    }

    @Override
    public List<E> next() {
        query.setParameter(QIterator.QUERY_OFFSET, lastResultIndex);
        query.setParameter(QIterator.QUERY_LIMIT, batchSize);

        List<Object> result = (List<Object>) query.getResultList(); // unchecked
        lastResultIndex += result.size();

        List<E> mappedResult;
        if (mapper != null) {
            mappedResult = mapper.apply(result);
        } else {
            mappedResult = (List<E>) result; // unchecked
        }
        return mappedResult;
    }
    public <R> QIterator<R> map(Function<List<E>, List<R>> appendingMapper) {
        return new QIterator<>(query, batchSize, (data) -> {
            if (this.mapper != null) {
                return appendingMapper.apply(this.mapper.apply(data));
            } else {
                return appendingMapper.apply((List<E>) data);
            }
        });
    }

    public void forEach(BiConsumer<List<E>, Integer> consumer) {
        for (int i = 0; this.hasNext(); i++) {
            consumer.accept(this.next(), i);
        }
    }
}
This works so far, but it has some unchecked assignments which I do not really like. I would also like to be able to "append" one QIterator to another, which is not hard by itself, but the appended iterator should also take the maps that follow after the append into account.
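The plain append itself would look roughly like the following sketch (hypothetical code, and it does not yet solve the problem of maps applied after the append):

    public QIterator<E> append(QIterator<E> other) {
        QIterator<E> self = this;
        // Anonymous subclass that simply exhausts this iterator first, then the other one.
        // A map(...) called on the result would create a plain QIterator again and ignore
        // the appended part, which is exactly the problem described above.
        return new QIterator<E>(query, batchSize) {
            @Override
            public boolean hasNext() {
                return self.hasNext() || other.hasNext();
            }

            @Override
            public List<E> next() {
                return self.hasNext() ? self.next() : other.next();
            }
        };
    }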
Assume you have a DAO that provides data in a paginated manner, e.g. by applying the LIMIT and OFFSET clauses to the underlying SQL. Such a DAO class would have a method that takes those values as arguments, i.e. the method would conform to the following functional interface:
@FunctionalInterface
public interface PagedDao<T> {
    List<T> getData(int offset, int limit);
}
E.g. calling getData(0, 20) would return the first 20 rows (page 1), and calling getData(60, 20) would return the 20 rows on page 4. If the method returns fewer than 20 rows, it means we got the last page. Asking for data after the last row returns an empty list.
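In a real application such a method would typically delegate to the persistence layer, e.g. with JPA the offset and limit can be applied via setFirstResult and setMaxResults. A minimal sketch (the Thing entity and the JpaThingDao class are just assumptions for illustration):

public class JpaThingDao {
    @PersistenceContext
    private EntityManager em;

    // Conforms to PagedDao<Thing>: only one page of rows is loaded per call
    public List<Thing> getThings(int offset, int limit) {
        return em.createQuery("SELECT t FROM Thing t ORDER BY t.id", Thing.class)
                 .setFirstResult(offset)  // OFFSET
                 .setMaxResults(limit)    // LIMIT
                 .getResultList();
    }
}

A method reference such as jpaThingDao::getThings can then be passed wherever a PagedDao<Thing> is expected.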
For the demo below, we can mock such a DAO class:
public class MockDao {
    private final int rowCount;

    public MockDao(int rowCount) {
        this.rowCount = rowCount;
    }

    public List<SimpleRow> getSimpleRows(int offset, int limit) {
        System.out.println("DEBUG: getData(" + offset + ", " + limit + ")");
        if (offset < 0 || limit <= 0)
            throw new IllegalArgumentException();
        List<SimpleRow> data = new ArrayList<>();
        for (int i = 0, rowNo = offset + 1; i < limit && rowNo <= this.rowCount; i++, rowNo++)
            data.add(new SimpleRow("Row #" + rowNo));
        System.out.println("DEBUG: data = " + data);
        return data;
    }
}
public class SimpleRow {
    private final String data;

    public SimpleRow(String data) {
        this.data = data;
    }

    @Override
    public String toString() {
        return "Row[data=" + this.data + "]";
    }
}
If you then want to generate a Stream of rows from that method, streaming all rows in blocks of a certain size, we need a Spliterator, so we can use StreamSupport.stream(Spliterator<T> spliterator, boolean parallel) to create the stream.
Here is an implementation of such a Spliterator:
public class PagedDaoSpliterator<T> implements Spliterator<T> {
    private final PagedDao<T> dao;
    private final int blockSize;
    private int nextOffset;
    private List<T> data;
    private int dataIdx;

    public PagedDaoSpliterator(PagedDao<T> dao, int blockSize) {
        if (blockSize <= 0)
            throw new IllegalArgumentException();
        this.dao = Objects.requireNonNull(dao);
        this.blockSize = blockSize;
    }
    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if (this.data == null) {
            if (this.nextOffset == -1/*At end*/)
                return false; // Already at end
            this.data = this.dao.getData(this.nextOffset, this.blockSize);
            this.dataIdx = 0;
            if (this.data.size() < this.blockSize)
                this.nextOffset = -1/*At end, after this data*/;
            else
                this.nextOffset += data.size();
            if (this.data.isEmpty()) {
                this.data = null;
                return false; // At end
            }
        }
        action.accept(this.data.get(this.dataIdx++));
        if (this.dataIdx == this.data.size())
            this.data = null;
        return true;
    }
    @Override
    public Spliterator<T> trySplit() {
        return null; // Parallel processing not supported
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE; // Unknown
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL;
    }
}
We can now test that using the mock DAO above:
MockDao dao = new MockDao(13);
Stream<SimpleRow> stream = StreamSupport.stream(
new PagedDaoSpliterator<>(dao::getSimpleRows, 5), /*parallel*/false);
stream.forEach(System.out::println);
Output
DEBUG: getData(0, 5)
DEBUG: data = [Row[data=Row #1], Row[data=Row #2], Row[data=Row #3], Row[data=Row #4], Row[data=Row #5]]
Row[data=Row #1]
Row[data=Row #2]
Row[data=Row #3]
Row[data=Row #4]
Row[data=Row #5]
DEBUG: getData(5, 5)
DEBUG: data = [Row[data=Row #6], Row[data=Row #7], Row[data=Row #8], Row[data=Row #9], Row[data=Row #10]]
Row[data=Row #6]
Row[data=Row #7]
Row[data=Row #8]
Row[data=Row #9]
Row[data=Row #10]
DEBUG: getData(10, 5)
DEBUG: data = [Row[data=Row #11], Row[data=Row #12], Row[data=Row #13]]
Row[data=Row #11]
Row[data=Row #12]
Row[data=Row #13]
As can be seen, we get 13 rows of data, retrieved from the database in blocks of 5 rows.
The data is not retrieved from the database until it is needed, which keeps the memory footprint low, depending on the block size and on the stream operations not caching the data.
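Because the rows are fetched lazily, a short-circuiting stream operation only triggers the DAO calls it actually needs. For example, limiting the stream to 3 rows with the same mock DAO causes only a single getData(0, 5) call:

MockDao dao = new MockDao(13);
StreamSupport.stream(new PagedDaoSpliterator<>(dao::getSimpleRows, 5), /*parallel*/false)
        .limit(3)                      // short-circuits after the first 3 rows
        .forEach(System.out::println); // only the first block of 5 is ever fetched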