hadoop hdfs hadoop-streaming snappy

How to make Hadoop Snappy output files the same format as those generated by Spark


We are using Spark, and until now the output has been PSV files. To save space, we'd like to compress the output. To do so, we will save the JavaRDD using SnappyCodec, like this:

objectRDD.saveAsTextFile(rddOutputFolder, org.apache.hadoop.io.compress.SnappyCodec.class);

We will then use Sqoop to import the output into a database. The whole process works fine.

We'd like to compress the previously generated PSV files in HDFS with Snappy as well. This is the command we tried:

hadoop jar /usr/hdp/2.6.5.106-2/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.5.106-2.jar \
-Dmapred.output.compress=true -Dmapred.compress.map.output=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec \
-Dmapred.reduce.tasks=0 \
-input input-path \
-output output-path

The command runs fine, but Sqoop can't parse the resulting Snappy output files.

When we view the generated files with a command like "hdfs dfs -text hdfs-file-name", the output looks like the following, with an index-like field added to each line:

0       2019-05-02|AMRS||5072||||3540||MMPT|0|
41      2019-05-02|AMRS||5538|HK|51218||1000||Dummy|45276|
118     2019-05-02|AMRS||5448|US|51218|TRADING|2282|HFT|NCR|45119|

That is, an extra value such as "0", "41", or "118", followed by whitespace, is added at the beginning of each line. Note that the .snappy files generated by Spark don't have this extra field.

Any idea how to prevent this extra field from being inserted?

Thanks a lot!


Solution

  • These are not indexes but rather keys generated by TextInputFormat, as explained in the Hadoop Streaming documentation:

    The class you supply for the input format should return key/value pairs of Text class. If you do not specify an input format class, the TextInputFormat is used as the default. Since the TextInputFormat returns keys of LongWritable class, which are actually not part of the input data, the keys will be discarded; only the values will be piped to the streaming mapper.

    Since your job does not define a mapper, those key/value pairs are written straight out to the file system. As the excerpt above hints, you need some sort of mapper that discards the keys. A quick-and-dirty fix is to use something already available as a pass-through, such as the shell cat command:

    hadoop jar /usr/hdp/2.6.5.106-2/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.5.106-2.jar \
    -Dmapred.output.compress=true -Dmapred.compress.map.output=true \
    -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec \
    -Dmapred.reduce.tasks=0 \
    -mapper /bin/cat \
    -input input-path \
    -output output-path
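
To make the key/value shape concrete, here is a minimal local sketch that needs no cluster. It reproduces two of the lines from the question as "key, tab, value" and strips the key with cut. The function name strip_key is hypothetical, and the sketch assumes the key and value are separated by a single tab (the default separator for text output); it illustrates what discarding the key amounts to, and is not a replacement for the -mapper option above.

```shell
#!/bin/sh
# strip_key (hypothetical helper): drop the first tab-separated field,
# i.e. the byte-offset key, and keep the rest of the line unchanged.
# Lines that contain no tab are passed through as-is by cut.
strip_key() {
  cut -f2-
}

# Two sample lines from the question, written as key<TAB>value.
printf '0\t2019-05-02|AMRS||5072||||3540||MMPT|0|\n41\t2019-05-02|AMRS||5538|HK|51218||1000||Dummy|45276|\n' | strip_key
# prints:
# 2019-05-02|AMRS||5072||||3540||MMPT|0|
# 2019-05-02|AMRS||5538|HK|51218||1000||Dummy|45276|
```

The same filter could be used to inspect files that were already written with the key prefix, for example by piping the output of "hdfs dfs -text hdfs-file-name" through it, assuming the prefix really is tab-separated in your files.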