java performance hadoop sqoop

sqoop import-all-tables slow and sequence files are custom java objects


I am working to sync a very large database to hive.

There are 2 issues: (1) Text imports are slower and there is a slow large mapreduce step. (2) Sequence files are much faster but are not readable by normal means.

Details follow:

(1) If we import the data as text, it is slower. The files accumulate in the home directory in a temp folder but eventually a mapreduce job is created which is rather slow.

17/04/25 04:18:34 INFO mapreduce.Job: Job job_1490822567992_0996 running in uber mode : false
17/04/25 04:18:34 INFO mapreduce.Job:  map 0% reduce 0%
17/04/25 11:05:59 INFO mapreduce.Job:  map 29% reduce 0%
17/04/25 11:20:18 INFO mapreduce.Job:  map 86% reduce 0% <-- tends to hang a very long time here

(A lot of lines deleted for brevity.)

(2) If we import the files as sequencefiles, it is much faster but the retrieved data is unreadable by Hive since it needs to know about the autogenerated Java files that get created. This also has a mapreduce step, but it seemed to go faster (or perhaps that was a time of day thing...).

We have a series of these classes for each table that are produced by sqoop: public class MyTableName extends SqoopRecord implements DBWritable, Writable

What are the steps for using those classes? How do we install them in hive? Surprisingly, the Cloudera support engineer doesn't know, as this must be infrequently charted territory??

sqoop import-all-tables --connect '...' --relaxed-isolation --num-mappers 7 --compress --autoreset-to-one-mapper --compression-codec=snappy --outdir javadir --as-sequencefile --hive-delims-replacement ' '

Any advice?


Solution

  • I am open to Spark. Do you have some sample code?

    Disclaimer: I just assembled some snippets from multiple notebooks, and was too lazy (and hungry) to launch a test-run before leaving the office. Any bugs and typos are yours to find.


    Using Spark 2.0 available from Cloudera parcel (with Hive support), an interactive-style Scala script, in local mode, without any data partitioning, a Microsoft SQL Server connection, and inserting directly into an existing Hive managed table (with some extra business logic)...

    spark2-shell --master local --driver-class-path /some/path/to/sqljdbc42.jar
    

    // Side note: automatic registration of type 4 JDBC drivers is broken in multiple Spark builds, and the bug keeps on reappearing, so it's safer to specify the driver class just in case...

    val weather = spark.read.format("jdbc").option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", "jdbc:sqlserver://myhost\\SQLExpress:9433;database=mydb").option("user", "mylogin").option("password", "*****").option("dbtable", "weather_obs").load()
    { // println with an s-interpolated string: printf would reject "%%% P" as an invalid format spec
      println(s"%%% Partitions: ${weather.rdd.getNumPartitions} / Records: ${weather.count}")
      println("%%% Detailed DF schema:")
      weather.printSchema
    }
    

    // Alternative for "dbtable" using a sub-query :
    // "(SELECT station, dt_obs_utc, temp_k FROM observation_meteo WHERE station LIKE '78%') x")

    // registerTempTable is deprecated as of Spark 2.0 in favour of createOrReplaceTempView
    weather.createOrReplaceTempView("wth")
    spark.sql(
        """
        INSERT INTO TABLE somedb.sometable
        SELECT station, dt_obs_utc, CAST(temp_k -273.15 AS DECIMAL(3,1)) as temp_c
        FROM wth
        WHERE temp_k IS NOT NULL
        """)
    spark.catalog.dropTempView("wth")
    
    weather.unpersist()
    


    Now, if you want to dynamically create a Hive external table on a Parquet file with GZip compression, replace the "temp table" trick with...

    weather.write.option("compression","gzip").mode("overwrite").parquet("hdfs:///some/directory/")
    

    // supported compression codecs for Parquet: none, snappy (default), gzip
    // supported compression codecs for CSV: none (default), snappy, lz4, gzip, bzip2

    def toImpalaType(sparkType : String ) : String = {
      if (sparkType == "StringType" || sparkType == "BinaryType")  { return "string" }
      if (sparkType == "BooleanType")                              { return "boolean" }
      if (sparkType == "ByteType")                                 { return "tinyint" }
      if (sparkType == "ShortType")                                { return "smallint" }
      if (sparkType == "IntegerType")                              { return "int" }
      if (sparkType == "LongType")                                 { return "bigint" }
      if (sparkType == "FloatType")                                { return "float" }
      if (sparkType == "DoubleType")                               { return "double" }
      if (sparkType.startsWith("DecimalType"))                     { return sparkType.replace("DecimalType","decimal") }
      if (sparkType == "TimestampType" || sparkType == "DateType") { return "timestamp" }
      println("########## ERROR - \"" +sparkType +"\" not supported (bug)")
      return "string"
    }
    
    spark.sql("DROP TABLE IF EXISTS somedb.sometable")
    { val query = new StringBuilder
      query.append("CREATE EXTERNAL TABLE somedb.sometable")
      val weatherSchema =weather.dtypes
      val (colName0,colType0) = weatherSchema(0)
      query.append("\n ( " +colName0 + " " +toImpalaType(colType0))
      // remaining columns (the original referenced an undefined "tempSchema")
      for ((colName_, colType_) <- weatherSchema.tail) { query.append("\n , " +colName_ + " " +toImpalaType(colType_)) }
      query.append("\n )\nCOMMENT 'Imported from SQL Server by Spark'")
      query.append("\nSTORED AS Parquet")
      query.append("\nLOCATION 'hdfs:///some/directory'")
      spark.sql(query.toString())
      query.clear()
    }
    


    If you want to partition your input table (based on a numeric column; date/time is not supported AFAIK), then look into the JDBC import options partitionColumn, lowerBound, upperBound and numPartitions, which have to be specified together.
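A rough sketch of those options in use, untested like the rest. The helper name `partitionedJdbcOptions`, the numeric column `station_id`, the bounds and the partition count are all made up for illustration; only the four option keys are what Spark's JDBC source actually reads.

```scala
// Hypothetical helper: adds the partitioning options to a base set of JDBC options.
def partitionedJdbcOptions(
    base: Map[String, String],
    column: String,
    lower: Long,
    upper: Long,
    parts: Int): Map[String, String] = {
  // Sanity checks of this helper, not of Spark itself
  require(parts > 0, "need at least one partition")
  require(lower < upper, "lower bound must be below upper bound")
  base ++ Map(
    "partitionColumn" -> column,
    "lowerBound"      -> lower.toString,
    "upperBound"      -> upper.toString,
    "numPartitions"   -> parts.toString)
}

// Same connection settings as in the snippets above
val base = Map(
  "driver"   -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
  "url"      -> "jdbc:sqlserver://myhost\\SQLExpress:9433;database=mydb",
  "user"     -> "mylogin",
  "password" -> "*****",
  "dbtable"  -> "weather_obs")

// "station_id" is an assumed numeric column; bounds and partition count are placeholders
val opts = partitionedJdbcOptions(base, "station_id", 1L, 100000L, 8)

// In spark2-shell the map can then be handed to the reader:
//   val weatherPar = spark.read.format("jdbc").options(opts).load()
```

Keeping the options in a plain Map makes it easy to reuse the same connection settings for several tables and change only "dbtable" and the partitioning column.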

    If you want to load these partitions in parallel in YARN-client mode, then add a --jars argument to upload the JDBC driver to the executors.
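To get a feel for what each parallel task ends up querying, here is a simplified illustration (not Spark's actual code) of how a numeric range gets cut into one predicate per partition. `previewPredicates` and `station_id` are made-up names. The point to take away is that the bounds only set the stride and do not filter rows, which is why the first and last ranges are left open-ended.

```scala
// Simplified illustration only: split [lower, upper) into `parts` contiguous ranges.
// The first range also picks up NULLs and everything below it; the last range
// picks up everything above it, so no row is dropped by the bounds.
def previewPredicates(column: String, lower: Long, upper: Long, parts: Int): Seq[String] = {
  require(parts >= 2, "this sketch only covers two or more partitions")
  require(upper - lower >= parts, "range too small for that many partitions")
  val stride = (upper - lower) / parts
  (0 until parts).map { i =>
    val lo = lower + i * stride   // inclusive start of range i
    val hi = lo + stride          // exclusive end of range i
    if (i == 0) s"$column < $hi OR $column IS NULL"
    else if (i == parts - 1) s"$column >= $lo"
    else s"$column >= $lo AND $column < $hi"
  }
}

// One line per partition, i.e. per query sent to the database
previewPredicates("station_id", 0L, 100L, 4).foreach(println)
```

If the real values are heavily skewed relative to the bounds, most rows land in one or two of these ranges and the parallelism buys very little, so it pays to pick bounds close to the actual min and max of the column.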