I've recently started working with Apache Spark and came across a requirement where I need to read a Kafka stream and feed the data into Cassandra. While doing so I ran into an issue: the streaming side is Dataset/SQL based, while the Cassandra connector I'm using works on RDDs (I may be wrong here, please do correct me), so I struggled to get this working. I have made it work for now, but I'm not sure if that's the right way to implement it.
Below is the code
Schema
StructType getSchema() {
    StructField[] structFields = new StructField[]{
            new StructField("id", DataTypes.LongType, true, Metadata.empty()),
            new StructField("name", DataTypes.StringType, true, Metadata.empty()),
            new StructField("cat", DataTypes.StringType, true, Metadata.empty()),
            new StructField("tag", DataTypes.createArrayType(DataTypes.StringType), true, Metadata.empty())
    };
    return new StructType(structFields);
}
Stream reader
Dataset<Row> results = kafkaDataset.select(
        col("key").cast("string"),
        from_json(col("value").cast("string"), getSchema()).as("value"),
        col("topic"),
        col("partition"),
        col("offset"),
        col("timestamp"),
        col("timestampType"));
results.select("value.*")
.writeStream()
.foreachBatch(new VoidFunction2<Dataset<Row>, Long>() {
@Override
public void call(Dataset<Row> dataset, Long batchId) throws Exception {
ObjectMapper mapper = new ObjectMapper();
List<DealFeedSchema> list = new ArrayList<>();
List<Row> rowList = dataset.collectAsList();
if (!rowList.isEmpty()) {
rowList.forEach(row -> {
if (row == null) logger.info("Null DataSet");
else {
try {
list.add(mapper.readValue(row.json(), DealFeedSchema.class));
} catch (JsonProcessingException e) {
logger.error("error parsing Data", e);
}
}
});
JavaRDD<DealFeedSchema> rdd = new JavaSparkContext(session.sparkContext()).parallelize(list);
javaFunctions(rdd).writerBuilder(Constants.CASSANDRA_KEY_SPACE,
Constants.CASSANDRA_DEAL_TABLE_SPACE, mapToRow(DealFeedSchema.class)).saveToCassandra();
}
}
}).
start().awaitTermination();
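For context, kafkaDataset above is the raw stream read from the Kafka source; it would be created roughly like this (the broker list and topic name here are placeholders):

Dataset<Row> kafkaDataset = session
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker list
        .option("subscribe", "deal-feed-topic")              // placeholder topic
        .load();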
Although this works fine, I need to know if there is a better way to do this. If there is, please let me know how to achieve it. Thanks in advance.

For those who are looking for a way, you can refer to this code as an alternative. :)
1. Java Bean for DealFeedSchema
import java.util.List;

public class DealFeedSchema {
    private long id;
    private String name;
    private String cat;
    private List<String> tag;

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getCat() {
        return cat;
    }

    public void setCat(String cat) {
        this.cat = cat;
    }

    public List<String> getTag() {
        return tag;
    }

    public void setTag(List<String> tag) {
        this.tag = tag;
    }
}
2. Load the test data
Dataset<Row> dataFrame = spark.createDataFrame(Arrays.asList(
        RowFactory.create(1L, "foo", "cat1", Arrays.asList("tag1", "tag2"))
), getSchema());
dataFrame.show(false);
dataFrame.printSchema();
/**
* +---+----+----+------------+
* |id |name|cat |tag |
* +---+----+----+------------+
* |1 |foo |cat1|[tag1, tag2]|
* +---+----+----+------------+
*
* root
* |-- id: long (nullable = true)
* |-- name: string (nullable = true)
* |-- cat: string (nullable = true)
* |-- tag: array (nullable = true)
* | |-- element: string (containsNull = true)
*/
3. Convert Dataset<Row> to Dataset<DealFeedSchema>
Dataset<DealFeedSchema> dealFeedSchemaDataset = dataFrame.as(Encoders.bean(DealFeedSchema.class));
dealFeedSchemaDataset.show(false);
dealFeedSchemaDataset.printSchema();
/**
* +---+----+----+------------+
* |id |name|cat |tag |
* +---+----+----+------------+
* |1 |foo |cat1|[tag1, tag2]|
* +---+----+----+------------+
*
* root
* |-- id: long (nullable = true)
* |-- name: string (nullable = true)
* |-- cat: string (nullable = true)
* |-- tag: array (nullable = true)
* | |-- element: string (containsNull = true)
*/
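4. Write the Dataset<DealFeedSchema> to Cassandra

As an alternative to collecting each micro-batch to the driver and re-parallelizing it, the converted Dataset can be written straight to Cassandra through the connector's DataFrame/Dataset source. This is only a minimal sketch: it assumes the connector is on the classpath, spark.cassandra.connection.host is configured on the session, and it reuses the keyspace/table constants from the question.

dealFeedSchemaDataset
        .write()
        .format("org.apache.spark.sql.cassandra")
        .option("keyspace", Constants.CASSANDRA_KEY_SPACE)     // target keyspace
        .option("table", Constants.CASSANDRA_DEAL_TABLE_SPACE) // target table
        .mode(SaveMode.Append)
        .save();

In the streaming job, the same write can go inside foreachBatch on the per-batch Dataset (converted with Encoders.bean, or even left as a Dataset<Row> since the column names already match the table), which should remove the need for collectAsList, parallelize and the RDD-based saveToCassandra.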