scala, apache-kafka, spark-streaming, greenplum

Data source io.pivotal.greenplum.spark.GreenplumRelationProvider does not support streamed writing


I am trying to read data from Kafka and load it into a Greenplum database using Spark. I am using the greenplum-spark connector, but I am getting: Data source io.pivotal.greenplum.spark.GreenplumRelationProvider does not support streamed writing. Does the Greenplum source not support streaming data? The website says "Continuous ETL pipeline (streaming)".

I have tried both "greenplum" and "io.pivotal.greenplum.spark.GreenplumRelationProvider" as the data source in .format(...).

val EventStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", args(0))
  .option("subscribe", args(1))
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load

val gscWriteOptionMap = Map(
  "url" -> "link for greenplum",
  "user" -> "****",
  "password" -> "****",
  "dbschema" -> "dbname"
)
val stateEventDS = EventStream
  .selectExpr("CAST(key AS String)", "*****(value)")
  .as[(String, ******)]
  .map(_._2)

val EventOutputStream = stateEventDS.writeStream
  .format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
  .options(gscWriteOptionMap)
  .start()

EventOutputStream.awaitTermination()

Solution

  • Greenplum Spark Structured Streaming

    Demonstrates how to use the writeStream API with GPDB over JDBC.

    The following code block reads from a rate stream source and uses the JDBC-based sink to stream micro-batches to GPDB. (A foreachBatch alternative is sketched after this code block.)

    Batch-based streaming

    import org.apache.spark.sql.streaming.Trigger
    
    import scala.concurrent.duration._
    
    // "myjdbc" refers to the demo's custom JDBC-based sink data source.
    val sq = spark.
      readStream.
      format("rate").
      load.
      writeStream.
      format("myjdbc").
      option("checkpointLocation", "/tmp/jdbc-checkpoint").
      trigger(Trigger.ProcessingTime(10.seconds)).
      start
    
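    If you are on Spark 2.4 or later, another batch-oriented option is foreachBatch, which hands each micro-batch to the regular batch JDBC writer and avoids the custom sink. The sketch below is minimal and makes a few assumptions: the connection details mirror the record-based example further down, the target table rate_table is hypothetical, and the PostgreSQL JDBC driver must be on the classpath.

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.streaming.Trigger
    
    import scala.concurrent.duration._
    
    val url = "jdbc:postgresql://gsc-dev:5432/gpadmin"
    val user = "gpadmin"
    val pwd = "changeme"
    
    // Declaring the batch function as a typed val avoids the foreachBatch
    // overload ambiguity seen with Scala 2.12.
    val writeBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      batchDF.write.
        format("jdbc").
        option("url", url).
        option("user", user).
        option("password", pwd).
        option("driver", "org.postgresql.Driver").
        option("dbtable", "rate_table").   // hypothetical target table
        mode("append").
        save()
    }
    
    val sq = spark.
      readStream.
      format("rate").
      load.
      writeStream.
      foreachBatch(writeBatch).
      option("checkpointLocation", "/tmp/jdbc-foreachbatch-checkpoint").
      trigger(Trigger.ProcessingTime(10.seconds)).
      start
    
    In principle the same pattern could also call the Greenplum-Spark connector's batch write inside foreachBatch instead of plain JDBC.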

    Record-based streaming

    This uses a ForeachWriter (a minimal JDBCSink sketch follows the code block below).

    import org.apache.spark.sql.streaming.Trigger
    
    import scala.concurrent.duration._
    
    val url = "jdbc:postgresql://gsc-dev:5432/gpadmin"
    val user = "gpadmin"
    val pwd = "changeme"
    // JDBCSink is a ForeachWriter[Row]; a sketch of it follows the code block.
    val jdbcWriter = new JDBCSink(url, user, pwd)
    
    val sq = spark.
      readStream.
      format("rate").
      load.
      writeStream.
      foreach(jdbcWriter).   // a ForeachWriter is passed to foreach, not format
      option("checkpointLocation", "/tmp/jdbc-checkpoint").
      trigger(Trigger.ProcessingTime(10.seconds)).
      start
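
    The JDBCSink used above comes from the demo project and is not shown here. A minimal sketch of what such a ForeachWriter could look like follows; the target table rate_table and its columns are assumptions for illustration, and the demo's real JDBCSink may differ (for example by batching inserts or using prepared statements).

    import java.sql.{Connection, DriverManager, Statement}
    
    import org.apache.spark.sql.{ForeachWriter, Row}
    
    // Minimal sketch: one JDBC connection per task, one INSERT per record.
    class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {
      private var connection: Connection = _
      private var statement: Statement = _
    
      // Called once per partition and epoch before any records are processed.
      override def open(partitionId: Long, epochId: Long): Boolean = {
        Class.forName("org.postgresql.Driver")
        connection = DriverManager.getConnection(url, user, pwd)
        statement = connection.createStatement()
        true
      }
    
      // Called for every record; the rate source emits (timestamp, value) rows.
      override def process(row: Row): Unit = {
        statement.executeUpdate(
          s"INSERT INTO rate_table (ts, val) VALUES ('${row.getTimestamp(0)}', ${row.getLong(1)})")
      }
    
      // Called when the task finishes or fails; release the connection.
      override def close(errorOrNull: Throwable): Unit = {
        if (connection != null) connection.close()
      }
    }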