python-3.x, apache-spark, jdbc, pyspark, spark-structured-streaming

How do I connect to Postgres via JDBC and use writeStream in Spark 2.4.7?


df5.writeStream.format("jdbc").option("url", "url")\
  .option("dbtable","test").option("user","postgres")\
  .option("password", "password").start()

I always get this error:

Py4JJavaError: An error occurred while calling o112.start.
: java.lang.UnsupportedOperationException: Data source jdbc does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:311)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:322)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Unknown Source)

Solution

  • The error message clearly states the reason for the failure: Data source jdbc does not support streamed writing.

    So if you want to write data to a JDBC sink from Structured Streaming, you need to use foreachBatch instead, something like this:

    def foreach_batch_function(df, epoch_id):
        # Each micro-batch arrives as a plain DataFrame, so the batch JDBC
        # writer works here; note it is df.write.format(...), not df.format(...).
        # "append" avoids failing on the second batch (the default save mode
        # errors if the table already exists).
        df.write.format("jdbc").option("url", "url")\
          .option("dbtable", "test").option("user", "postgres")\
          .option("password", "password").mode("append").save()

    df5.writeStream.foreachBatch(foreach_batch_function).start()
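
    For reference, here is a fuller self-contained sketch of the same pattern. The connection URL, database name, table, credentials, and checkpoint path are placeholder assumptions you'll need to adapt; it also assumes the Postgres JDBC driver (org.postgresql.Driver) is on the classpath and that df5 is your streaming DataFrame:

    from pyspark.sql import SparkSession

    # Assumes the driver jar is available, e.g. started with:
    #   pyspark --packages org.postgresql:postgresql:42.2.18
    spark = SparkSession.builder.appName("stream-to-postgres").getOrCreate()

    # Hypothetical connection details -- replace with your own
    jdbc_url = "jdbc:postgresql://localhost:5432/mydb"
    jdbc_props = {
        "user": "postgres",
        "password": "password",
        "driver": "org.postgresql.Driver",
    }

    def write_to_postgres(df, epoch_id):
        # The batch DataFrameWriter.jdbc() API is equivalent to the
        # format("jdbc") chain above, just more compact.
        df.write.jdbc(url=jdbc_url, table="test", mode="append",
                      properties=jdbc_props)

    df5.writeStream \
        .foreachBatch(write_to_postgres) \
        .option("checkpointLocation", "/tmp/checkpoints/test") \
        .start()

    Setting a checkpointLocation is worthwhile with foreachBatch: it lets Spark track which epoch_id values have already been processed, so the stream can recover after a restart.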