apache-spark elasticsearch apache-spark-sql spark-structured-streaming

How to enrich data of a streaming query and write the result to Elasticsearch?


For a given dataset (originalData), I need to map its values and then build a new dataset that combines them with search results from Elasticsearch.

Dataset<Row> originalData = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers","test")
  .option("subscribe", "test")
  .option("startingOffsets", "latest")
  .load();

Dataset<Row> esData = JavaEsSparkSQL
  .esDF(spark.sqlContext(), "spark_correlation/doc");

esData.createOrReplaceTempView("es_correlation");
List<SGEvent> listSGEvent = new ArrayList<>();

originalData.foreach((ForeachFunction<Row>) row -> {
 SGEvent event = new SGEvent();
 String sourceKey=row.get(4).toString();
 String searchQuery = "select id from es_correlation where es_correlation.key='"+sourceKey+"'";
 Dataset<Row> result = spark.sqlContext().sql(searchQuery);
 String id = null;
 if (result != null) {
    result.show();
    id = result.first().toString();
  }
 event.setId(id);
 event.setKey(sourceKey);
 listSGEvent.add(event);
});
Encoder<SGEvent> eventEncoderSG = Encoders.bean(SGEvent.class);
Dataset<Row> finalData = spark.createDataset(listSGEvent, eventEncoderSG).toDF();

finalData
  .writeStream()
  .outputMode(OutputMode.Append())
  .format("org.elasticsearch.spark.sql")
  .option("es.mapping.id", "id")
  .option("es.write.operation", "upsert")
  .option("checkpointLocation","/tmp/checkpoint/sg_event")
  .start("spark_index/doc").awaitTermination();

Spark throws the following exception:

org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:38)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:126)
  at scala.collection.immutable.List.foreach(List.scala:392)

Is my approach to combining Elasticsearch values with a Dataset valid? Is there a better solution for this?


Solution

  • There are a couple of issues here.

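    As the exception says, originalData is a streaming query (a streaming Dataset), and the only way to execute it is to use writeStream.start(). Calling originalData.foreach(...) tries to run it as a batch query instead, which is what triggers the exception. That's one issue.

    You did call writeStream.start(), but on another query, finalData, which is a batch Dataset, not a streaming one. That's another issue.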

    For "enrichment" cases like yours, you can use a streaming join (Dataset.join operator) or one of DataStreamWriter.foreach and DataStreamWriter.foreachBatch. I think DataStreamWriter.foreachBatch would be more efficient.
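
    The streaming-join option can be sketched as a stream-static join. This is only a sketch under assumptions the question does not confirm: the lookup key is taken from the Kafka record key (the question reads row.get(4), whose meaning is not stated), the object name EnrichWithJoin is made up, and the Elasticsearch connector version is assumed to support both the batch read and the streaming write used here.

    ```scala
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.col

    object EnrichWithJoin {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("sg-event-enrichment").getOrCreate()

        // Streaming side: the Kafka source from the question.
        // Assumption: the lookup key is the Kafka record key, cast to a string.
        val originalData: DataFrame = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "test")
          .option("subscribe", "test")
          .option("startingOffsets", "latest")
          .load()
          .select(col("key").cast("string").as("key"))

        // Static side: the Elasticsearch index read as a batch DataFrame.
        val esData: DataFrame = spark.read
          .format("org.elasticsearch.spark.sql")
          .load("spark_correlation/doc")
          .select("id", "key")

        // Stream-static inner join: rows without a match in Elasticsearch are dropped,
        // so every output row has an id to use as the document id.
        val finalData: DataFrame = originalData.join(esData, Seq("key"))

        // finalData is still a streaming Dataset, so writeStream.start() is valid here.
        finalData.writeStream
          .outputMode("append")
          .format("org.elasticsearch.spark.sql")
          .option("es.mapping.id", "id")
          .option("es.write.operation", "upsert")
          .option("checkpointLocation", "/tmp/checkpoint/sg_event")
          .start("spark_index/doc")
          .awaitTermination()
      }
    }
    ```

    With this shape the join is evaluated once per micro-batch and nothing has to be collected on the driver.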

    public DataStreamWriter<T> foreachBatch(VoidFunction2<Dataset<T>,Long> function)

    (Java-specific) Sets the output of the streaming query to be processed using the provided function. This is supported only in the micro-batch execution modes (that is, when the trigger is not continuous). In every micro-batch, the provided function will be called with (i) the output rows as a Dataset and (ii) the batch identifier. The batchId can be used to deduplicate and transactionally write the output (that is, the provided Dataset) to external systems. The output Dataset is guaranteed to be exactly the same for the same batchId (assuming all operations are deterministic in the query).
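
    To illustrate the deduplication remark, here is a minimal sketch in plain Scala, with no Spark involved. IdempotentSink is a hypothetical in-memory stand-in for an external system that can store the last committed batch id together with the data; a real sink would have to persist both atomically.

    ```scala
    import scala.collection.mutable

    // Hypothetical sink: remembers the highest batch id it has committed
    // and ignores any batch that is replayed after a restart.
    final class IdempotentSink[A] {
      private var lastCommitted: Long = -1L
      private val rows = mutable.ArrayBuffer.empty[A]

      /** Writes the batch unless it was already committed; returns true if written. */
      def writeBatch(batchId: Long, batch: Seq[A]): Boolean = synchronized {
        if (batchId <= lastCommitted) false
        else {
          rows ++= batch
          lastCommitted = batchId
          true
        }
      }

      def contents: Seq[A] = synchronized(rows.toList)
    }

    object IdempotentSinkDemo extends App {
      val sink = new IdempotentSink[String]
      sink.writeBatch(0L, Seq("a", "b")) // written
      sink.writeBatch(0L, Seq("a", "b")) // replay of batch 0, skipped
      sink.writeBatch(1L, Seq("c"))      // written
      println(sink.contents)             // List(a, b, c)
    }
    ```

    Inside foreachBatch, the same check would wrap the write to the external system, using the batchId argument the function receives.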

    Not only would you get all the data of a streaming micro-batch in one shot (the first input argument of type Dataset<T>), but also a way to submit another Spark job (across executors) based on the data.

    The pseudo-code could look as follows (I'm using Scala as I'm more comfortable with the language):

    import org.apache.spark.sql.DataFrame
    import spark.implicits._ // needed for .toDF on a local collection

    val dsWriter = originalData
      .writeStream // foreachBatch is defined on DataStreamWriter, not on Dataset
      // The checkpoint belongs to the streaming query, not to the batch write below
      .option("checkpointLocation", "/tmp/checkpoint/sg_event")
      .foreachBatch { (data: DataFrame, batchId: Long) =>
        // Make sure the data is small enough to collect on the driver,
        // otherwise expect an OutOfMemoryError
        // It'd also be nice to have a Java bean to convert the rows to proper types and names
        val localData = data.collect

        // Please note that localData is no longer Spark's Dataset
        // It's a local array of Rows

        // Use the regular collection API to work with localData
        // You're mapping over localData (for a single micro-batch)
        // and creating finalData
        // I'm using the same names as your code to be as close to your initial idea as possible
        val finalData = localData.map { row =>
          // row is the old row from your original code
          // do something with it, e.g.
          val sourceKey = row.get(4).toString
          ...
        }

        // Time to save the processed data to ES
        // finalData is a local Scala collection, not Spark's DataFrame!
        // Let's convert it to a DataFrame (and leverage the Spark distributed platform)

        // Note that I'm almost using your code, but it's a batch query, not a streaming one
        // We're inside foreachBatch
        finalData
          .toSeq // toDF is available on a Seq, not on an Array
          .toDF  // convert the local collection to a Spark DataFrame
          .write // this creates a batch query
          .format("org.elasticsearch.spark.sql")
          .option("es.mapping.id", "id")
          .option("es.write.operation", "upsert")
          .save("spark_index/doc") // save (not start) as it's a batch query inside a streaming query
      }
    

    dsWriter is a DataStreamWriter. Nothing runs until you call dsWriter.start(), which starts the streaming query.
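
    A minimal sketch of that last step, assuming a DataStreamWriter[Row] like the dsWriter above. The helper name runUntilStopped and the query name are made up for illustration.

    ```scala
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}

    // Starts the streaming query and blocks the calling thread until the
    // query is stopped or fails.
    def runUntilStopped(dsWriter: DataStreamWriter[Row]): Unit = {
      val query: StreamingQuery = dsWriter
        .queryName("sg_event_enrichment") // hypothetical name, shows up in the Spark UI
        .start()
      query.awaitTermination()
    }
    ```

    Calling runUntilStopped(dsWriter) plays the same role as .start(...).awaitTermination() in the original code, except that no path is passed to start() because the write target is set inside foreachBatch.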