I have the following CSV and need to append a new row to it.
ID | date | balance |
---|---|---|
01 | 31/01/2021 | 100 |
01 | 28/02/2021 | 200 |
01 | 31/03/2021 | 200 |
01 | 30/04/2021 | 200 |
01 | 31/05/2021 | 500 |
01 | 30/06/2021 | 600 |
Expected output:
ID | date | balance |
---|---|---|
01 | 31/01/2021 | 100 |
01 | 28/02/2021 | 200 |
01 | 31/03/2021 | 200 |
01 | 30/04/2021 | 200 |
01 | 31/05/2021 | 500 |
01 | 30/06/2021 | 600 |
01 | 30/07/2021 | 999 |
Java code:
public static void main(String[] args) throws IOException {
    final File schemaFile = new File("src/main/resources/addRow/schema_transform.avsc");
    File csvFile = new File("src/main/resources/addRow/CustomerRequest.csv");
    Schema schema = new Schema.Parser().parse(schemaFile);
    Pipeline pipeline = Pipeline.create();
    // Convert the Avro schema to a Beam schema
    org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);
    final PCollectionTuple tuples = pipeline
            // Match the CSV input file
            .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))
            // Read the matched files
            .apply("2", FileIO.readMatches())
            // Validate each record against the schema, convert it to a Row,
            // and emit it to the valid or invalid output
            .apply("3", ParDo.of(new FileReader(beamSchema))
                    .withOutputTags(FileReader.validTag(), TupleTagList.of(FileReader.invalidTag())));
    // Fetch only the valid rows
    final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
    RowAddition rowAddition = new RowAddition();
    final PCollection<Row> newlyAddedRows = rows.apply(ParDo.of(rowAddition)).setCoder(RowCoder.of(beamSchema));
    // How do I combine these two PCollection objects?
    PCollection<String> pOutput = newlyAddedRows.apply(ParDo.of(new RowToString()));
    pOutput.apply(TextIO.write().to("src/main/resources/addRow/rowOutput").withNumShards(1).withSuffix(".csv"));
    pipeline.run().waitUntilFinish();
    System.out.println("The end");
}
}
Logic for adding the new row:
class RowAddition extends DoFn<Row, Row> {
    private static final long serialVersionUID = -8093837716944809689L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        org.apache.beam.sdk.schemas.Schema beamSchema = null;
        try {
            beamSchema = AvroUtils.toBeamSchema(new Schema.Parser()
                    .parse(new File("src/main/resources/addRow/schema_transform.avsc")));
        } catch (IOException e) {
            e.printStackTrace();
        }
        Row row = context.element();
        Row newRow = row.withSchema(beamSchema).addValues("01", "30/7/2021", 999.0).build();
        context.output(newRow);
    }
}
I have been referring to this link.
You're looking for the Flatten transform. This takes any number of existing PCollections and produces a new PCollection with the union of their elements. For completely new elements, you could use Create or use another PTransform to compute the new elements based on the old ones.
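A minimal sketch of that approach, assuming the Beam Java SDK and the DirectRunner are on the classpath. The class name FlattenExample, the pipe-delimited String elements, and the collecting DoFn are illustrative stand-ins for your Row-based pipeline; the idea is the same: build the new element with Create, then union the two PCollections with Flatten.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class FlattenExample {
    // Collected output for inspection; the DirectRunner executes in-process,
    // so a static list is visible after the pipeline finishes.
    static final List<String> OUTPUT = Collections.synchronizedList(new ArrayList<>());

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Stand-in for the rows read from the CSV.
        PCollection<String> existing = pipeline.apply("Existing",
                Create.of("01|31/01/2021|100", "01|30/06/2021|600"));

        // The completely new row, built with Create instead of a DoFn
        // that emits it once per input element.
        PCollection<String> added = pipeline.apply("Added",
                Create.of("01|30/07/2021|999"));

        // Flatten unions any number of PCollections with the same type.
        PCollection<String> all = PCollectionList.of(existing).and(added)
                .apply(Flatten.pCollections());

        // Collect the flattened elements so they can be inspected.
        all.apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                OUTPUT.add(c.element());
                c.output(c.element());
            }
        }));

        pipeline.run().waitUntilFinish();
    }
}
```

In your pipeline, `existing` corresponds to `rows`, `added` would be a `Create.of(...)` of the single new `Row` (with `RowCoder.of(beamSchema)` set), and the flattened result is what you pass on to `RowToString` and `TextIO.write()`.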