I have a task to compare two CSV files using Apache Beam (Java).
I have to compare each ID in file 1 with all the IDs in file 2 and, if one matches, take the whole line that contains that ID and save it to a file. For example, in the first reference image the ID is found, so I would copy that line to another file and save it. If the ID is not found, the search continues on the next line, and so on until the end of the file.
The code I have come up with is below:
public class CsvComparer {
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create();
// Read the first CSV file
PCollection<String> file1 = pipeline.apply(TextIO.read().from("gs://my-bucket/file1.csv"));
// Read the second CSV file
PCollection<String> file2 = pipeline.apply(TextIO.read().from("gs://my-bucket/file2.csv"));
// Compare the data in the files and write the results to a file
PCollection<String> results = file1.apply(ParDo.of(new CompareCsvFn(file2)));
results.apply(TextIO.write().to("gs://my-bucket/comparison_results.csv").withSuffix(".csv"));
pipeline.run().waitUntilFinish();
}
public static class CompareCsvFn extends DoFn<String, String> {
private final PCollection<String> file2;
public CompareCsvFn(PCollection<String> file2) {
this.file2 = file2;
}
@ProcessElement
public void processElement(@Element String line, OutputReceiver<String> out) {
// Compare the line with the lines in file2
PCollection<KV<String, String>> matches = file2.apply(ParDo.of(new CompareLinesFn(line)));
// Write the matching lines to the output
matches.apply(ParDo.of(new FormatOutputFn(line))).apply(out);
}
}
public static class CompareLinesFn extends DoFn<String, KV<String, String>> {
private final String line;
public CompareLinesFn(String line) {
this.line = line;
}
@ProcessElement
public void processElement(@Element String otherLine, OutputReceiver<KV<String, String>> out) {
// Compare the lines and output a KV pair if they match
// this logic would need to be changed I understand as I have to compare one line in file1 with all other lines in file2
if (line.equals(otherLine)) {
out.output(KV.of(line, otherLine));
}
}
}
public static class FormatOutputFn extends DoFn<KV<String, String>, String> {
private final String line;
public FormatOutputFn(String line) {
this.line = line;
}
@ProcessElement
public void processElement(@Element KV<String, String> match, OutputReceiver<String> out) {
// Format the output
String output = String.format("Line in file 1: %s\nLine in file 2: %s\n", line, match.getValue());
out.output(output);
}
}
}
Please help and advise on the above. I have searched the web and elsewhere, but to no avail.
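Your code cannot work as written: a PCollection cannot be passed into a DoFn, and transforms cannot be applied from inside processElement, because a PCollection is a node in the pipeline graph rather than an in-memory collection. I think a few changes can fix it: key both files by ID and join them with CoGroupByKey.
Try this code: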
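import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
public class CsvComparer {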
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create();
// Read the first CSV file as KV pairs (ID, Line)
PCollection<KV<String, String>> file1 = pipeline.apply(TextIO.read().from("gs://my-bucket/file1.csv"))
.apply(ParDo.of(new ExtractKeyValueFn()));
// Read the second CSV file as KV pairs (ID, Line)
PCollection<KV<String, String>> file2 = pipeline.apply(TextIO.read().from("gs://my-bucket/file2.csv"))
.apply(ParDo.of(new ExtractKeyValueFn()));
// CoGroupByKey to join lines with the same ID
PCollection<KV<String, CoGbkResult>> groupedLines = KeyedPCollectionTuple.of("file1", file1)
.and("file2", file2)
.apply(CoGroupByKey.create());
// Compare the data in the files and write the results to a file
PCollection<String> results = groupedLines.apply(ParDo.of(new CompareCsvFn()));
// to() takes a filename prefix; the shard numbering and the ".csv" suffix are appended to it
results.apply(TextIO.write().to("gs://my-bucket/comparison_results").withSuffix(".csv"));
pipeline.run().waitUntilFinish();
}
public static class ExtractKeyValueFn extends DoFn<String, KV<String, String>> {
@ProcessElement
public void processElement(@Element String line, OutputReceiver<KV<String, String>> out) {
// Split the line on ',' and take the first column as the key (ID), dropping anything up to the last '/' in that column; the whole line is the value
String[] parts = line.split(",");
String key = parts[0].substring(parts[0].lastIndexOf('/') + 1);
out.output(KV.of(key, line));
}
}
public static class CompareCsvFn extends DoFn<KV<String, CoGbkResult>, String> {
@ProcessElement
public void processElement(@Element KV<String, CoGbkResult> element, OutputReceiver<String> out) {
String id = element.getKey();
Iterable<String> linesFromFirstFile = element.getValue().getAll("file1");
Iterable<String> linesFromSecondFile = element.getValue().getAll("file2");
// CoGroupByKey has already grouped the lines by ID, so every pairing below is an ID match;
// if the ID is missing from either file, one of the iterables is empty and nothing is output
for (String line1 : linesFromFirstFile) {
for (String line2 : linesFromSecondFile) {
// Format the output
String output = String.format("Line in file 1: %s\nLine in file 2: %s\n", line1, line2);
out.output(output);
}
}
}
}
}
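One caveat about the key extraction in the code above: it uses the raw first column, so a blank line in either file ends up keyed by an empty string. Here is a minimal plain-Java sketch of a more defensive version. `CsvIds` and `extractId` are hypothetical names (not Beam APIs), and the sketch assumes the ID is the first column and that no field contains a quoted comma.

```java
import java.util.Optional;

/** Hypothetical helper, not part of Beam: pulls the ID out of one CSV line. */
final class CsvIds {
    private CsvIds() {}

    /**
     * Returns the trimmed first column of the line, or empty if the line is
     * blank or its first column is empty.
     * Assumption: the ID is the first column and fields contain no quoted commas.
     */
    static Optional<String> extractId(String line) {
        if (line == null || line.trim().isEmpty()) {
            return Optional.empty();
        }
        // A limit of 2 stops splitting after the first comma; only the key is needed.
        String id = line.split(",", 2)[0].trim();
        return id.isEmpty() ? Optional.empty() : Optional.of(id);
    }

    public static void main(String[] args) {
        System.out.println(extractId("42,alice,London"));
        System.out.println(extractId("   "));
    }
}
```

Inside ExtractKeyValueFn, processElement could call a helper like this and emit a KV only when an ID is present. If your files have a header row, that would still need to be skipped separately.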
Those are the main changes to your pipeline. Let me know whether it works for you.
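One way to check the result on a small sample is to compare the pipeline output with a plain in-memory version of the same ID match. The sketch below is only a reference for small inputs, not a replacement for the Beam job. `LocalIdJoin`, `idOf` and `matchingLines` are hypothetical names. It assumes the ID is the first column, that there are no quoted commas, and that the lines you want to keep are the ones from file 2.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory reference, intended only for checking small samples. */
final class LocalIdJoin {
    private LocalIdJoin() {}

    /** First column of the line, trimmed. Assumption: no quoted commas. */
    static String idOf(String line) {
        return line.split(",", 2)[0].trim();
    }

    /** Returns the lines of file2 whose ID also occurs in file1, in file2 order. */
    static List<String> matchingLines(List<String> file1, List<String> file2) {
        // Collect every ID seen in file 1, ignoring blank lines.
        Set<String> ids = new HashSet<>();
        for (String line : file1) {
            if (!line.trim().isEmpty()) {
                ids.add(idOf(line));
            }
        }
        // Keep each non-blank line of file 2 whose ID is in that set.
        List<String> matches = new ArrayList<>();
        for (String line : file2) {
            if (!line.trim().isEmpty() && ids.contains(idOf(line))) {
                matches.add(line);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> file1 = Arrays.asList("1,a", "2,b", "3,c");
        List<String> file2 = Arrays.asList("2,x", "4,y", "3,z");
        matchingLines(file1, file2).forEach(System.out::println);
    }
}
```

With the sample lists in main, this prints the lines 2,x and 3,z, since 2 and 3 are the only IDs present in both lists. Note that a Beam pipeline does not guarantee output order, so compare the two results as sets of lines rather than line by line.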