Given a DataFrame that I read from Kafka, how can I extract values from it via pattern matching?
Dataframe:
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("startingOffsets", "earliest") \
  .load()
My issue is that the schema looks like this:
df.printSchema()
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
That binary type is something I can't pattern match on. How would I extract that value and then parse it?
Question: How would I extract that value and then parse it?
I assume that you are using Avro messages; you can try the code snippet below (I don't know exactly what you want to pattern match here). The decodeAndParseObject
function uses Twitter's Bijection API,
with the following dependency:
<!-- https://mvnrepository.com/artifact/com.twitter/bijection-avro -->
<dependency>
    <groupId>com.twitter</groupId>
    <artifactId>bijection-avro_2.10</artifactId>
    <version>0.7.0</version>
</dependency>
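If you build with sbt rather than Maven, the equivalent coordinate (same artifact and version as above) would be:

// same coordinates as the Maven dependency above
libraryDependencies += "com.twitter" % "bijection-avro_2.10" % "0.7.0"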
import spark.implicits._  // provides the encoder needed for .as[Array[Byte]]

val ds = df.select("value").as[Array[Byte]].map(x => decodeAndParseObject(x))
where
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
/**
 * Decode and parse the binary message based on your schema... your logic goes here.
 */
def decodeAndParseObject(message: Array[Byte]) = {
  val schema = new Schema.Parser().parse("yourschemahere")
  val recordInjection: Injection[GenericRecord, Array[Byte]] =
    GenericAvroCodecs.toBinary(schema)
  val record: GenericRecord = recordInjection.invert(message).get
  println(record.getSchema)
  record.getSchema.getFields.toArray().foreach(println)
  println("\n\n\n\n\n\n Record " + record.toString.replaceAll(",", "\n"))
  // Get the column and do your pattern matching....
  // Prepare another generic record if you need one.... I'm leaving it blank here...
  record
}
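For illustration only, the "yourschemahere" placeholder would be a JSON Avro schema string such as the one below; the record name and fields (id, name) are hypothetical and must match whatever your producer actually writes:

// Hypothetical Avro schema, shown only as an example of the expected format
val schemaJson =
  """
    |{
    |  "type": "record",
    |  "name": "UserEvent",
    |  "fields": [
    |    {"name": "id",   "type": "long"},
    |    {"name": "name", "type": "string"}
    |  ]
    |}
  """.stripMargin
val schema = new Schema.Parser().parse(schemaJson)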
Update:
You can take the generic record above and get the column you want using
record.get("yourcolumn")
and then write a Scala match on the result, as in the sketch below.
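A minimal sketch of that pattern matching, assuming a hypothetical field named status (replace it with a field that actually exists in your schema); note that Avro returns string fields as org.apache.avro.util.Utf8:

// "status" is a hypothetical field name; use one from your own schema
val label = record.get("status") match {
  case null                         => "missing"
  case u: org.apache.avro.util.Utf8 => u.toString        // Avro string fields come back as Utf8
  case i: java.lang.Integer         => s"numeric status $i"
  case other                        => other.toString
}
println(label)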