Search code examples
javahadoopapache-sparkhbase

Get row on Spark in map Call


Itry to aggregate data from a file in HDFS. I need to add some details from those datas with value on a specific Table in hbase.

but I have the exception :

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
    at org.apache.spark.rdd.RDD.map(RDD.scala:286)
    at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)
    at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)
    at ......
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation
Serialization stack:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)

I know that the problem occured when we try to access to the hbase during the map function.

My question is: how to complete my RDDs with the value contains on the hbase Table.

for example: file in hdfs are csv:

Name;Number1;Number2
toto;1;2

in hbase we have data associate to the name toto.

i need to retrieve the sum of Number1 and Number 2 (that the easiest part) and aggregate with the data in the table. for example:

the key for the reducer will be tata and be retrieve by get the rowkey toto in the hbase table.

Any suggestions?


Solution

  • Finally a colleague did it, thanks to yours advice:

    so this is the code of the map that permits to aggregate a file with datas from the hbase table.

    private final Logger LOGGER = LoggerFactory.getLogger(AbtractGetSDMapFunction.class);
    
    
    
    
    /**
     * Namespace name
     */
    public static final String NAMESPACE = "NameSpace";
    private static final String ID = "id";
    private Connection connection = null;
    private static final String LINEID = "l";
    private static final String CHANGE_LINE_ID = "clid";
    private static final String CHANGE_LINE_DATE = "cld";
    private String constClientPortHBase;
    private String constQuorumHBase;
    private int constTimeOutHBase;
    private String constZnodeHBase;
    public void initConnection() {
        Configuration conf = HBaseConfiguration.create();
        conf.setInt("timeout", constTimeOutHBase);
        conf.set("hbase.zookeeper.quorum", constQuorumHBase);
        conf.set("hbase.zookeeper.property.clientPort", constClientPortHBase);
        conf.set("zookeeper.znode.parent", constZnodeHBase);
        try {
            connection = HConnectionManager.createConnection(conf);
        } catch (Exception e) {
            LOGGER.error("Error in the configuration of the connection with HBase.", e);
        }
    }
    
     public Tuple2<String, myInput> call(String row) throws Exception {
    //this is where you need to init the connection for hbase to avoid serialization problem
        initConnection();
    
    ....do your work 
    State state = getCurrentState(myInput.getKey());
    ....do your work 
    }
    
    public AbtractGetSDMapFunction( String constClientPortHBase, String constQuorumHBase, String constZnodeHBase, int constTimeOutHBase) {
        this.constClientPortHBase = constClientPortHBase;
        this.constQuorumHBase = constQuorumHBase;
        this.constZnodeHBase = constZnodeHBase;
        this.constTimeOutHBase = constTimeOutHBase;
    }
    
    /***************************************************************************/
    /**
     * Table Name
     */
    public static final String TABLE_NAME = "Table";
    
    public state getCurrentState(String key) throws TechnicalException {
        LOGGER.debug("start key {}", key);
        String buildRowKey = buildRowKey(key);
        State currentState = new State();
        String columnFamily = State.getColumnFamily();
        if (!StringUtils.isEmpty(buildRowKey) && null != columnFamily) {
            try {
                Get scan = new Get(Bytes.toBytes(buildRowKey));
                scan.addFamily(Bytes.toBytes(columnFamily));
                addColumnsToScan(scan, columnFamily, ID);                
                Result result = getTable().get(scan);
                currentState.setCurrentId(getLong(result, columnFamily, ID));              
            } catch (IOException ex) {
                throw new TechnicalException(ex);
            }
            LOGGER.debug("end ");
        }
        return currentState;
    }
    
    /***********************************************************/
    
    private Table getTable() throws IOException, TechnicalException {
        Connection connection = getConnection();
        // Table retrieve
        if (connection != null) {
            Table table = connection.getTable(TableName.valueOf(NAMESPACE, TABLE_NAME));
    
    
            return table;
        } else {
            throw new TechnicalException("Connection to Hbase not available");
        }
    }
    
    /****************************************************************/
    
    
    
    
    private Long getLong(Result result, String columnFamily, String qualifier) {
        Long toLong = null;
        if (null != columnFamily && null != qualifier) {
            byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            toLong = (value != null ? Bytes.toLong(value) : null);
        }
        return toLong;
    }
    
    private String getString(Result result, String columnFamily, String qualifier) {
        String toString = null;
        if (null != columnFamily && null != qualifier) {
            byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
            toString = (value != null ? Bytes.toString(value) : null);
        }
        return toString;
    }
    
    
    public Connection getConnection() {
        return connection;
    }
    
    public void setConnection(Connection connection) {
        this.connection = connection;
    }
    
    
    
    private void addColumnsToScan(Get scan, String family, String qualifier) {
        if (org.apache.commons.lang.StringUtils.isNotEmpty(family) && org.apache.commons.lang.StringUtils.isNotEmpty(qualifier)) {
            scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        }
    }
    
    private String buildRowKey(String key) throws TechnicalException {
        StringBuilder rowKeyBuilder = new StringBuilder();
        rowKeyBuilder.append(HashFunction.makeSHA1Hash(key));
        return rowKeyBuilder.toString();
    }