I am trying to convert JSON data like this:
{"col1":"sample-val-1", "col2":1.0}
{"col1":"sample-val-2", "col2":2.0}
{"col1":"sample-val-3", "col2":3.0}
{"col1":"sample-val-4", "col2":4.0}
{"col1":"sample-val-5", "col2":5.0}
into Parquet. To do that, I wrote the following Apache Beam pipeline:
```java
package org.apache.beam.examples;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.apache.avro.Schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.kitesdk.data.spi.JsonUtil;
import tech.allegro.schema.json2avro.converter.JsonAvroConverter;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;

public class Main {
    public static void main(String[] args) throws IOException {
        Pipeline pipeLine = Pipeline.create();

        PCollection<String> lines = pipeLine.apply("ReadMyFile", TextIO.read().from("path-to-file"));

        // Infer an Avro schema from a sample of the input file.
        File initialFile = new File("path-to-file");
        InputStream targetStream = Files.newInputStream(initialFile.toPath());
        Schema jsonSchema = JsonUtil.inferSchema(targetStream, "RecordName", 20);
        System.out.println(jsonSchema.getDoc());

        PCollection<String> words = lines.apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                Gson gson = new GsonBuilder().create();
                JsonObject parsedMap = gson.fromJson(c.element(), JsonObject.class);
                // out.output(parsedMap);
                // System.out.println(Arrays.toString(parsedMap.toString().getBytes(StandardCharsets.UTF_8)));

                JsonAvroConverter avroConverter = new JsonAvroConverter();
                // GenericRecord record = avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);
                // context.output(record);
            }
        }));

        pipeLine.run();

        // pgr.apply(FileIO.<GenericRecord>write().via(ParquetIO.sink(schema)).to("path/to/save"));
    }
}
```
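For reference, I believe the schema `JsonUtil.inferSchema` derives from the sample above looks roughly like the sketch below (my assumption, not verified output; Kite may pick different numeric types). It round-trips through `Schema.Parser`, which also matters for the workaround later:

```java
import org.apache.avro.Schema;

public class SchemaSketch {
    public static void main(String[] args) {
        // My guess at the inferred schema for the sample records above:
        // col1 as a string, col2 as a double. This is an assumption, not
        // the verified output of JsonUtil.inferSchema.
        String schemaString =
                "{\"type\":\"record\",\"name\":\"RecordName\",\"fields\":["
              + "{\"name\":\"col1\",\"type\":\"string\"},"
              + "{\"name\":\"col2\",\"type\":\"double\"}]}";
        Schema schema = new Schema.Parser().parse(schemaString);
        System.out.println(schema.toString(true));
    }
}
```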
I am able to read the JSON line by line, but I cannot convert it to Parquet. The code above fails as soon as I uncomment the conversion line

```java
GenericRecord record = avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);
```

because the anonymous DoFn then captures `jsonSchema` from the enclosing scope, and Beam has to serialize the DoFn together with the captured `org.apache.avro.Schema`, which is not `Serializable`:
```
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
    at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
    at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
    at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
    at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
    ... 25 more
```
As a workaround, I created a new DoFn class that takes the schema as a constructor parameter, stores it as a string (which is serializable), and re-parses it in `@Setup`:
```java
Schema jsonSchema = new Schema.Parser().parse(schemaString);

pipeLine.apply("ReadMyFile", TextIO.read().from(options.getInput()))
        .apply("Convert Json To General Record", ParDo.of(new JsonToGeneralRecord(jsonSchema)))
        .setCoder(AvroCoder.of(GenericRecord.class, jsonSchema));
```
```java
static class JsonToGeneralRecord extends DoFn<String, GenericRecord> {

    private static final Logger logger = LogManager.getLogger(JsonToGeneralRecord.class);

    private final String schemaString;
    private Schema jsonSchema;

    // Keep the schema as a String because org.apache.avro.Schema is not Serializable.
    JsonToGeneralRecord(Schema schema) {
        schemaString = schema.toString();
    }

    @Setup
    public void setup() {
        jsonSchema = new Schema.Parser().parse(schemaString);
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
        Gson gson = new GsonBuilder().create();
        JsonObject parsedMap = gson.fromJson(c.element(), JsonObject.class);
        logger.info("successful: " + parsedMap.toString());

        JsonAvroConverter avroConverter = new JsonAvroConverter();
        try {
            GenericRecord record = avroConverter.convertToGenericDataRecord(parsedMap.toString().getBytes(), jsonSchema);
            c.output(record);
        } catch (Exception e) {
            logger.error("error: " + e.getMessage() + parsedMap);
            e.printStackTrace();
        }
    }
}
```
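For completeness, this is the write step I am aiming for once the conversion yields a `PCollection<GenericRecord>`. It is a minimal sketch based on the commented-out `FileIO`/`ParquetIO` line in my first attempt; the output path and suffix are placeholders:

```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.values.PCollection;

// Convert the JSON lines and pin the coder so Beam can encode GenericRecord.
PCollection<GenericRecord> records =
        pipeLine.apply("ReadMyFile", TextIO.read().from(options.getInput()))
                .apply("Convert Json To General Record", ParDo.of(new JsonToGeneralRecord(jsonSchema)))
                .setCoder(AvroCoder.of(GenericRecord.class, jsonSchema));

// ParquetIO.sink(schema) is a FileIO.Sink, so it plugs into FileIO.write().
records.apply("WriteParquet",
        FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(jsonSchema))
                .to("path/to/save")
                .withSuffix(".parquet"));
```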