I am trying to read data from Kafka and upload it into a Greenplum database using Spark. I am using the greenplum-spark connector, but I am getting: Data source io.pivotal.greenplum.spark.GreenplumRelationProvider does not support streamed writing. Does the Greenplum source not support streaming data? The website says "Continuous ETL pipeline (streaming)".
I have tried passing both "greenplum" and "io.pivotal.greenplum.spark.GreenplumRelationProvider" as the data source in .format("datasource").
val EventStream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", args(0))
  .option("subscribe", args(1))
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .load

val gscWriteOptionMap = Map(
  "url" -> "link for greenplum",
  "user" -> "****",
  "password" -> "****",
  "dbschema" -> "dbname"
)

val stateEventDS = EventStream
  .selectExpr("CAST(key AS String)", "*****(value)")
  .as[(String, ******)]
  .map(_._2)

val EventOutputStream = stateEventDS.writeStream
  .format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
  .options(gscWriteOptionMap)
  .start()

EventOutputStream.awaitTermination()
This demonstrates how to use the writeStream API with GPDB over JDBC. The following code block reads from a rate stream source and uses a JDBC-based sink to stream in batches to GPDB:
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val sq = spark.
readStream.
format("rate").
load.
writeStream.
format("myjdbc").
option("checkpointLocation", "/tmp/jdbc-checkpoint").
trigger(Trigger.ProcessingTime(10.seconds)).
start
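If a custom "myjdbc" source is not available on the classpath, a similar batch-at-a-time JDBC write can be sketched with foreachBatch (available since Spark 2.4), which hands each micro-batch to the ordinary batch JDBC writer. This is only a sketch, not the connector's example: the URL, credentials, and target table below are placeholders.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._

// Placeholder connection details -- adjust to the actual GPDB instance.
val jdbcUrl = "jdbc:postgresql://gsc-dev:5432/gpadmin"

// Write one micro-batch to GPDB through the built-in JDBC data source.
val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
  batch.write
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("user", "gpadmin")
    .option("password", "changeme")
    .option("dbtable", "public.rate_events") // hypothetical target table
    .mode("append")
    .save()
}

val sq = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .foreachBatch(writeBatch)
  .option("checkpointLocation", "/tmp/jdbc-checkpoint")
  .trigger(Trigger.ProcessingTime(10.seconds))
  .start()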
This version uses a ForeachWriter instead:
import org.apache.spark.sql.streaming.Trigger
import scala.concurrent.duration._
val url="jdbc:postgresql://gsc-dev:5432/gpadmin"
val user ="gpadmin"
val pwd = "changeme"
val jdbcWriter = new JDBCSink(url,user, pwd)
val sq = spark.
readStream.
format("rate").
load.
writeStream.
foreach(jdbcWriter).
option("checkpointLocation", "/tmp/jdbc-checkpoint").
trigger(Trigger.ProcessingTime(10.seconds)).
start
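JDBCSink itself is not shown above; the following is a minimal sketch of what such a ForeachWriter could look like, assuming the rate source's timestamp/value columns and a hypothetical target table.

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

// Minimal ForeachWriter: one JDBC connection per partition/epoch, one INSERT per row.
// The table and column names are placeholders, not part of the original example.
class JDBCSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {

  var connection: Connection = _
  var statement: PreparedStatement = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    connection = DriverManager.getConnection(url, user, pwd)
    statement = connection.prepareStatement(
      "INSERT INTO public.rate_events (event_time, value) VALUES (?, ?)")
    true
  }

  override def process(row: Row): Unit = {
    // The rate source produces two columns: timestamp and value.
    statement.setTimestamp(1, row.getAs[java.sql.Timestamp]("timestamp"))
    statement.setLong(2, row.getAs[Long]("value"))
    statement.executeUpdate()
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (statement != null) statement.close()
    if (connection != null) connection.close()
  }
}

With a definition like this in place, the writer is passed to foreach as in the snippet above.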