Search code examples
scalaapache-sparkmqttwatson-iotapache-bahir

Schema lost with ApacheBahir Stuctured Streaming connector on ApacheSpark streaming


I'm trying to hook-up an ApacheSpark Structured Stream to a MQTT topic (IBM Watson IoT Platform on IBM Bluemix in this case).

I'm creating the structured stream as follows:

val df = spark.readStream 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("username","a-vy0z2s-q6s8r693hv")
    .option("password","B+UX(aWuFPvX")
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
    .option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")

So far so good, in REPL I get back this df object as follows:

df: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]

I've learned from this thread that I have to change the client ID every time I connect. So this is solved, but if I start to read from the stream using this line:

val query = df.writeStream. outputMode("append").
format("console").start()

Then the resulting schema looks like this:

df: org.apache.spark.sql.DataFrame = [value: string, timestamp: timestamp]

And the data as follows:

enter image description here

This means my JSON stream is converted into a stream of string object containing the JSON representation.

Is this a limitation of ApacheBahir?

Also providing a schema doesn't help since the following code resembles into the same result:

import org.apache.spark.sql.types._
val schema = StructType(
    StructField("count",LongType,true)::
    StructField("flowrate",LongType,true)::
    StructField("fluidlevel",StringType,true)::
    StructField("frequency",LongType,true)::
    StructField("hardness",LongType,true)::
    StructField("speed",LongType,true)::
    StructField("temperature",LongType,true)::
    StructField("ts",LongType,true)::
    StructField("voltage",LongType,true)::
Nil)

:paste
val df = spark.readStream
    .schema(schema)
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("username","a-vy0z2s-q6s8r693hv")
    .option("password","B+UX(a8GFPvX")
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf4")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")

Solution

  • Many DataSources, including, but not limited to MQTTStreamSource, have fixed schema, which consist of a message and a timestamp. Schema is not lost, is simply not parsed and it is an expected behavior.

    If schema is fixed and known up front you should be able to use from_json function:

    import org.apache.spark.sql.functions.from_json
    
    df.withColumn("value", from_json($"value", schema))