Tags: google-bigquery, google-cloud-dataproc

Can I filter data returned by the BigQuery connector for Spark?


I have adapted the instructions at Use the BigQuery connector with Spark to extract data from a private BigQuery object using PySpark. I am running the code on Dataproc. The object in question is a view with a cardinality of more than 500 million rows. When I issue this statement:

# conf holds the mapred.bq.* connector properties (see the conf dict in UPDATE 2 below for the keys)
table_data = spark.sparkContext.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

in the job output I see:

Bigquery connector version 0.10.7-hadoop2
Creating BigQuery from default credential.
Creating BigQuery from given credential.
Using working path: 'gs://dataproc-9e5dc592-1a35-42e6-9dd6-5f9dd9c8df87-europe-west1/hadoop/tmp/bigquery/pyspark_input20181108091647'
Estimated number of shards < desired num maps (0 < 93); clipping to 0.
Computed '2' shards for sharded BigQuery export.
Table 'projectname:datasetname.viewname' to be exported has 0 rows and 0 bytes
Estimated number of shards < desired num maps (0 < 93); clipping to 0.
Computed '2' shards for sharded BigQuery export.
Table 'projectname:datasetname.viewname' to be exported has 0 rows and 0 bytes

(timestamp/message-level/namespace removed for readability)

That was over 2 hours ago and the job is still running; there has been no more output in that time. I have looked in the working path mentioned in the output and can see that a directory called shard-0 has been created, but it is empty. Essentially, there has been no visible activity for the past 2 hours.

I'm worried that the BigQuery connector is trying to extract the entirety of that view. Is there a way that I can issue a query to define what data to extract, as opposed to extracting the entire view?
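If the connector itself can't do this, my fallback idea is to pre-filter the data myself: run a query job whose destination is a scratch table, then point the connector at that table instead of the view. A rough sketch of what I have in mind, assuming the google-cloud-bigquery client library is available; the destination table name and the filter column are placeholders:

from google.cloud import bigquery

client = bigquery.Client(project='projectname')

# Materialize only the rows I need into a scratch table, since the
# connector appears to export whole objects. The table name and the
# WHERE clause below are placeholders.
dest = bigquery.TableReference.from_string('projectname.datasetname.viewname_extract')
job_config = bigquery.QueryJobConfig()
job_config.destination = dest
job_config.write_disposition = 'WRITE_TRUNCATE'

sql = """
SELECT *
FROM `projectname.datasetname.viewname`
WHERE some_filter_column = 'some_value'
"""
client.query(sql, job_config=job_config).result()  # block until the job finishes

The connector's mapred.bq.input.table.id would then reference viewname_extract rather than the view.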


UPDATE
I was intrigued by this message in the output:

Estimated number of shards < desired num maps (0 < 93); clipping to 0

It seems strange to me that the estimated number of shards would be 0. I've taken a look at some of the code (ShardedExportToCloudStorage.java) that is executed here, and the above message is logged from computeNumShards(). Given numShards=0, I'm assuming that numTableBytes=0, which means this function call:

tableToExport.getNumBytes();

(ShardedExportToCloudStorage.java#L97)

is returning 0, and I assume the reason is that the object I am accessing is a view, not a table. Am I onto something here, or am I on a wild goose chase?
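One way to sanity-check this theory without running a Spark job at all would be to ask the BigQuery API for the object's metadata directly. A minimal sketch, assuming the google-cloud-bigquery client library; if the theory holds, the view should report 0 for both properties:

from google.cloud import bigquery

client = bigquery.Client(project='projectname')

# A view occupies no storage of its own, so I would expect num_rows and
# num_bytes to come back as 0, which is what the connector's
# getNumBytes() call would then see.
ref = bigquery.TableReference.from_string('projectname.datasetname.viewname')
view = client.get_table(ref)
print(view.table_type, view.num_rows, view.num_bytes)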


UPDATE 2... To test my theory (above) that the source object being a view is the cause of the problem, I have done the following:

Created a table in the same project as my Dataproc cluster

create table jt_test.jttable1 (col1 string)

Inserted data into it

insert into jt_test.jttable1 (col1) values ('foo')

Submitted a Dataproc job to read the table and output the number of rows

Here's the code:

conf = {
    # Input Parameters.
     'mapred.bq.project.id': project
    ,'mapred.bq.gcs.bucket': bucket
    ,'mapred.bq.temp.gcs.path': input_directory
    ,'mapred.bq.input.project.id': 'myproject'
    ,'mapred.bq.input.dataset.id': 'jt_test'
    ,'mapred.bq.input.table.id': 'jttable1'
}
table_data = spark.sparkContext.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)
print('got table_data')
# Convert once and reuse, rather than calling toDF() twice
df = table_data.toDF()
print(df.head(10))
print('row tally={}'.format(df.count()))

When I run the Dataproc PySpark job, here's the output:

18/11/08 14:59:26 INFO <cut> Table 'myproject:jt_test.jttable1' to be exported has 1 rows and 5 bytes
got table_data
[Row(_1=0, _2=u'{"col1":"foo"}')]
row tally=1

Created a view over the table

create view jt_test.v_jtview1 as select col1 from `myproject.jt_test.jttable1`

Ran the same job, but this time consuming the view instead of the table

conf = {
    # Input Parameters.
     'mapred.bq.project.id': project
    ,'mapred.bq.gcs.bucket': bucket
    ,'mapred.bq.temp.gcs.path': input_directory
    ,'mapred.bq.input.project.id': 'myproject'
    ,'mapred.bq.input.dataset.id': 'jt_test'
    ,'mapred.bq.input.table.id': 'v_jtview1'
}

When I run the Dataproc PySpark job, here's the output:

Table 'myproject:jt_test.v_jtview1' to be exported has 0 rows and 0 bytes

and that's it! There is no more output and the job is still running, exactly the same behaviour as described above. It's effectively hung.

This seems to be a limitation of the BigQuery connector: I can't use it to consume views.


Solution

  • To close the loop here, jamiet@ confirmed in the comments that the root cause is that BigQuery does not support export from views; it supports export only from tables.
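  • A workaround, then, is to materialize the view into a table first (for example with a query job whose destination is a table, as sketched in the question above) and export that table instead. Separately, the newer spark-bigquery-connector, a different project from the Hadoop InputFormat connector used here, can read views by materializing them on your behalf. A minimal sketch, assuming that connector's jar is available on the cluster; the option names are taken from its documentation, and the dataset used for materialization must already exist:

# Requires the separate spark-bigquery-connector, e.g. submitted with
# --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar
df = (spark.read.format('bigquery')
      .option('viewsEnabled', 'true')               # allow reading from a view
      .option('materializationDataset', 'jt_test')  # scratch dataset for the materialized copy
      .load('myproject.jt_test.v_jtview1'))
print(df.count())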