Tags: google-cloud-platform, hbase, google-cloud-dataproc, google-cloud-bigtable

Spark-BigTable - HBase client not closed in Pyspark?


I'm trying to execute a Pyspark statement that writes to BigTable inside a Python for loop, which leads to the error below (the job is submitted with Dataproc). Is some client not being closed properly (as suggested here), and if so, is there a way to close it from Pyspark?

Note that manually re-executing the script each time with a new Dataproc job works fine, so the job itself is correct.

Thanks for your support!

Pyspark script


from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import json

sc = SparkContext()
sqlc = SQLContext(sc)  # creating an SQLContext is what makes RDD.toDF available below

def create_df(n_start,n_stop):

    # Data
        
    row_1 = ['a']+['{}'.format(i) for i in range(n_start,n_stop)]
    row_2 = ['b']+['{}'.format(i) for i in range(n_start,n_stop)]
      
    # Spark schema
    
    ls = [row_1,row_2]
    schema = ['col0'] + ['col{}'.format(i) for i in range(n_start,n_stop)]
    
    # Catalog

    first_col = {"col0":{"cf":"rowkey", "col":"key", "type":"string"}}
    other_cols =  {"col{}".format(i):{"cf":"cf", "col":"col{}".format(i), "type":"string"} for i in range(n_start,n_stop)}
    
    first_col.update(other_cols)
    columns = first_col
        
    d_catalogue = {}
    d_catalogue["table"] = {"namespace":"default", "name":"testtable"}
    d_catalogue["rowkey"] = "key"
    d_catalogue["columns"] = columns
        
    catalog = json.dumps(d_catalogue)
    
    # Dataframe

    df = sc.parallelize(ls, numSlices=1000).toDF(schema=schema) 
    
    return df,catalog

# Initialize the column range once, outside the loop, so that the
# N_start/N_stop increments at the bottom actually take effect on the
# next iteration (inside the loop they were reset every time).
N_step = 100
N_start = 1
N_stop = N_start + N_step

data_source_format = "org.apache.spark.sql.execution.datasources.hbase"

for i in range(0, 2):

    df, catalog = create_df(N_start, N_stop)

    df.write \
        .options(catalog=catalog, newTable="5") \
        .format(data_source_format) \
        .save()

    N_start += N_step
    N_stop += N_step

Dataproc job

gcloud dataproc jobs submit pyspark <my_script>.py \
    --cluster $SPARK_CLUSTER \
    --jars <path_to_jar>/bigtable-dataproc-spark-shc-assembly-0.1.jar \
    --region=us-east1

Error

...
ERROR com.google.bigtable.repackaged.io.grpc.internal.ManagedChannelOrphanWrapper: *~*~*~ Channel ManagedChannelImpl{logId=41, target=bigtable.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
...

Solution

  • If you are not using the latest version, try updating to it. It looks similar to this issue that was fixed recently. I would expect the error message to keep showing up for now, but the fact that the job now finishes suggests the support team is still working on it and will hopefully fix it completely in the next release.
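
  • If updating alone does not make the warning go away, one thing worth trying is to reduce how many clients get created in the first place: build the whole column range with a single call to create_df and issue one write instead of one write per loop iteration, then stop the SparkContext explicitly when the script is done. This is only a sketch reusing the script above; the single-range call and the final sc.stop() are assumptions on my part, not a confirmed fix for the orphaned-channel message.

    # Sketch: one write covering the full range instead of one write per iteration,
    # followed by an explicit SparkContext shutdown.
    df, catalog = create_df(1, 201)   # same columns as two passes of the original loop

    df.write \
        .options(catalog=catalog, newTable="5") \
        .format("org.apache.spark.sql.execution.datasources.hbase") \
        .save()

    sc.stop()  # shut the driver down cleanly once the write has finished

    Fewer write calls should mean fewer underlying HBase/BigTable connections being opened, so even if the warning does not disappear entirely, it should at least show up less often.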