apache-spark, spark-structured-streaming

How to create a custom streaming data source?


I have a custom reader for Spark Streaming that reads data from a WebSocket, and now I'd like to try Spark Structured Streaming.

How to create a streaming data source in Spark Structured Streaming?


Solution

  • A streaming data source implements org.apache.spark.sql.execution.streaming.Source.

    The scaladoc of org.apache.spark.sql.execution.streaming.Source should give you enough information to get started (just follow the types to arrive at a compilable Scala implementation).
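    For reference, the contract is small; in Spark 2.x (2.1 and later) the trait looks roughly like this (doc comments stripped):

      trait Source {
        // Schema of the data arriving from this source
        def schema: StructType

        // The maximum offset with data available, or None if no data has arrived yet
        def getOffset: Option[Offset]

        // The data between start (exclusive, None = from the beginning) and end (inclusive)
        def getBatch(start: Option[Offset], end: Offset): DataFrame

        // Notification that data up to `end` has been processed (no-op by default)
        def commit(end: Offset): Unit = {}

        // Stops this source and frees any resources it holds
        def stop(): Unit
      }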

    Once you have the Source, you have to register it so it can be used in the format of a DataStreamReader. The trick to making the streaming source available under a short name for format is to create a DataSourceRegister for it. You can see how Spark registers its own sources in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:

    org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
    org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
    org.apache.spark.sql.execution.datasources.json.JsonFileFormat
    org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
    org.apache.spark.sql.execution.datasources.text.TextFileFormat
    org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
    org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
    org.apache.spark.sql.execution.streaming.RateSourceProvider
    

    That's the file that links the short name in format to the implementation.
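    The registration-side contracts are also small. In Spark 2.x, DataSourceRegister supplies the alias, and StreamSourceProvider is what Spark actually calls to create your Source; both live in org.apache.spark.sql.sources and look roughly like this (doc comments stripped):

      trait DataSourceRegister {
        // The alias users can pass to format(...) instead of a fully-qualified class name
        def shortName(): String
      }

      trait StreamSourceProvider {
        // Returns the name and schema of the source, used before the query starts
        def sourceSchema(
            sqlContext: SQLContext,
            schema: Option[StructType],
            providerName: String,
            parameters: Map[String, String]): (String, StructType)

        // Creates the actual Source; metadataPath can be used to persist source state
        def createSource(
            sqlContext: SQLContext,
            metadataPath: String,
            schema: Option[StructType],
            providerName: String,
            parameters: Map[String, String]): Source
      }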

    What I usually recommend people do during my Spark workshops is to start development from both sides:

    1. Write the streaming query (with format), e.g.

      val input = spark
        .readStream
        .format("yourCustomSource") // <-- your custom source here
        .load
      
    2. Implement the streaming Source and a corresponding DataSourceRegister (it could be the same class; see the sketch at the end of this answer)

    3. (optional) Register the DataSourceRegister by writing the fully-qualified class name, say com.mycompany.spark.MyDataSourceRegister, to META-INF/services/org.apache.spark.sql.sources.DataSourceRegister:

      $ cat META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
      com.mycompany.spark.MyDataSourceRegister
      

    The last step, where you register the DataSourceRegister implementation for your custom Source, is optional: it only registers the data source alias that your end users can then use in the DataFrameReader.format method.

    format(source: String): DataFrameReader Specifies the input data source format.
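    In other words, even without the services file your end users can still load the source by the fully-qualified class name of its provider; the registration only buys them the short alias:

      // Works without the services file: fully-qualified provider class name
      spark
        .readStream
        .format("com.mycompany.spark.MyDataSourceRegister")
        .load

      // Works only with the META-INF/services registration in place: the shortName() alias
      spark
        .readStream
        .format("yourCustomSource")
        .load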

    Review the code of org.apache.spark.sql.execution.streaming.RateSourceProvider for a good head start.
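    To make the moving parts concrete, here is a stripped-down sketch of the provider and the Source for the WebSocket case, written against the Spark 2.x API. The actual WebSocket client is omitted (receive stands in for its message callback), and names such as WebSocketSource are made up for illustration:

      package com.mycompany.spark

      import scala.collection.mutable.ArrayBuffer

      import org.apache.spark.sql.{DataFrame, Row, SQLContext}
      import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
      import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
      import org.apache.spark.sql.types.{StringType, StructField, StructType}

      // One class can play both roles: DataSourceRegister for the alias and
      // StreamSourceProvider as the factory for the actual Source.
      class MyDataSourceRegister extends DataSourceRegister with StreamSourceProvider {

        private val mySchema = StructType(StructField("value", StringType) :: Nil)

        // The alias end users pass to DataStreamReader.format
        override def shortName(): String = "yourCustomSource"

        override def sourceSchema(
            sqlContext: SQLContext,
            schema: Option[StructType],
            providerName: String,
            parameters: Map[String, String]): (String, StructType) =
          (shortName(), mySchema)

        override def createSource(
            sqlContext: SQLContext,
            metadataPath: String,
            schema: Option[StructType],
            providerName: String,
            parameters: Map[String, String]): Source =
          new WebSocketSource(sqlContext, mySchema)
      }

      // The Source itself: buffers incoming messages and serves them in batches.
      class WebSocketSource(sqlContext: SQLContext, override val schema: StructType)
        extends Source {

        private val buffer = ArrayBuffer.empty[String]

        // Called by the (omitted) WebSocket client whenever a message arrives
        def receive(message: String): Unit = synchronized { buffer += message }

        // Offset of the latest message, or None before the first one arrives
        override def getOffset: Option[Offset] = synchronized {
          if (buffer.isEmpty) None else Some(LongOffset(buffer.size - 1))
        }

        // Messages in the (start, end] offset range as a DataFrame
        override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
          val from = start.map(_.json.toLong + 1).getOrElse(0L).toInt
          val to = end.json.toLong.toInt + 1
          val rows = buffer.slice(from, to).map(Row(_))
          // Caveat: Spark 2.3+ asserts that this DataFrame has isStreaming=true,
          // which needs the private[sql] sqlContext.internalCreateDataFrame(rdd,
          // schema, isStreaming = true) over an RDD[InternalRow] -- see how
          // RateSourceProvider does it.
          sqlContext.createDataFrame(sqlContext.sparkContext.parallelize(rows), schema)
        }

        override def stop(): Unit = () // close the WebSocket connection here
      }

    This buffering scheme (a growing in-memory buffer whose offsets are simply message indices) is the simplest thing that can work; a production source would also trim the buffer in commit and use metadataPath to survive restarts.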