I am working on syncing a very large database to Hive.
There are two issues: (1) text imports are slower and end with a large, slow MapReduce step; (2) sequence-file imports are much faster, but the results are not readable by normal means.
Details follow:
(1) If we import the data as text, it is slower. The files accumulate in a temp folder under the home directory, and eventually a MapReduce job is launched, which is rather slow.
17/04/25 04:18:34 INFO mapreduce.Job: Job job_1490822567992_0996 running in uber mode : false
17/04/25 04:18:34 INFO mapreduce.Job: map 0% reduce 0%
17/04/25 11:05:59 INFO mapreduce.Job: map 29% reduce 0%
17/04/25 11:20:18 INFO mapreduce.Job: map 86% reduce 0% <-- tends to hang a very long time here
(A lot of lines deleted for brevity.)
(2) If we import the files as sequence files, it is much faster, but the retrieved data is unreadable by Hive, since Hive needs to know about the autogenerated Java classes that get created. This import also has a MapReduce step, but it seemed to go faster (or perhaps that was a time-of-day thing...).
Sqoop produces one of these classes for each table: public class MyTableName extends SqoopRecord implements DBWritable, Writable
What are the steps for using those classes? How do we install them in Hive? Surprisingly, the Cloudera support engineer doesn't know, so this must be rarely charted territory.
sqoop import-all-tables --connect '...' --relaxed-isolation --num-mappers 7 --compress --autoreset-to-one-mapper --compression-codec=snappy --outdir javadir --as-sequencefile --hive-delims-replacement ' '
Any advice?
I am open to Spark. Do you have some sample code?
Disclaimer: I just assembled some snippets from multiple notebooks, and was too lazy (and hungry) to launch a test-run before leaving the office. Any bugs and typos are yours to find.
spark2-shell --master local --driver-class-path /some/path/to/sqljdbc42.jar
// Side note: automatic registration of type 4 JDBC drivers is broken in multiple Spark builds, and the bug keeps on reappearing, so it's safer to specify the driver class just in case...
val weather = spark.read.format("jdbc").
  option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").
  option("url", "jdbc:sqlserver://myhost\\SQLExpress:9433;database=mydb").
  option("user", "mylogin").
  option("password", "*****").
  option("dbtable", "weather_obs").
  load()
{ // string interpolation avoids having to escape every literal % for printf
  println(s"%%% Partitions: ${weather.rdd.getNumPartitions} / Records: ${weather.count}")
  println("%%% Detailed DF schema:")
  weather.printSchema
}
// Alternative for "dbtable", using a sub-query:
// "(SELECT station, dt_obs_utc, temp_k FROM observation_meteo WHERE station LIKE '78%') x"
weather.createOrReplaceTempView("wth")   // registerTempTable is deprecated in Spark 2.x
spark.sql(
"""
INSERT INTO TABLE somedb.sometable
SELECT station, dt_obs_utc, CAST(temp_k - 273.15 AS DECIMAL(3,1)) AS temp_c
FROM wth
WHERE temp_k IS NOT NULL
""")
spark.catalog.dropTempView("wth")
weather.unpersist()
weather.write.option("compression","gzip").mode("overwrite").parquet("hdfs:///some/directory/")
// supported compression codecs for Parquet: none, snappy (default), gzip
// supported compression codecs for CSV: none (default), snappy, lz4, gzip, bzip2
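For the record, a CSV export would look much the same; the path, header option and codec below are just placeholders:

// Hypothetical CSV export -- path and codec are placeholders
weather.write.option("header", "true").option("compression", "bzip2").mode("overwrite").csv("hdfs:///some/csv/directory/")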
def toImpalaType(sparkType: String): String = sparkType match {
  case "StringType" | "BinaryType"       => "string"
  case "BooleanType"                     => "boolean"
  case "ByteType"                        => "tinyint"
  case "ShortType"                       => "smallint"
  case "IntegerType"                     => "int"
  case "LongType"                        => "bigint"
  case "FloatType"                       => "float"
  case "DoubleType"                      => "double"
  case t if t.startsWith("DecimalType")  => t.replace("DecimalType", "decimal")
  case "TimestampType" | "DateType"      => "timestamp"
  case _ =>
    println("########## ERROR - \"" + sparkType + "\" not supported (bug)")
    "string"
}
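For instance, a column reported by dtypes as "DecimalType(10,2)" maps like this:

toImpalaType("DecimalType(10,2)")   // returns "decimal(10,2)"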
spark.sql("DROP TABLE IF EXISTS somedb.sometable")
{ val query = new StringBuilder
  query.append("CREATE EXTERNAL TABLE somedb.sometable")
  val weatherSchema = weather.dtypes
  val (colName0, colType0) = weatherSchema(0)
  query.append("\n ( " + colName0 + " " + toImpalaType(colType0))
  for (i <- 1 until weatherSchema.length) { val (colName_, colType_) = weatherSchema(i) ; query.append("\n , " + colName_ + " " + toImpalaType(colType_)) }
  query.append("\n )\nCOMMENT 'Imported from SQL Server by Spark'")
  query.append("\nSTORED AS Parquet")
  query.append("\nLOCATION 'hdfs:///some/directory'")
  spark.sql(query.toString())
  query.clear()
}
To make Spark read the source table in parallel, also set the JDBC options partitionColumn, lowerBound and upperBound. If you want to load these partitions in parallel in YARN-client mode, then add a --jars argument to upload the JDBC driver to the executors.
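A minimal sketch of such a parallel read, assuming an integer key column named obs_id; the column name, bounds and partition count below are placeholders, not values from the original import:

spark2-shell --master yarn --deploy-mode client --jars /some/path/to/sqljdbc42.jar --driver-class-path /some/path/to/sqljdbc42.jar

// Hypothetical partitioned read -- "obs_id", the bounds and numPartitions are placeholders
val weatherPar = spark.read.format("jdbc").
  option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").
  option("url", "jdbc:sqlserver://myhost\\SQLExpress:9433;database=mydb").
  option("user", "mylogin").
  option("password", "*****").
  option("dbtable", "weather_obs").
  option("partitionColumn", "obs_id").
  option("lowerBound", "1").
  option("upperBound", "10000000").
  option("numPartitions", "8").
  load()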