pyspark, google-bigquery, google-cloud-dataproc

BigQuery and Pyspark in Dataproc


I have a table in BigQuery that I want to query and then run the FPGrowth algorithm on. I want to try it first in the pyspark shell on a VM instance of my Dataproc cluster.

I am looking for a way to query the BigQuery table directly from pyspark, and then use the queried data as input to FPGrowth (which I am already familiar with).


Solution

  • Dataproc already ships with the connectors needed to query BigQuery, as described in the docs.

    Code sample from docs:

    import pyspark
    from pyspark.sql import SQLContext
    
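    # getOrCreate() reuses the context the pyspark shell already provides
    # and creates a new one when this runs as a standalone script.
    sc = pyspark.SparkContext.getOrCreate()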
    
    # Use the Google Cloud Storage bucket for temporary BigQuery export data used
    # by the InputFormat. This assumes the Google Cloud Storage connector for
    # Hadoop is configured.
    bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
    project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
    input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
    
    conf = {
        # Input Parameters.
        'mapred.bq.project.id': project,
        'mapred.bq.gcs.bucket': bucket,
        'mapred.bq.temp.gcs.path': input_directory,
        'mapred.bq.input.project.id': 'publicdata',
        'mapred.bq.input.dataset.id': 'samples',
        'mapred.bq.input.table.id': 'shakespeare',
    }
    
    # Output Parameters.
    output_dataset = 'wordcount_dataset'
    output_table = 'wordcount_output'
    
    # Load data in from BigQuery.
    table_data = sc.newAPIHadoopRDD(
        'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
        'org.apache.hadoop.io.LongWritable',
        'com.google.gson.JsonObject',
        conf=conf)
    

    I also recommend creating a Dataproc cluster with the Jupyter service installed. That lets you experiment interactively with FPGrowth, or with any other idea you want to try later.

    In fact, before writing this answer, I used my own Jupyter notebook to query BQ and see how it would work:

    [Screenshot: querying BigQuery from a Jupyter notebook]
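To get from the `table_data` RDD in the code sample to something FPGrowth accepts, a rough sketch could look like the following. Each element of `table_data` is a `(key, json_string)` pair, so the first step is decoding the JSON. The helper names (`parse_record`, `build_transactions`, `mine_itemsets`), the choice of `corpus` as the transaction id and `word` as the item, and the sample values are all assumptions for illustration; substitute the fields of your own table. The grouping helper runs on the driver, so treat it as a way to experiment on a small sample rather than a pattern for a large table.

```python
import json


def parse_record(record):
    """Decode one (key, json_string) pair as yielded by the BigQuery input format."""
    _key, payload = record
    return json.loads(payload)


def build_transactions(rows, id_field, item_field):
    """Group rows into (id, items) pairs.

    Items are de-duplicated because FPGrowth requires unique items
    within each transaction.
    """
    baskets = {}
    for row in rows:
        baskets.setdefault(row[id_field], set()).add(row[item_field])
    return [(key, sorted(items)) for key, items in sorted(baskets.items())]


def mine_itemsets(spark, transactions, min_support=0.5):
    """Fit FPGrowth on the transactions; needs a live SparkSession."""
    # Imported lazily so the helpers above can be tried without Spark.
    from pyspark.ml.fpm import FPGrowth

    df = spark.createDataFrame(transactions, ["id", "items"])
    model = FPGrowth(itemsCol="items", minSupport=min_support).fit(df)
    return model.freqItemsets


# Hand-made records shaped like elements of table_data (hypothetical values).
sample = [
    (0, '{"corpus": "hamlet", "word": "king"}'),
    (1, '{"corpus": "hamlet", "word": "ghost"}'),
    (2, '{"corpus": "hamlet", "word": "king"}'),
    (3, '{"corpus": "macbeth", "word": "king"}'),
]
transactions = build_transactions(map(parse_record, sample), "corpus", "word")
print(transactions)
```

In the pyspark shell you would feed it real data instead of `sample`, for example `build_transactions(map(parse_record, table_data.take(1000)), "corpus", "word")`, and then call `mine_itemsets(spark, transactions).show()` with the shell's `spark` session.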