apache-spark, apache-spark-sql, tar, serversocket

Read a .tar.gz file from a socket into a Spark streaming Dataset


Reading a .tar.gz file as a file stream in Spark Streaming works well enough. But my requirement is different: can we read a .tar.gz file that comes in from a socket server in Spark Streaming?

When I post the tar file to the socket like this:

nc -l 9990 < input.tar.gz

I can read it in Spark like this:

Dataset<org.apache.spark.sql.Row> dataset = ss.readStream()
                     .option("host", "localhost")
                     .option("port", "9990")
                     .format("socket")
                     .load();

But if I try to store the result in CSV format:

dataset
               .coalesce(1)
               .writeStream()
               .format("csv")
               .outputMode(OutputMode.Append())
               .option("checkpointLocation", "/tmp/checkpoint")
               .option("path", "hdfs://10.15.xxx.xxx:8020/user/admin/sftp/tar_output4")
               .start()
               .awaitTermination();

the output CSV contains what looks like mangled binary data:

0000000:    22 ef bf bd 08 00 00 00 00 00 00 00 ef bf bd ef   "...............
0000010:    bf bd 3b 0e ef bf bd 40 0c ef bf bd e1 ad 91 ef   ..;....@........
0000020:    bf bd 03 07 ef bf bd 18 ef bf bd 65 09 c7 a1 ef   ...........e....
0000030:    bf bd 41 01 ef bf bd 3a 7f 5c 22 ef bf bd 24 5b   ..A....:.\"...$[
0000040:    21 ef bf bd 2a ef bf bd ef bf bd 35 1e ef bf bd   !...*......5....
0000050:    ef bf bd ef bf bd 56 5b 1b 1e 6d ef bf bd ef bf   ......V[..m.....
0000060:    bd ef bf bd ca a5 21 ef bf bd ef bf bd ef bf bd   ......!.........
0000070:    ef bf bd 0e d1 ac ef bf bd 77 47 36 0b ef bf bd   .........wG6....
0000080:    77 ef bf bd 12 ef bf bd ef bf bd ef bf bd 2d 6b   w.............-k
0000090:    37 3b ef bf bd 1f ef bf bd d7 95 3b ef bf bd 73   7;.........;...s
00000a0:    37 ef bf bd 4b 6d 59 ef bf bd 72 ef bf bd 3c ef   7...KmY...r...<.
00000b0:    bf bd 04 ef bf bd ca b3 7f 17 02 00 00 00 00 00   ................
00000c0:    00 00 00 00 00 00 00 00 ef bf bd ef bf bd 06 59   ...............Y
00000d0:    e9 a9 85 00 28 22 0d 0a   ....("..
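
Looking closer at the dump, the recurring ef bf bd sequence is the UTF-8 encoding of the replacement character U+FFFD: the socket source reads the connection as lines of UTF-8 text, so the binary gzip stream is mangled before it ever reaches the sink. A minimal sketch of the effect (the bytes below are just the standard gzip magic header, shown for illustration only):

import java.nio.charset.StandardCharsets.UTF_8

// 0x8b is not valid UTF-8 on its own, so decoding it as text yields U+FFFD,
// which re-encodes as the three bytes "ef bf bd" seen throughout the dump.
val gzipHeader = Array[Byte](0x1f, 0x8b.toByte, 0x08, 0x00)
val decoded    = new String(gzipHeader, UTF_8)
println(decoded.getBytes(UTF_8).map(b => "%02x".format(b & 0xff)).mkString(" "))
// prints: 1f ef bf bd 08 00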

Can anyone please help me with this?

Thanks.


Solution

  • I wrote a TGZ codec and registered it; I use it for reading TGZ files as text/CSV files. Maybe this helps:

    import java.io.{InputStream, OutputStream}

    import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
    import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream
    import org.apache.hadoop.io.compress._

    final class DecompressTgzCodec extends CompressionCodec {
      override def getDefaultExtension: String = ".tgz"
    
      override def createOutputStream(out: OutputStream): CompressionOutputStream = ???
    
      override def createOutputStream(out: OutputStream, compressor: Compressor): CompressionOutputStream = ???
    
      override def createCompressor(): Compressor = ???
    
      override def getCompressorType: Class[_ <: Compressor] = ???
    
      override def createInputStream(in: InputStream): CompressionInputStream = {
        new TarDecompressorStream(new TarArchiveInputStream(new GzipCompressorInputStream(in)))
      }
    
      override def createInputStream(in: InputStream, decompressor: Decompressor): CompressionInputStream = createInputStream(in)
    
      // decompression is handled entirely inside createInputStream,
      // so no separate Decompressor implementation is needed
      override def createDecompressor(): Decompressor = null

      override def getDecompressorType: Class[_ <: Decompressor] = null
    
    
      // see https://www.conductor.com/nightlight/how-to-build-a-speedy-custom-compression-codec-for-hadoop/
      final class TarDecompressorStream(in: TarArchiveInputStream) extends DecompressorStream(in) {
        def updateStream(): Unit = {
          // no bytes left in the current tar entry -> advance to the next entry;
          // otherwise keep reading from the current one
          if (in.available() <= 0) {
            in.getNextTarEntry
          }
        }
    
        override def read(): Int = {
          checkStream()
          updateStream()
          in.read()
        }
    
        override def read(b: Array[Byte], off: Int, len: Int): Int = {
          checkStream()
          updateStream()
          in.read(b, off, len)
        }
    
        // nothing to reset: advancing between tar entries is handled in updateStream()
        override def resetState(): Unit = {}
      }
    }
    ...
    // register the new codec:
        val conf = new SparkConf()
        conf.set("spark.hadoop.io.compression.codecs", classOf[DecompressTgzCodec].getName)
    
        val spark = SparkSession
          .builder()
          .master(...)
          .config(conf)
          .appName("Test")
          .getOrCreate()
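
  • With the codec registered, the .tgz files can be consumed with the file stream source instead of the socket source. A minimal sketch, assuming the archives are dropped into an HDFS directory (the directory, schema, and output paths below are illustrative, not from the original post):

    import org.apache.spark.sql.types.{StringType, StructType}

    // Hadoop's CompressionCodecFactory selects a codec by file suffix, so the
    // incoming archives must end in ".tgz" to match getDefaultExtension above.
    val schema = new StructType()      // hypothetical schema of the CSVs inside the tar
      .add("id", StringType)
      .add("value", StringType)

    val lines = spark.readStream
      .schema(schema)                  // streaming file sources require an explicit schema
      .csv("hdfs:///user/admin/sftp/incoming")

    lines.writeStream
      .format("csv")
      .option("checkpointLocation", "/tmp/checkpoint")
      .option("path", "hdfs:///user/admin/sftp/tar_output")
      .start()
      .awaitTermination()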