Tags: java, parallel-processing, synchronization, java-stream, collectors

Why does my file have race conditions, even though I used StandardOpenOption.SYNC?


I have a CSV file that is too large to fit in RAM, and only barely fits on my hard disk. I need to split this CSV file into parts.

I am using BufferedReader.lines() to stream the file, and now I would like to use a Collector to write the parts out. I understand that this may not be ideal, but truthfully, the task at hand isn't really the point.
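
(For reference, the real run streams the CSV roughly like the sketch below; input.csv is just a placeholder name, and the count() stands in for the Collector I show further down. The test code that follows swaps the file out for a generated IntStream.)

import java.io.BufferedReader;
import java.nio.file.*;

public class ReadSketch
{
   public static void main(final String[] args) throws Exception
   {
      // input.csv is a placeholder for the real (too-big-for-RAM) file.
      try (final BufferedReader reader = Files.newBufferedReader(Path.of("input.csv")))
      {
         final long lineCount =
            reader
               .lines()     // lazy Stream<String> over the file
               .parallel()
               .count();    // stand-in for the Collector shown below

         System.out.println(lineCount);
      }
   }
}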

The part that confused me is the behaviour of StandardOpenOption.SYNC. Here is my code.

import java.nio.file.*;
import java.util.function.*;
import java.util.stream.*;
import java.util.*;

import static java.nio.file.StandardOpenOption.*;

public class temp2
{

   public static final Path parentFolder = 
      Path
         .of(".")
         .toAbsolutePath()
         .normalize()
         ;

   public static void main(final String[] args) throws Exception
   {
   
      System.out.println(parentFolder);
      
      for (int i = 0; i < 10; i++)
      {
      
         Files.deleteIfExists(parentFolder.resolve(String.valueOf(i)));
      
      }
   
      final int LIMIT = 100_000;
      final HashMap<String, List<String>> blahs =
         IntStream
            .range(0, 100_000_000)
            .parallel()
            .mapToObj(String::valueOf)
            .collect
            (
               Collector
                  .of
                  (
                     HashMap::new,
                     (map, s) ->
                     {
                     
                        final String key = String.valueOf(s.charAt(0));
                        final List<String> list = map.compute(key, (k, v) -> v == null ? new ArrayList<>() : v);
                        list.add(s);
                        
                        if (list.size() > LIMIT)
                        {
                        
                           writeThenClearList(key, list);
                        
                        }
                     
                     },
                     (HashMap<String, List<String>> oldMap, HashMap<String, List<String>> newMap) ->
                     {
                     
                        System.out.println(oldMap.values().stream().flatMap(Collection::stream).count());
                        System.out.println(newMap.values().stream().flatMap(Collection::stream).count());
                        System.out.println("---");
                     
                        oldMap.forEach(temp2::writeThenClearList);
                        newMap.forEach(temp2::writeThenClearList);
                        return new HashMap<>();
                     },
                     (map) ->
                     {
                     
                        map.forEach(temp2::writeThenClearList);
                        
                        return map;
                     
                     }
                  )
            )
            ;
      blahs.entrySet().forEach(System.out::println);
   
   }
   
   private static void writeThenClearList(final String key, final List<String> list)
   {
   
      if (list.isEmpty())
      {
      
         return;
      
      }
   
      try {
         Files
            .write(
               parentFolder.resolve(key),
               list,
               StandardOpenOption.CREATE,
               StandardOpenOption.WRITE,
               StandardOpenOption.APPEND,
               StandardOpenOption.SYNC
            );
      } catch (final Exception e) {
         throw new RuntimeException(e);
      }
   
      list.clear();
   
   }

}

Writing it felt pretty straightforward -- it just generates all numbers between 0 and 100 million, then groups them into files based on the starting digit. So, 0 goes into the 0 file, 1 goes into the 1 file, 10 goes into the 1 file, 20 goes into the 2 file, 300 goes into the 3 file, etc.

I also took special care to use StandardOpenOption.SYNC, to ensure that my writes were happening synchronously.

So, to analyze my results, I then wrote up and ran the following code. I used jshell, but I also got the same results running it as a normal file.

IntStream
    .range(0, 10)
    .mapToObj(String::valueOf)
    .map(Path::of)
    .map
    (
        path -> 
        { 
            try 
            { 
                return Files.lines(path); 
            } 

            catch (final Exception e) 
            { 
                throw new RuntimeException(e); 
            } 
        }
    )
    .map
    (
        stream -> 
            stream
                .filter(s -> !s.equals(""))
                .mapToLong(Long::parseLong)
                .summaryStatistics()
    )
    .forEach(System.out::println)
    ;

All this does is print out LongSummaryStatistics for each of the 10 files.

Here is what my output was.

LongSummaryStatistics{count=1, sum=0, min=0, average=0.000000, max=0}
LongSummaryStatistics{count=11110722, sum=671067271105847657, min=0, average=60398169543.423700, max=1996215711700017}
LongSummaryStatistics{count=11110807, sum=936118810008566710, min=0, average=84252998905.351040, max=2999558127899730}
LongSummaryStatistics{count=11110948, sum=726950006539566180, min=0, average=65426461049.009155, max=3699881930345059}
LongSummaryStatistics{count=11110761, sum=1580564487107336657, min=0, average=142255286303.731720, max=4980250345042342}
LongSummaryStatistics{count=11110842, sum=1291096623566986753, min=0, average=116201510521.613650, max=5990245950494212}
LongSummaryStatistics{count=11110683, sum=2140046523919170444, min=0, average=192611608478.000000, max=6999483760545061}
LongSummaryStatistics{count=11110881, sum=1629411286034487818, min=0, average=146650052865.698760, max=7993937378575107}
LongSummaryStatistics{count=11110718, sum=3892896980864594155, min=0, average=350373124478.957500, max=8993173987418912}
LongSummaryStatistics{count=11110795, sum=2930254808993867970, min=0, average=263730435940.350620, max=9996168394101800}

numPrefix  count     sum                  min  average              max
0          1         0                    0    0.000000             0
1          11110722  671067271105847657   0    60398169543.423700   1996215711700017
2          11110807  936118810008566710   0    84252998905.351040   2999558127899730
3          11110948  726950006539566180   0    65426461049.009155   3699881930345059
4          11110761  1580564487107336657  0    142255286303.731720  4980250345042342
5          11110842  1291096623566986753  0    116201510521.613650  5990245950494212
6          11110683  2140046523919170444  0    192611608478.000000  6999483760545061
7          11110881  1629411286034487818  0    146650052865.698760  7993937378575107
8          11110718  3892896980864594155  0    350373124478.957500  8993173987418912
9          11110795  2930254808993867970  0    263730435940.350620  9996168394101800

Now, the thing that immediately jumped out at me was the max value for each file. That's MUCH too high -- some files were reporting numbers in the quadrillions. On top of that, every file had a min of 0, and none of the counts match. Files 1 through 9 should all have the same count, right?
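
(For reference, here is the quick check behind that expectation, run in jshell like the snippet above -- each of the files 1 through 9 should end up with 11,111,111 lines:)

// Numbers in [0, 100_000_000) starting with a given non-zero digit:
// one 1-digit number, ten 2-digit numbers, ..., ten million 8-digit numbers.
LongStream.iterate(1, n -> n * 10).limit(8).sum()   // ==> 11111111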

So OK, we have a race condition. What I don't get, however, is why StandardOpenOption.SYNC did not protect me. Isn't that its job?

And if it's NOT its job, then what exactly is that option doing for me?


Solution

  • I didn't fully analyze your code, but I'm pretty sure you are misunderstanding what StandardOpenOption.SYNC means. It has nothing to do with multi-threading. It has to do with persistence of the data when written to the filesystem.

    Modern file operations lie to you. You say "write this to disk", and the system says "sure, done". In reality, however, the data was stuck in a buffer somewhere to be later written to the physical disk (because writing to the filesystem is relatively slow). I.e. the data will be written "asynchronously" to disk at some later point in time.

    Setting the SYNC option on file writes indicates that you really want the data written to the physical disk when you do the write operation. So, you say "write this to disk" and the system says "sure, done", and you say "no really, I'll wait", and the system says "fine ... ok, really done". (In practice this is slower, which is why it is not the default.) I.e. the data is written "synchronously" to the disk during the write operation.
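
    To make that concrete, here is a small sketch (my own illustration; SyncSketch and out.txt are placeholders): opening with SYNC gives you roughly the same durability guarantee as writing normally and then calling FileChannel.force() yourself. Neither variant stops another thread from appending to the same file at the same moment.

    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.*;

    import static java.nio.file.StandardOpenOption.*;

    public class SyncSketch
    {
       public static void main(final String[] args) throws Exception
       {
          // Option 1: SYNC -- the write call returns only once the content and
          // metadata have been pushed through to the storage device.
          Files.writeString(Path.of("out.txt"), "hello\n", CREATE, WRITE, APPEND, SYNC);

          // Option 2: roughly the same durability guarantee done by hand --
          // write normally, then force pending updates out to the device.
          try (final FileChannel channel = FileChannel.open(Path.of("out.txt"), CREATE, WRITE, APPEND))
          {
             channel.write(ByteBuffer.wrap("hello\n".getBytes(StandardCharsets.UTF_8)));
             channel.force(true); // true = also flush file metadata
          }
       }
    }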

    If you are depending on this to protect you while having multiple threads write to the same file, then you will not get the results you desire. You can use something like FileLock to do actual file locking, or you can use some sort of synchronization mechanism within Java itself (my recommendation).
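
    For example, inside one JVM you could put a lock in front of each output file so that appends to the same file never interleave. A rough sketch (the class and field names here are mine, not from your code):

    import java.nio.file.*;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.locks.ReentrantLock;

    import static java.nio.file.StandardOpenOption.*;

    public class LockedWriter
    {
       // One lock per output file; threads writing to different files do not block
       // each other, but appends to the same file can never interleave.
       private static final ConcurrentMap<String, ReentrantLock> LOCKS = new ConcurrentHashMap<>();

       public static void write(final Path parentFolder, final String key, final List<String> list) throws Exception
       {
          final ReentrantLock lock = LOCKS.computeIfAbsent(key, k -> new ReentrantLock());

          lock.lock();

          try
          {
             Files.write(parentFolder.resolve(key), list, CREATE, WRITE, APPEND);
          }
          finally
          {
             lock.unlock();
          }
       }
    }

    Note that FileLock locks are held on behalf of the whole JVM, so they help against other processes, not against other threads inside the same program.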