Search code examples
javagzipinputstream

WatcherService to tail Gzip log files


I have a directory containing gzip compressed log files with one event per line. In order to read and process these real-time, I have created a WatcherService that are identical to the code listed here: http://docs.oracle.com/javase/tutorial/essential/io/notification.html

In the processEvents() method, I have added this code to read the files that have been added or appended, line by line:

if (kind == ENTRY_MODIFY) {
    try(BufferedReader reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(Files.newInputStream(child, StandardOpenOption.READ))))) {
        String line;
        while ((line = reader.readLine()) != null) {
            System.out.println(line);
        }
    }
    catch(EOFException ex) {
        //file is empty,  so ignore until next signal
    }
    catch(Exception ex) {
        ex.printStackTrace();
    }
}

Now, as you can imagine, this works great for files that are created written and closed within milliseconds, however, when working with large files that are appended over time, this will read the entire file over and over again for every appended line (given that the file is flushed and synced by the producer now and then).

Is there any way I can read only the new lines in this file every time a ENTRY_MODIFY signal is sent, or find out when the file is "complete"?

How do I deal with files that are not appended, but rather overwritten?


Solution

  • First I would like to answer the technical aspect of your question:

    A WatchEvent just gives you the file name of a changed (or created or deleted) file and nothing more. So if you need any logic beyond this you have to be implement it on your own (or use an existing library of course).

    If you want to read only new lines, you have to remember the position for each file and whenever this file is changed you can move to the last known position. To get the current position you could use a CountingInputStream from the Commons IO package (credits go to [1]). To jump to the last position, you can use the function skip.

    But you are using a GZIPInputStream, this means that skip will not give you a great performance boost since skipping a compressed stream is not possible. Instead GZIPInputStream skip will uncompress the stream as it would when you are reading it so you will experience only little performance improvements (try it!).

    What I don't understand is why you are using compressed log files at all? Why don't you write uncompressed logs with a DailyRollingFileAppender and compress it at the end of the day, when the application doesn't access it anymore?

    Another solution could be to keep the GZIPInputStream (store it) so that you don't have to reread the file again. It may depend on how many log files you have to watch to decide if this is reasonable.

    Now some questions on your requirements:

    You didn't mention the reason why you want to watch the log files in real time. Why don't you centralize your logs (see Centralised Java Logging)? For example take a look on logstash and this presentation (see [2] and [3]) or on scribe or on splunk, which is commercial (see [4]).

    A centralized log would give you the opportunity to really have real time reactions based on your log data.

    [1] https://stackoverflow.com/a/240740/734687
    [2] Using elasticsearch, logstash & kibana to create realtime dashboards - slides
    [3] Using elasticsearch, logstash & kibana to create realtime dashboards - video
    [4] Log Aggregation with Splunk - slides

    Update

    First, a Groovy script to generate a zipped log file. I start this script from GroovyConsole each time I want to simulate a log file change:

    // Run with GroovyConsole each time you want new entries
    def file = new File('D:\\Projekte\\watcher_service\\data\\log.gz')
    
    // reading previous content since append is not possible
    def content
    if (file.exists()) {
        def inStream = new java.util.zip.GZIPInputStream(file.newInputStream())
        content = inStream.readLines()
    }
    
    // writing previous content and append new data
    def random  = new java.util.Random()  
    def lineCount = random.nextInt(30) + 1
    def outStream = new java.util.zip.GZIPOutputStream(file.newOutputStream())
    
    outStream.withWriter('UTF-8') { writer ->
        if (content) {
            content.each { writer << "$it\n" }
        }
        (1 .. lineCount).each {
            writer.write "Writing line $it/$lineCount\n"
        }
        writer.write '---Finished---\n'
        writer.flush()
        writer.close()
    }
    
    println "Wrote ${lineCount + 1} lines."
    

    Then the logfile reader:

    import java.nio.file.FileSystems
    import java.nio.file.Files
    import java.nio.file.Path
    import java.nio.file.Paths
    import java.nio.file.StandardOpenOption
    import java.util.zip.GZIPInputStream
    import org.apache.commons.io.input.CountingInputStream
    import static java.nio.file.StandardWatchEventKinds.*
    
    class LogReader
    {
        private final Path dir = Paths.get('D:\\Projekte\\watcher_service\\data\\')
        private watcher
        private positionMap = [:]
        long lineCount = 0
    
        static void main(def args)
        {
            new LogReader().processEvents()
        }
    
        LogReader()
        {
            watcher = FileSystems.getDefault().newWatchService()
            dir.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
        }
    
        void processEvents()
        {
            def key = watcher.take()
            boolean doLeave = false
    
            while ((key != null) && (doLeave == false))
            {
                key.pollEvents().each { event ->
                    def kind = event.kind()
                    Path name = event.context()
    
                    println "Event received $kind: $name"
                    if (kind == ENTRY_MODIFY) {
                        // use position from the map, if entry is not there use default value 0
                        processChange(name, positionMap.get(name.toString(), 0))
                    }
                    else if (kind == ENTRY_CREATE) {
                        processChange(name, 0)
                    }
                    else {
                        doLeave = true
                        return
                    }
                }
                key.reset()
                key = watcher.take()
            }
        }
    
        private void processChange(Path name, long position)
        {
            // open file and go to last position
            Path absolutePath = dir.resolve(name)
            def countingStream =
                    new CountingInputStream(
                    new GZIPInputStream(
                    Files.newInputStream(absolutePath, StandardOpenOption.READ)))
            position = countingStream.skip(position)
            println "Moving to position $position"
    
            // processing each new line
            // at the first start all lines are read
            int newLineCount = 0
            countingStream.withReader('UTF-8') { reader ->
                reader.eachLine { line ->
                    println "${++lineCount}: $line"
                    ++newLineCount
                }
            }
            println "${++lineCount}: $newLineCount new lines +++Finished+++"
    
            // store new position in map
            positionMap[name.toString()] = countingStream.count
            println "Storing new position $countingStream.count"
            countingStream.close()
        }
    }
    

    In the function processChange you can see 1) the creation of the inputstreams. The line with the .withReader creates the InputStreamReader and the BufferedReader. I use always Grovvy, it is Java on stereoids and when you start using it, you cannot stop. A Java developer should be able to read it, but if you have questions just comment.