Loading in the data:
SparkConf sc = new SparkConf().setAppName("TEST").setMaster("local[*]");
JavaSparkContext JSC = new JavaSparkContext(sc);
JavaRDD<String> stringRDDVotes = JSC.textFile("HarryPotter.csv");
I currently have this table loaded into an RDD:
ID | A | B | Name |
---|---|---|---|
1 | 23 | 50 | Harry;Potter |
I want to convert it to the table below:
ID | A | B | Name |
---|---|---|---|
1 | 23 | 50 | Harry |
1 | 23 | 50 | Potter |
All the solutions I found use SparkSQL, which I can't use, so how would I get this result using only things like flatMap and mapToPair?
Something like this maybe?
flatMap(s -> Arrays.asList(s.split(";")).iterator())
The code above produces this:
ID | A | B | Name |
---|---|---|---|
1 | 23 | 50 | Harry |
Potter |
I know that in Scala it can be done like this, but I don't know how to do it with Java:
val input: RDD[String] = sc.parallelize(Seq("1,23,50,Harry;Potter"))
val csv: RDD[Array[String]] = input.map(_.split(','))
val result = csv.flatMap { case Array(s1, s2, s3, s4) => s4.split(";").map(part => (s1, s2, s3, part)) }
The first part is quite simple to convert from Scala to Java: you only need to use map
to split each line by comma, which gives you a JavaRDD<String[]>
. Then, inside flatMap
, split the last element of each array (the Name
column) on the semicolon, and use Java streams to turn each name into a new row array.
Here is a complete example:
JavaRDD<String> input = JSC.parallelize(
        Arrays.asList("1,23,50,Harry;Potter", "2,24,60,Hermione;Granger")
);
JavaRDD<String[]> result = input
        .map(line -> line.split(","))
        .flatMap(r -> {
            // split the Name column on ";" and emit one full row per name
            List<String> names = Arrays.asList(r[3].split(";"));
            String[][] values = names.stream()
                    .map(name -> new String[]{r[0], r[1], r[2], name})
                    .toArray(String[][]::new);
            return Arrays.asList(values).iterator();
        });
// print the result RDD
for (String[] line : result.collect()) {
System.out.println(Arrays.toString(line));
}
// [1, 23, 50, Harry]
// [1, 23, 50, Potter]
// [2, 24, 60, Hermione]
// [2, 24, 60, Granger]
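If you want to check the row-expansion logic itself without spinning up a Spark context, the body of the flatMap can be exercised with plain Java streams. This is just an illustrative sketch (the SplitNames class and expand method names are made up for the example, not part of any Spark API):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class SplitNames {
    // Expand one CSV row: split the Name column on ';'
    // and emit one full row per name, copying the other columns.
    static Stream<String[]> expand(String line) {
        String[] r = line.split(",");
        return Arrays.stream(r[3].split(";"))
                .map(name -> new String[]{r[0], r[1], r[2], name});
    }

    public static void main(String[] args) {
        List<String[]> rows = Stream
                .of("1,23,50,Harry;Potter", "2,24,60,Hermione;Granger")
                .flatMap(SplitNames::expand)
                .collect(Collectors.toList());
        // prints the same four rows as the Spark version above
        rows.forEach(row -> System.out.println(Arrays.toString(row)));
    }
}
```

The stream's flatMap plays the same role as the RDD's flatMap, so once this logic looks right you can drop it straight into the Spark version.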