Search code examples
javaapache-beamairflow-2.x

Comparing 2 csv files using apache beam java


I have a task of comparing 2 csv files using Apache Beam Java.

I have to compare each ID in file 1 with all the IDs in file 2, and if one matches I have to take the whole line that contains that ID and save it to a file. For example, if the ID is found in the first reference image, I would copy that line to another file and save it. If the ID is not found, the search would continue on the next line, and so on until the end of the file.

The code I have come up with is below:

/**
 * First attempt at comparing two CSV files with Apache Beam: for each line of file1, look for
 * matching lines in file2 and write the matches to a results file.
 *
 * NOTE(review): as written this design is not expected to work. A PCollection is a handle to a
 * deferred dataset in the pipeline graph, not an in-memory collection, so it cannot be carried
 * into a DoFn and have transforms applied to it while elements are being processed. Joining the
 * two inputs (e.g. keying both by ID and grouping) has to happen at pipeline-construction time.
 */
public class CsvComparer {
    /**
     * Builds and runs the pipeline: reads both CSV files, applies {@link CompareCsvFn} to the
     * elements of file1, writes the resulting strings, and blocks until the run finishes.
     *
     * @param args command-line arguments; not used by this method
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Read the first CSV file
        PCollection<String> file1 = pipeline.apply(TextIO.read().from("gs://my-bucket/file1.csv"));

        // Read the second CSV file
        PCollection<String> file2 = pipeline.apply(TextIO.read().from("gs://my-bucket/file2.csv"));

        // Compare the data in the files and write the results to a file
        // NOTE(review): file2 (a PCollection) is handed to the DoFn constructor here; see the
        // class-level note on why that is presumably the root problem.
        PCollection<String> results = file1.apply(ParDo.of(new CompareCsvFn(file2)));
        results.apply(TextIO.write().to("gs://my-bucket/comparison_results.csv").withSuffix(".csv"));

        pipeline.run().waitUntilFinish();
    } 

    /**
     * DoFn applied to each element of file1. It stores the file2 PCollection and, per element,
     * tries to run two further ParDo steps over it.
     */
    public static class CompareCsvFn extends DoFn<String, String> {
        // NOTE(review): DoFn instances are shipped to workers; a PCollection field is presumably
        // not serializable state — confirm against the Beam DoFn documentation.
        private final PCollection<String> file2; 

        public CompareCsvFn(PCollection<String> file2) {
            this.file2 = file2;
        } 

        /**
         * Attempts to compare one file1 line against every element of file2.
         *
         * @param line the current element from file1
         * @param out receiver for this DoFn's output strings
         */
        @ProcessElement
        public void processElement(@Element String line, OutputReceiver<String> out) {
            // Compare the line with the lines in file2
            // NOTE(review): this applies a new transform to file2 once per input element, from
            // inside element processing rather than during pipeline construction.
            PCollection<KV<String, String>> matches = file2.apply(ParDo.of(new CompareLinesFn(line))); 

            // Write the matching lines to the output
            // NOTE(review): the final apply(out) passes an OutputReceiver where apply() takes a
            // transform, so this line looks like it cannot compile — confirm.
            matches.apply(ParDo.of(new FormatOutputFn(line))).apply(out);
        }
    } 

    /**
     * DoFn that compares each incoming element with one fixed line given at construction and
     * emits a KV of (fixed line, incoming line) when the two strings are equal.
     */
    public static class CompareLinesFn extends DoFn<String, KV<String, String>> {
        // The line that every incoming element is compared against.
        private final String line; 

        public CompareLinesFn(String line) {
            this.line = line;
        } 

        /**
         * Emits a pair when the incoming element equals the stored line.
         *
         * @param otherLine the incoming element
         * @param out receiver for matching (line, otherLine) pairs
         */
        @ProcessElement
        public void processElement(@Element String otherLine, OutputReceiver<KV<String, String>> out) {
            // Compare the lines and output a KV pair if they match
// this logic would need to be changed I understand as I have to compare one line in file1 with all other lines in file2 
            // Whole-line equality: only an exact duplicate of the stored line produces output.
            if (line.equals(otherLine)) {
                out.output(KV.of(line, otherLine));
            }
        }
    } 

    /**
     * DoFn that turns a matched pair into a two-line text report. The "file 1" part comes from
     * the line given at construction and the "file 2" part from the pair's value; the pair's key
     * is not read.
     */
    public static class FormatOutputFn extends DoFn<KV<String, String>, String> {
        // The file1 line printed in every report emitted by this instance.
        private final String line; 

        public FormatOutputFn(String line) {
            this.line = line;
        } 

        /**
         * Formats and emits the report string for one matched pair.
         *
         * @param match the matched pair; only its value is used
         * @param out receiver for the formatted report
         */
        @ProcessElement
        public void processElement(@Element KV<String, String> match, OutputReceiver<String> out) {
            // Format the output
            String output = String.format("Line in file 1: %s\nLine in file 2: %s\n", line, match.getValue());
            out.output(output);
        }
    }
}

Please help and advise on the above. I have searched the net and other places, but to no avail.


Solution

  • I think that a few changes in the code can fix it

    Try this code:

       /**
        * Joins two CSV files on their ID column with Apache Beam and writes one report entry for
        * every pair of lines (one from each file) that share an ID.
        *
        * <p>Both files are keyed by ID, joined with {@code CoGroupByKey} while the pipeline graph is
        * being built, and the joined groups are then formatted. This avoids referencing one
        * PCollection from inside a DoFn that processes another.
        */
       public class CsvComparer {
           /** Tag identifying lines of the first file in the join input and in the join result. */
           private static final String FILE1_TAG = "file1";

           /** Tag identifying lines of the second file in the join input and in the join result. */
           private static final String FILE2_TAG = "file2";

           /**
            * Builds and runs the pipeline, blocking until the run has finished.
            *
            * @param args command-line arguments; not used by this method
            */
           public static void main(String[] args) {
               Pipeline pipeline = Pipeline.create();

               // Read the first CSV file as KV pairs (ID, Line). Each step gets an explicit name so
               // that the two read/extract branches do not share auto-generated transform names.
               PCollection<KV<String, String>> file1 = pipeline
                       .apply("ReadFile1", TextIO.read().from("gs://my-bucket/file1.csv"))
                       .apply("KeyFile1ById", ParDo.of(new ExtractKeyValueFn()));

               // Read the second CSV file as KV pairs (ID, Line)
               PCollection<KV<String, String>> file2 = pipeline
                       .apply("ReadFile2", TextIO.read().from("gs://my-bucket/file2.csv"))
                       .apply("KeyFile2ById", ParDo.of(new ExtractKeyValueFn()));

               // CoGroupByKey to join lines with the same ID
               PCollection<KV<String, CoGbkResult>> groupedLines = KeyedPCollectionTuple.of(FILE1_TAG, file1)
                       .and(FILE2_TAG, file2)
                       .apply(CoGroupByKey.create());

               // Format every ID match and write the results to a file.
               // NOTE(review): the path given to to() is used as a filename prefix, so with the
               // ".csv" suffix the shard names presumably contain ".csv" twice — confirm and adjust
               // the prefix if a cleaner name is wanted.
               PCollection<String> results = groupedLines.apply(ParDo.of(new CompareCsvFn()));
               results.apply(TextIO.write().to("gs://my-bucket/comparison_results.csv").withSuffix(".csv"));

               pipeline.run().waitUntilFinish();
           }

           /**
            * Keys each CSV line by its ID: the text of the first comma-separated field that follows
            * the last '/' in that field (or the whole field when it contains no '/').
            */
           public static class ExtractKeyValueFn extends DoFn<String, KV<String, String>> {
               /**
                * Emits {@code (ID, whole line)} for one input line. Lines whose ID is empty (blank
                * lines, or lines that start with a comma) are skipped so that they are not joined to
                * each other under an empty key.
                *
                * @param line one line of the CSV file
                * @param out receiver for the keyed line
                */
               @ProcessElement
               public void processElement(@Element String line, OutputReceiver<KV<String, String>> out) {
                   // Take the first field with indexOf rather than split(","): split drops trailing
                   // empty fields, so a line made only of commas yields an empty array and indexing
                   // element 0 would throw.
                   int comma = line.indexOf(',');
                   String firstField = comma < 0 ? line : line.substring(0, comma);
                   String key = firstField.substring(firstField.lastIndexOf('/') + 1);
                   if (key.isEmpty()) {
                       return;
                   }
                   out.output(KV.of(key, line));
               }
           }

           /**
            * Formats the result of the join: for each ID, every combination of a line from the first
            * file and a line from the second file becomes one report entry.
            */
           public static class CompareCsvFn extends DoFn<KV<String, CoGbkResult>, String> {
               /**
                * Emits one formatted entry per (file1 line, file2 line) pair in the group. An ID that
                * occurs in only one of the files has an empty iterable on the other side and therefore
                * produces no output.
                *
                * @param element the ID together with the lines grouped under it from both files
                * @param out receiver for the formatted entries
                */
               @ProcessElement
               public void processElement(@Element KV<String, CoGbkResult> element, OutputReceiver<String> out) {
                   Iterable<String> linesFromFirstFile = element.getValue().getAll(FILE1_TAG);
                   Iterable<String> linesFromSecondFile = element.getValue().getAll(FILE2_TAG);

                   // The join has already matched on ID, which is the stated requirement, so the rest
                   // of the two lines is deliberately not compared: lines with the same ID but
                   // different remaining columns must still be reported.
                   for (String line1 : linesFromFirstFile) {
                       for (String line2 : linesFromSecondFile) {
                           out.output(String.format("Line in file 1: %s\nLine in file 2: %s\n", line1, line2));
                       }
                   }
               }
           }
       }
    

    I changed a few things in it; let me know whether it works or not.