Tags: java, apache-beam, apache-beam-io, apache-beam-internals

How to append new rows or perform a union on two PCollections


I need to append a new row to the following CSV.

ID date balance
01 31/01/2021 100
01 28/02/2021 200
01 31/03/2021 200
01 30/04/2021 200
01 31/05/2021 500
01 30/06/2021 600

Expected output:

ID date balance
01 31/01/2021 100
01 28/02/2021 200
01 31/03/2021 200
01 30/04/2021 200
01 31/05/2021 500
01 30/06/2021 600
01 30/07/2021 999

Java code:

    public static void main(String[] args) throws IOException {
        final File schemaFile = new File("src/main/resources/addRow/schema_transform.avsc");

        File csvFile = new File("src/main/resources/addRow/CustomerRequest.csv");

        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Reading schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Reading the files that match the pattern (TODO: needs review)
                .apply("2", FileIO.readMatches())

                // Validating each record against the schema, converting it to a Row,
                // and emitting valid and invalid rows on separate output tags
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only valid rows

        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
        RowAddition rowAddition = new RowAddition();
        final PCollection<Row> newlyAddedRows = rows.apply(ParDo.of(rowAddition)).setCoder(RowCoder.of(beamSchema));

How do I combine these two PCollection objects (rows and newlyAddedRows)?

        PCollection<String> pOutput = newlyAddedRows.apply(ParDo.of(new RowToString()));
        pOutput.apply(TextIO.write().to("src/main/resources/addRow/rowOutput").withNumShards(1).withSuffix(".csv"));

        pipeline.run().waitUntilFinish();
        System.out.println("The end");
    }
}

Logic for adding rows

class RowAddition extends DoFn<Row, Row> {

    private static final long serialVersionUID = -8093837716944809689L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        org.apache.beam.sdk.schemas.Schema beamSchema=null;
        try {
            beamSchema = AvroUtils.toBeamSchema(new Schema.Parser().parse(new File("src/main/resources/addRow/schema_transform.avsc")));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
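        // Note: processElement runs once per input element, so this emits one new row for every input row.
        // Row.withSchema is a static factory, so it is called on the class rather than on the input row.
        Row newRow = Row.withSchema(beamSchema).addValues("01", "30/07/2021", 999.0).build();
        context.output(newRow);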
    }
}

I have been referring to this link:

https://beam.apache.org/documentation/pipelines/design-your-pipeline/#:~:text=Merging%20PCollections,-Often%2C%20after%20you&text=You%20can%20do%20so%20by,join%20between%20two%20PCollection%20s.


Solution

  • You're looking for the Flatten transform. This takes any number of existing PCollections and produces a new PCollection with the union of their elements. For completely new elements, you could use Create or use another PTransform to compute the new elements based on the old ones.
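A minimal sketch of that approach, assuming a three-field schema (ID, date, balance) built in code rather than loaded from the question's .avsc file. The class name AppendRowWithFlatten, the helper appendRow, and the step names are hypothetical and exist only for illustration. Create produces a one-element PCollection holding the new row, and Flatten.pCollections() merges it with the existing rows.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.Row;

public class AppendRowWithFlatten {

    // Assumed schema; the question loads its schema from an Avro file instead.
    static final Schema SCHEMA = Schema.builder()
            .addStringField("ID")
            .addStringField("date")
            .addDoubleField("balance")
            .build();

    // Hypothetical helper: returns the union of the existing rows and one extra row.
    static PCollection<Row> appendRow(PCollection<Row> rows, Row extra) {
        // Create emits exactly one element here, independent of how many rows exist.
        PCollection<Row> extraRows = rows.getPipeline()
                .apply("CreateExtraRow", Create.of(extra).withRowSchema(extra.getSchema()));

        // Flatten merges PCollections of the same element type into a single PCollection.
        return PCollectionList.of(rows).and(extraRows)
                .apply("UnionRows", Flatten.pCollections());
    }

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Stand-in for the valid rows read from the CSV in the question.
        PCollection<Row> rows = pipeline.apply("ExistingRows", Create.of(
                Row.withSchema(SCHEMA).addValues("01", "31/05/2021", 500.0).build(),
                Row.withSchema(SCHEMA).addValues("01", "30/06/2021", 600.0).build())
                .withRowSchema(SCHEMA));

        Row extra = Row.withSchema(SCHEMA).addValues("01", "30/07/2021", 999.0).build();
        PCollection<Row> allRows = appendRow(rows, extra);

        // Downstream steps (for example the question's RowToString and TextIO.write)
        // would be applied to allRows here.
        pipeline.run().waitUntilFinish();
    }
}
```

In the question's pipeline the same merge would read PCollectionList.of(rows).and(newlyAddedRows).apply(Flatten.pCollections()). Two caveats apply. A ParDo such as RowAddition runs once per input element, so it would emit one new row for each of the six input rows, whereas Create emits the row once. A PCollection is also unordered, so the appended row is not guaranteed to come last in the written file.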