Reading a .tar.gz file as a file stream in Spark Streaming works well enough. But my requirement is different: can we read a .tar.gz file that comes in from a socket server in Spark Streaming?
When I post the tar file to the socket like this:
nc -l 9990 < input.tar.gz
I can read it in Spark like this:
Dataset<org.apache.spark.sql.Row> dataset = ss.readStream()
    .format("socket")
    .option("host", "localhost")
    .option("port", "9990")
    .load();
If I try to store the result in CSV format,
dataset
    .coalesce(1)
    .writeStream()
    .format("csv")
    .outputMode(OutputMode.Append())
    .option("checkpointLocation", "/tmp/checkpoint")
    .option("path", "hdfs://10.15.xxx.xxx:8020/user/admin/sftp/tar_output4")
    .start()
    .awaitTermination();
the output CSV contains what looks like mangled binary data:
0000000: 22 ef bf bd 08 00 00 00 00 00 00 00 ef bf bd ef "...............
0000010: bf bd 3b 0e ef bf bd 40 0c ef bf bd e1 ad 91 ef ..;....@........
0000020: bf bd 03 07 ef bf bd 18 ef bf bd 65 09 c7 a1 ef ...........e....
0000030: bf bd 41 01 ef bf bd 3a 7f 5c 22 ef bf bd 24 5b ..A....:.\"...$[
0000040: 21 ef bf bd 2a ef bf bd ef bf bd 35 1e ef bf bd !...*......5....
0000050: ef bf bd ef bf bd 56 5b 1b 1e 6d ef bf bd ef bf ......V[..m.....
0000060: bd ef bf bd ca a5 21 ef bf bd ef bf bd ef bf bd ......!.........
0000070: ef bf bd 0e d1 ac ef bf bd 77 47 36 0b ef bf bd .........wG6....
0000080: 77 ef bf bd 12 ef bf bd ef bf bd ef bf bd 2d 6b w.............-k
0000090: 37 3b ef bf bd 1f ef bf bd d7 95 3b ef bf bd 73 7;.........;...s
00000a0: 37 ef bf bd 4b 6d 59 ef bf bd 72 ef bf bd 3c ef 7...KmY...r...<.
00000b0: bf bd 04 ef bf bd ca b3 7f 17 02 00 00 00 00 00 ................
00000c0: 00 00 00 00 00 00 00 00 ef bf bd ef bf bd 06 59 ...............Y
00000d0: e9 a9 85 00 28 22 0d 0a ....("..
Can anyone please help me with this?
Thanks.
The socket source reads the incoming bytes as UTF-8 text, line by line, so a binary payload cannot survive the trip: every invalid byte sequence gets replaced with U+FFFD, which is exactly the repeating ef bf bd pattern in your dump. For file-based input, however, you can teach Spark to unpack tarballs by registering a custom Hadoop compression codec. I wrote a TGZ codec and registered it; I use it for reading TGZ files as text/CSV files. Maybe that helps:
import java.io.{InputStream, OutputStream}

import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
import org.apache.hadoop.io.compress._

final class DecompressTgzCodec extends CompressionCodec {
  override def getDefaultExtension: String = ".tgz"

  // this codec is read-only: compression is not supported
  override def createOutputStream(out: OutputStream): CompressionOutputStream = ???
  override def createOutputStream(out: OutputStream, compressor: Compressor): CompressionOutputStream = ???
  override def createCompressor(): Compressor = ???
  override def getCompressorType: Class[_ <: Compressor] = ???

  override def createInputStream(in: InputStream): CompressionInputStream =
    new TarDecompressorStream(new TarArchiveInputStream(new GzipCompressorInputStream(in)))

  override def createInputStream(in: InputStream, decompressor: Decompressor): CompressionInputStream =
    createInputStream(in)

  // we do not use a decompressor; all the work happens in the stream below
  override def createDecompressor(): Decompressor = null
  override def getDecompressorType: Class[_ <: Decompressor] = null

  // see https://www.conductor.com/nightlight/how-to-build-a-speedy-custom-compression-codec-for-hadoop/
  final class TarDecompressorStream(in: TarArchiveInputStream) extends DecompressorStream(in) {
    // if the current tar entry is exhausted, advance to the next entry,
    // so the contents of all entries are exposed as one continuous stream
    def updateStream(): Unit = {
      if (in.available() <= 0) {
        in.getNextTarEntry
      }
    }

    override def read(): Int = {
      checkStream()
      updateStream()
      in.read()
    }

    override def read(b: Array[Byte], off: Int, len: Int): Int = {
      checkStream()
      updateStream()
      in.read(b, off, len)
    }

    // nothing to reset: entry boundaries are handled in updateStream()
    override def resetState(): Unit = {}
  }
}
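The trick is in updateStream(): whenever the current tar entry is exhausted, it silently advances to the next one, so Hadoop sees the contents of all entries as a single uncompressed stream.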
...
// register the new codec:
val conf = new SparkConf()
conf.set("spark.hadoop.io.compression.codecs", classOf[DecompressTgzCodec].getName)

val spark = SparkSession
  .builder()
  .master(...)
  .config(conf)
  .appName("Test")
  .getOrCreate()
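With the codec registered, Hadoop resolves it by the ".tgz" extension (from getDefaultExtension), so such archives can be read with the ordinary file-based sources. A minimal sketch, assuming the tar entries contain headerless CSV lines; the path is illustrative only:

// The ".tgz" extension routes the file through DecompressTgzCodec,
// so the concatenated entry contents arrive here as plain CSV text.
val df = spark.read
  .option("header", "false")
  .csv("hdfs:///user/admin/sftp/input.tgz") // hypothetical path
df.show()

Note that this only helps with file-based sources; the socket source never goes through Hadoop's codec machinery, so the tarball would have to land on HDFS (or another filesystem) first.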