Tags: java, split, java-stream, lazy-evaluation, predicate

Java split stream by predicate into stream of streams


I have hundreds of large (6 GB) gzipped log files that I read through GZIPInputStreams and want to parse. Suppose each one has the format:

Start of log entry 1
    ...some log details
    ...some log details
    ...some log details
Start of log entry 2
    ...some log details
    ...some log details
    ...some log details
Start of log entry 3
    ...some log details
    ...some log details
    ...some log details

I'm streaming the gzipped file contents line by line through BufferedReader.lines(). The stream looks like:

[
    "Start of log entry 1",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 3",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
]
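For reference, here is a minimal sketch of the per-file reading setup described above. It assumes the logs are UTF-8 encoded, and `GzipLines.openGzipLines` is a hypothetical helper name. The helper wraps the file in `GZIPInputStream`, decodes it with `InputStreamReader`, and exposes `BufferedReader.lines()`. It registers an `onClose` handler so that closing the returned stream also closes the file.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;

public class GzipLines {
    // Hypothetical helper: lazily streams the lines of one gzipped, UTF-8 encoded file.
    // The caller must close the returned stream (e.g. with try-with-resources).
    public static Stream<String> openGzipLines(Path file) throws IOException {
        InputStream raw = Files.newInputStream(file);
        try {
            // 64 KiB inflater buffer instead of the 512-byte default; an arbitrary choice.
            BufferedReader reader = new BufferedReader(new InputStreamReader(
                    new GZIPInputStream(raw, 1 << 16), StandardCharsets.UTF_8));
            // Closing the stream closes the reader and, through it, the file.
            return reader.lines().onClose(() -> {
                try {
                    reader.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException | RuntimeException e) {
            raw.close(); // e.g. the file is not in gzip format
            throw e;
        }
    }
}
```

Typical use is `try (Stream<String> lines = GzipLines.openGzipLines(path)) { ... }`. Only the reader's buffers are held in memory, never the whole file.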

The start of every log entry can be identified by the predicate line -> line.startsWith("Start of log entry"). I would like to transform this Stream<String> into a Stream<Stream<String>> according to this predicate. Each "substream" should start at a line for which the predicate is true and keep collecting lines while the predicate is false. The next line for which the predicate is true ends this substream and starts the next one. The result would look like:

[
    [
        "Start of log entry 1",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 2",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 3",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
]

From there, I can take each substream and map it through new LogEntry(Stream<String> logLines) so as to aggregate related log lines into LogEntry objects.

Here's a rough idea of how that would look:

import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

import static java.lang.System.out;

class Untitled {
    static final String input = 
        "Start of log entry 1\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 2\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 3\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details";

    static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry"); 

    public static void main(String[] args) throws Exception {
        try (ByteArrayInputStream gzipInputStream // mock for a FileInputStream-based GZIPInputStream
                 = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
             InputStreamReader inputStreamReader = new InputStreamReader(gzipInputStream, StandardCharsets.UTF_8);
             BufferedReader reader = new BufferedReader(inputStreamReader)) {

            reader.lines()
                .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                .map(LogEntry::new)
                .forEach(out::println);
        }
    }
}
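The question does not show `LogEntry`. Below is a minimal, hypothetical version that fits the `LogEntry(Stream<String>)` constructor used above. It assumes the first line of each entry is the header and the remaining lines are details. It collects the substream immediately because a `Stream` can only be consumed once.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical LogEntry: the question only names the constructor LogEntry(Stream<String>).
public class LogEntry {
    private final String header;         // e.g. "Start of log entry 1"
    private final List<String> details;  // the indented lines that follow it

    public LogEntry(Stream<String> logLines) {
        // Consume the substream once; it cannot be traversed again later.
        List<String> lines = logLines.collect(Collectors.toList());
        if (lines.isEmpty()) {
            throw new IllegalArgumentException("empty log entry");
        }
        this.header = lines.get(0);
        this.details = List.copyOf(lines.subList(1, lines.size()));
    }

    public String header() {
        return header;
    }

    public List<String> details() {
        return details;
    }

    @Override
    public String toString() {
        return header + " (" + details.size() + " detail lines)";
    }
}
```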

Constraint: I have hundreds of these large files to process in parallel (but with only a single sequential stream per file), so loading any of them entirely into memory (e.g. as a List<String> of lines) is not feasible.
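One way to meet this constraint is to parallelize across files with an `ExecutorService` while each task reads its own file through an ordinary sequential stream. This is a minimal sketch with several assumptions. The input is gzipped UTF-8. The per-file task `countEntries` is hypothetical and only counts entry headers, standing in for the real parsing. The caller chooses the thread count.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import java.util.zip.GZIPInputStream;

public class ParallelLogFiles {
    static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry");

    // Hypothetical per-file task: reads one file sequentially, so memory use is
    // bounded by the reader's buffers rather than the file size.
    static long countEntries(Path file) throws IOException {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(file)), StandardCharsets.UTF_8))) {
            return reader.lines().filter(isLogEntryStart).count();
        }
    }

    // One sequential task per file on a fixed pool; results keep the input order.
    static List<Long> countAll(List<Path> files, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (Path file : files) {
                futures.add(pool.submit(() -> countEntries(file)));
            }
            List<Long> results = new ArrayList<>();
            for (Future<Long> f : futures) {
                results.add(f.get()); // a task's IOException surfaces as ExecutionException
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }
}
```

The thread count bounds how many files are open and decompressing at once, which is usually the real memory and CPU limit here.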

Any help appreciated!


Solution

  • Frederico's answer is probably the nicest way to solve this particular problem. Following his last thought about a custom Spliterator, I'll add an adapted version of an answer I gave to a similar question, where I proposed using a custom iterator to create a chunked stream. This approach also works on streams that are not created by input readers.

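    import java.util.*;
    import java.util.function.Predicate;
    import java.util.stream.*;

    public class StreamSplitter<T>
        implements Iterator<Stream<T>>
    {
        private final Iterator<T>  incoming;
        private final Predicate<T> startOfNewEntry;
        private T                  nextLine;     // one-line look-ahead
        private boolean            hasNextLine;  // false once incoming is exhausted

        public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry)
        {
            Iterator<Stream<T>> splitter = new StreamSplitter<>(incoming, startOfNewEntry);
            return StreamSupport.stream(Spliterators.spliteratorUnknownSize(splitter, Spliterator.ORDERED), false)
                                .onClose(incoming::close); // closing the result closes the source
        }

        private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry)
        {
            this.incoming = stream.iterator();
            this.startOfNewEntry = startOfNewEntry;
            advance();
        }

        // Reads the next line into the look-ahead slot, if there is one.
        // A boolean flag (not a null check) marks exhaustion, so a final entry
        // consisting only of a start line does not make hasNext() true forever.
        private void advance()
        {
            hasNextLine = incoming.hasNext();
            nextLine = hasNextLine ? incoming.next() : null;
        }

        @Override
        public boolean hasNext()
        {
            return hasNextLine;
        }

        @Override
        public Stream<T> next()
        {
            if (!hasNextLine)
                throw new NoSuchElementException();
            List<T> nextEntrysLines = new ArrayList<>();
            do
            {
                nextEntrysLines.add(nextLine);
                advance();
            } while (hasNextLine && !startOfNewEntry.test(nextLine));

            return nextEntrysLines.stream();
        }
    }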
    

    Example

    public static void main(String[] args)
    {
        Stream<String> flat = Stream.of("Start of log entry 1",
                                        "    ...some log details",
                                        "    ...some log details",
                                        "Start of log entry 2",
                                        "    ...some log details",
                                        "    ...some log details",
                                        "Start of log entry 3",
                                        "    ...some log details",
                                        "    ...some log details");
    
        StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
                      .forEach(logEntry -> {
                          System.out.println("------------------");
                          logEntry.forEach(System.out::println);
                      });
    }
    
    // Output
    // ------------------
    // Start of log entry 1
    //     ...some log details
    //     ...some log details
    // ------------------
    // Start of log entry 2
    //     ...some log details
    //     ...some log details
    // ------------------
    // Start of log entry 3
    //     ...some log details
    //     ...some log details
    

    The iterator always looks one line ahead. As soon as that line is the beginning of a new entry, it wraps the lines collected for the previous entry in a stream and returns it from next(). The factory method streamOf turns this iterator into a stream, which can be used as in the example above.
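To watch the one-line look-ahead at work, this sketch logs every line as the source stream hands it out (via `peek`) and every chunk as it is emitted. So that it runs on its own, it re-implements the same idea in a hypothetical, simplified `Chunker` class that emits `List<T>` instead of `Stream<T>`. The names `LookaheadDemo`, `Chunker`, `chunks`, and `trace` are illustrative assumptions, not part of the answer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class LookaheadDemo {
    // Hypothetical simplified splitter using the same one-line look-ahead as StreamSplitter.
    static final class Chunker<T> implements Iterator<List<T>> {
        private final Iterator<T> in;
        private final Predicate<? super T> isStart;
        private T ahead;           // line already read but not yet emitted
        private boolean hasAhead;  // false once the source is exhausted

        Chunker(Iterator<T> in, Predicate<? super T> isStart) {
            this.in = in;
            this.isStart = isStart;
            advance();
        }

        private void advance() {
            hasAhead = in.hasNext();
            ahead = hasAhead ? in.next() : null;
        }

        @Override
        public boolean hasNext() {
            return hasAhead;
        }

        @Override
        public List<T> next() {
            if (!hasAhead) throw new NoSuchElementException();
            List<T> chunk = new ArrayList<>();
            do {
                chunk.add(ahead);
                advance();
            } while (hasAhead && !isStart.test(ahead));
            return chunk;
        }
    }

    static <T> Stream<List<T>> chunks(Stream<T> source, Predicate<? super T> isStart) {
        Iterator<List<T>> it = new Chunker<>(source.iterator(), isStart);
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED), false);
    }

    // Records when each line is pulled from the source and when each chunk is emitted.
    static List<String> trace() {
        List<String> events = new ArrayList<>();
        Stream<String> lines = Stream.of("S1", "a", "b", "S2", "c")
                .peek(line -> events.add("read " + line));
        chunks(lines, line -> line.startsWith("S"))
                .forEach(chunk -> events.add("emit " + chunk));
        return events;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

The trace is `read S1`, `read a`, `read b`, `read S2`, `emit [S1, a, b]`, `read c`, `emit [S2, c]`. The first entry is emitted after only four lines have been pulled: its own three lines plus the look-ahead line `S2`.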

    I changed the split condition from a regex to a Predicate, so you can specify more complicated conditions with the help of multiple regexes, if-conditions, and so on.
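For example, a `Predicate` lets you combine a precompiled regex (`Pattern.asPredicate()`) with plain string checks using `or`, `and`, and `negate`. The banner header format and the "(continued)" exclusion below are invented purely for illustration.

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

public class EntryPredicates {
    // The question's header test.
    static final Predicate<String> CLASSIC_HEADER = line -> line.startsWith("Start of log entry");

    // Hypothetical second header style, matched with a precompiled regex.
    // asPredicate() uses find(), so the pattern is anchored with ^ and $.
    static final Predicate<String> BANNER_HEADER = Pattern.compile("^=== .+ ===$").asPredicate();

    // Hypothetical exclusion: headers marked "(continued)" do not start a new entry.
    static final Predicate<String> CONTINUED = line -> line.contains("(continued)");

    // (classic OR banner) AND NOT continued
    static final Predicate<String> IS_ENTRY_START =
            CLASSIC_HEADER.or(BANNER_HEADER).and(CONTINUED.negate());
}
```

`EntryPredicates.IS_ENTRY_START` can then be passed to `StreamSplitter.streamOf` in place of the single regex.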

    Note that I only tested it with the example data above, so I don't know how it would behave with more complicated, erroneous, or empty input.
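The answer mentions a custom `Spliterator` as an alternative to the iterator. Below is a hedged sketch of that variant. It is not Frederico's code, and `PredicateSplitter` and `splitBy` are hypothetical names. The class extends `Spliterators.AbstractSpliterator`, so only `tryAdvance` has to be written. It tracks exhaustion with a boolean rather than a `null` sentinel, and the test below covers empty input and a final entry that consists of a single start line. It emits `List<T>` chunks; `.map(List::stream)` turns the result into the `Stream<Stream<T>>` shape the question asks for.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical Spliterator-based splitter: each tryAdvance call emits one chunk
// that begins at the first element or at an element matching isStart.
public class PredicateSplitter<T> extends Spliterators.AbstractSpliterator<List<T>> {
    private final Iterator<T> source;
    private final Predicate<? super T> isStart;
    private T pending;           // look-ahead: first element of the next chunk
    private boolean hasPending;  // false once the source is exhausted

    private PredicateSplitter(Iterator<T> source, Predicate<? super T> isStart) {
        super(Long.MAX_VALUE, Spliterator.ORDERED); // size unknown
        this.source = source;
        this.isStart = isStart;
        hasPending = source.hasNext();
        pending = hasPending ? source.next() : null;
    }

    public static <T> Stream<List<T>> splitBy(Stream<T> stream, Predicate<? super T> isStart) {
        Spliterator<List<T>> chunks = new PredicateSplitter<>(stream.iterator(), isStart);
        // Sequential on purpose: chunk boundaries depend on encounter order.
        return StreamSupport.stream(chunks, false).onClose(stream::close);
    }

    @Override
    public boolean tryAdvance(Consumer<? super List<T>> action) {
        if (!hasPending) {
            return false;
        }
        List<T> chunk = new ArrayList<>();
        chunk.add(pending);
        hasPending = false;
        pending = null;
        while (source.hasNext()) {
            T line = source.next();
            if (isStart.test(line)) {
                pending = line; // keep it as the start of the next chunk
                hasPending = true;
                break;
            }
            chunk.add(line);
        }
        action.accept(chunk);
        return true;
    }
}
```

In the question's pipeline, this would read `PredicateSplitter.splitBy(reader.lines(), isLogEntryStart).map(List::stream).map(LogEntry::new)`. Only one entry's lines are buffered at a time, provided the downstream operations do not retain earlier entries.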