apache-spark, google-api, google-bigquery, gcloud, google-app-invites

How to connect spark to BigQuery using BigQuery api


I'm a newbie with gcloud and BigQuery, and I want to read data from BigQuery using Spark. I used the Google APIs Client Library for Java and was able to connect to BigQuery: I get a com.google.api.services.bigquery.Bigquery object and can read and print datasets, table IDs, and table data.

My questions are:

How can I pass this authenticated BigQuery credential object to Spark, or is there any way to use it with the Hadoop API?

If that isn't possible, how can I pass the credential object to newAPIHadoopRDD?

    GoogleAuthorizationCodeFlow flow = getFlow();
    GoogleTokenResponse response = flow.newTokenRequest(authorizationCode)
            .setRedirectUri(REDIRECT_URI).execute();
    Credential credential = flow.createAndStoreCredential(response, null);
    return credential;

Here is my Hadoop API code, where I want to use the credential object:

val tableData = sc.newAPIHadoopRDD(
  conf,
  classOf[GsonBigQueryInputFormat],
  classOf[LongWritable],
  classOf[JsonObject])

Solution

  • Thanks @michael, with the help of your link I found the solution.

    Just disable service-account authentication in the Hadoop configuration:

    hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")
    

    and the following code will work:

    val hadoopConfiguration = sc.hadoopConfiguration
    hadoopConfiguration.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    hadoopConfiguration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId)
    hadoopConfiguration.set("fs.gs.project.id", projectId)
    // Disable service-account auth; use the stored OAuth client credentials instead
    hadoopConfiguration.set("fs.gs.auth.service.account.enable", "false")
    hadoopConfiguration.set("fs.gs.auth.client.id", clientId)
    hadoopConfiguration.set("fs.gs.auth.client.secret", clientSecret)
    hadoopConfiguration.set("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
    hadoopConfiguration.set("fs.gs.auth.client.file", tokenPath)
    hadoopConfiguration.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket)

    // Configure input for BigQuery access
    BigQueryConfiguration.configureBigQueryInput(hadoopConfiguration, dataSetId + "." + tableId)
    val tableData = sc.newAPIHadoopRDD(
      hadoopConfiguration,
      classOf[GsonBigQueryInputFormat],
      classOf[LongWritable],
      classOf[JsonObject])
    
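    Once the RDD is built, each record arrives as a (LongWritable, JsonObject) pair, where the JsonObject holds one BigQuery row. A minimal sketch of extracting a single column from the rows (the column name "word" here is a hypothetical placeholder, not from the original table):

    ```scala
    import com.google.gson.JsonObject

    // Hedged sketch: pull one column out of each BigQuery row.
    // "word" is a hypothetical column name; replace it with a real
    // column from your table's schema.
    val words = tableData
      .map { case (_, row: JsonObject) => row.get("word").getAsString }

    // Peek at a few values on the driver
    words.take(10).foreach(println)
    ```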

    where tokenPath points to a file containing the stored credentials, including the refresh token:

    {
        "credentials": {
            "user": {
                "access_token": "ya29..wgL6fH2Gx5asdaadsBl2Trasd0sBqV_ZAS7xKDtNS0z4Qyv5ypassdh0soplQ",
                "expiration_time_millis": 1460473581255,
                "refresh_token": "XXXXXXXXXxxxxxxxxx"
            }
        }
    }