I have a CSV file that is too large to fit in RAM and only barely fits on my hard disk, so I need to split it into parts. I am using BufferedReader.lines() to stream the file, and now I would like to use a Collector to write the parts out. I understand that this may not be ideal, but truthfully, the task at hand isn't really the point. The part that confused me is the behaviour of StandardOpenOption.SYNC. Here is my code.
import java.nio.file.*;
import java.util.function.*;
import java.util.stream.*;
import java.util.*;

import static java.nio.file.StandardOpenOption.*;

public class temp2
{
    public static final Path parentFolder =
        Path
            .of(".")
            .toAbsolutePath()
            .normalize()
            ;

    public static void main(final String[] args) throws Exception
    {
        System.out.println(parentFolder);

        // Clear out any output files left over from a previous run.
        for (int i = 0; i < 10; i++)
        {
            Files.deleteIfExists(parentFolder.resolve(String.valueOf(i)));
        }

        final int LIMIT = 100_000;

        final HashMap<String, List<String>> blahs =
            IntStream
                .range(0, 100_000_000)
                .parallel()
                .mapToObj(String::valueOf)
                .collect
                (
                    Collector
                        .of
                        (
                            // supplier -- a fresh map to accumulate into
                            HashMap::new,
                            // accumulator -- bucket each number by its first digit,
                            // flushing a bucket to disk once it exceeds LIMIT
                            (map, s) ->
                            {
                                final String key = String.valueOf(s.charAt(0));
                                final List<String> list = map.compute(key, (k, v) -> v == null ? new ArrayList<>() : v);
                                list.add(s);
                                if (list.size() > LIMIT)
                                {
                                    writeThenClearList(key, list);
                                }
                            },
                            // combiner -- flush both partial maps to disk and start fresh
                            (HashMap<String, List<String>> oldMap, HashMap<String, List<String>> newMap) ->
                            {
                                System.out.println(oldMap.values().stream().flatMap(Collection::stream).count());
                                System.out.println(newMap.values().stream().flatMap(Collection::stream).count());
                                System.out.println("---");
                                oldMap.forEach(temp2::writeThenClearList);
                                newMap.forEach(temp2::writeThenClearList);
                                return new HashMap<>();
                            },
                            // finisher -- flush whatever is left in the final map
                            (map) ->
                            {
                                map.forEach(temp2::writeThenClearList);
                                return map;
                            }
                        )
                )
                ;

        blahs.entrySet().forEach(System.out::println);
    }

    private static void writeThenClearList(final String key, final List<String> list)
    {
        if (list.isEmpty())
        {
            return;
        }

        try
        {
            Files.write
            (
                parentFolder.resolve(key),
                list,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.APPEND,
                StandardOpenOption.SYNC
            );
        }
        catch (final Exception e)
        {
            throw new RuntimeException(e);
        }

        list.clear();
    }
}
Writing it felt pretty straightforward -- it just generates all the numbers between 0 and 100 million, then groups them into files based on their starting digit. So, 0 goes into the 0 file, 1 goes into the 1 file, 10 goes into the 1 file, 20 goes into the 2 file, 300 goes into the 3 file, etc.
I also took special care to use StandardOpenOption.SYNC, to ensure that my writes were happening synchronously.
So, to analyze my results, I then wrote up and ran the following code. I used jshell, but I also got the same results running it as a normal file.
IntStream
    .range(0, 10)
    .mapToObj(String::valueOf)
    .map(Path::of)
    .map
    (
        path ->
        {
            try
            {
                return Files.lines(path);
            }
            catch (final Exception e)
            {
                throw new RuntimeException(e);
            }
        }
    )
    .map
    (
        stream ->
            stream
                .filter(s -> !s.equals(""))
                .mapToLong(Long::parseLong)
                .summaryStatistics()
    )
    .forEach(System.out::println)
    ;
All this does is print out a LongSummaryStatistics for each of the 10 files. Here is what my output was.
LongSummaryStatistics{count=1, sum=0, min=0, average=0.000000, max=0}
LongSummaryStatistics{count=11110722, sum=671067271105847657, min=0, average=60398169543.423700, max=1996215711700017}
LongSummaryStatistics{count=11110807, sum=936118810008566710, min=0, average=84252998905.351040, max=2999558127899730}
LongSummaryStatistics{count=11110948, sum=726950006539566180, min=0, average=65426461049.009155, max=3699881930345059}
LongSummaryStatistics{count=11110761, sum=1580564487107336657, min=0, average=142255286303.731720, max=4980250345042342}
LongSummaryStatistics{count=11110842, sum=1291096623566986753, min=0, average=116201510521.613650, max=5990245950494212}
LongSummaryStatistics{count=11110683, sum=2140046523919170444, min=0, average=192611608478.000000, max=6999483760545061}
LongSummaryStatistics{count=11110881, sum=1629411286034487818, min=0, average=146650052865.698760, max=7993937378575107}
LongSummaryStatistics{count=11110718, sum=3892896980864594155, min=0, average=350373124478.957500, max=8993173987418912}
LongSummaryStatistics{count=11110795, sum=2930254808993867970, min=0, average=263730435940.350620, max=9996168394101800}
And here is the same output, as a table:

| numPrefix | count | sum | min | average | max |
|---|---|---|---|---|---|
| 0 | 1 | 0 | 0 | 0.000000 | 0 |
| 1 | 11110722 | 671067271105847657 | 0 | 60398169543.423700 | 1996215711700017 |
| 2 | 11110807 | 936118810008566710 | 0 | 84252998905.351040 | 2999558127899730 |
| 3 | 11110948 | 726950006539566180 | 0 | 65426461049.009155 | 3699881930345059 |
| 4 | 11110761 | 1580564487107336657 | 0 | 142255286303.731720 | 4980250345042342 |
| 5 | 11110842 | 1291096623566986753 | 0 | 116201510521.613650 | 5990245950494212 |
| 6 | 11110683 | 2140046523919170444 | 0 | 192611608478.000000 | 6999483760545061 |
| 7 | 11110881 | 1629411286034487818 | 0 | 146650052865.698760 | 7993937378575107 |
| 8 | 11110718 | 3892896980864594155 | 0 | 350373124478.957500 | 8993173987418912 |
| 9 | 11110795 | 2930254808993867970 | 0 | 263730435940.350620 | 9996168394101800 |
Now, the thing that immediately popped out to me was the max value for each file: that's MUCH too high, with some files reporting numbers in the quadrillions even though the largest number generated is 99,999,999. Furthermore, every file has a min of 0, and none of the counts are the same. Files 1 through 9 should each count exactly 11,111,111 numbers, right?
So OK, we have a race condition. What I don't get, however, is why StandardOpenOption.SYNC did not protect me. Isn't that its job?

And if it's NOT its job, then what exactly is that option doing for me?
I didn't fully analyze your code, but I'm pretty sure you are misunderstanding what StandardOpenOption.SYNC means. It has nothing to do with multi-threading; it has to do with the persistence of data written to the filesystem.
Modern file operations lie to you. You say "write this to disk", and the system says "sure, done". In reality, however, the data is stuck in a buffer somewhere, to be written to the physical disk later (because writing to the physical disk is relatively slow). I.e. the data will be written "asynchronously" to disk at some later point in time.
Setting the SYNC option on file writes indicates that you really want the data on the physical disk by the time the write operation returns. So, you say "write this to disk", the system says "sure, done", you say "no really, I'll wait", and the system says "fine ... OK, really done". (In practice this is slower, which is why it is not the default.) I.e. the data is written "synchronously" to the disk during the write operation.
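To make that concrete, here is a minimal sketch (the file name and contents are purely illustrative) showing that SYNC behaves roughly like an ordinary write followed by FileChannel.force(true), the explicit "flush this down to the storage device" request:

import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;

import static java.nio.file.StandardOpenOption.*;

public class SyncSketch
{
    public static void main(final String[] args) throws Exception
    {
        final Path path = Path.of("sync-demo.txt"); // hypothetical file

        // With SYNC: each write blocks until content AND metadata have
        // actually reached the storage device. Durable, but slow.
        try (FileChannel ch = FileChannel.open(path, CREATE, WRITE, SYNC))
        {
            ch.write(ByteBuffer.wrap("hello\n".getBytes()));
        }

        // Without SYNC: the write may linger in OS buffers. force(true)
        // is the explicit "no really, I'll wait" from above.
        try (FileChannel ch = FileChannel.open(path, WRITE, APPEND))
        {
            ch.write(ByteBuffer.wrap("world\n".getBytes()));
            ch.force(true); // flush content and metadata to the device
        }
    }
}

Note that neither variant excludes other writers: two threads can each get their bytes durably onto the disk, interleaved with one another.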
If you are depending on this to protect you when multiple threads write to the same file, you will not get the results you desire. You can use something like FileLock to do actual file locking, or you can use some sort of synchronization mechanism within Java itself (my recommendation).
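For the in-Java route, here is a minimal sketch (the per-file lock map is my own illustration, not code from the question) that adapts writeThenClearList so each output file is guarded by its own lock object. Only one thread at a time can then append to a given file, while writes to different files still proceed in parallel:

import java.nio.file.*;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class LockedWriter
{
    // One lock object per output file, created on demand.
    private static final ConcurrentHashMap<String, Object> LOCKS = new ConcurrentHashMap<>();

    static void writeThenClearList(final Path parentFolder, final String key, final List<String> list)
    {
        if (list.isEmpty())
        {
            return;
        }

        // Serialize appends to this key's file; SYNC is dropped because it
        // only bought durability, never mutual exclusion.
        synchronized (LOCKS.computeIfAbsent(key, k -> new Object()))
        {
            try
            {
                Files.write
                (
                    parentFolder.resolve(key),
                    list,
                    StandardOpenOption.CREATE,
                    StandardOpenOption.WRITE,
                    StandardOpenOption.APPEND
                );
            }
            catch (final Exception e)
            {
                throw new RuntimeException(e);
            }
        }

        list.clear();
    }
}

As for FileLock: its locks are held on behalf of the entire JVM, so it is suited to coordinating separate processes, not multiple threads within one virtual machine, which is why I recommend the in-Java mechanism here.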