apache-spark pyspark google-bigquery azure-databricks

Bigquery "Getting IllegalArgumentException: Invalid Table ID" error while reading part of bigquery table into pyspark dataframe using query


I am trying to read part of a BigQuery table using a SQL query in Azure Databricks (Spark).

table_id = str(project_id) + "." + str(schema) + "." + str(table_name)
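# e.g. table_id == "proj-164408.schema.mytable" (hypothetical values, taken from the error message below)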

I am able to read the complete table data with the following:

_data = spark.read.format("bigquery").option("credentials", ans). \
    option("parentProject", project_id). \
    option("project", project_id). \
    option("table", table_id). \
    option("dataset", schema).load()

But when I try to do the same with a SQL query, as follows:

_query = """select * from `{}` limit 2""".format(table_id)
_data = spark.read.format("bigquery").option("credentials", ans). \
    option("parentProject", project_id). \
    option("project", project_id). \
    option("dataset", schema). \
    load(_query)

total = _data.count()

IllegalArgumentException: Invalid Table ID 'select col1 from `proj-164408.schema.mytable` limit 2'. Must match '^(((\S+)[:.])?(\w+)\.)?([\S&&[^.:]]+)$$'

I tried with different types of table IDs like proj-164408:schema.mytable and proj-164408:schema:mytable.

Attaching the stack trace:

---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
<command-755248569207678> in <module>
     88     option("parentProject", project_id). \
     89     option("project", project_id).option("dataset", schema). \
---> 90     load(_query)
     91 
     92 total = _data.count()

/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    176         self.options(**options)
    177         if isinstance(path, basestring):
--> 178             return self._df(self._jreader.load(path))
    179         elif path is not None:
    180             if type(path) != list:

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    131                 # Hide where the exception came from that shows a non-Pythonic
    132                 # JVM exception message.
--> 133                 raise_from(converted)
    134             else:
    135                 raise

/databricks/spark/python/pyspark/sql/utils.py in raise_from(e)

Databricks Runtime version I used: 7.3 LTS (includes Apache Spark 3.0.1, Scala 2.12)

There is already a question about a similar issue, but I could not apply it to my case. Refer to BigQuery: Invalid table ID.

Update 1:

Found the source code that throws the mentioned error: com.google.cloud.bigquery.connector.common.BigQueryUtil, line 106.

It seems we can only give a fully qualified table name:

    private static final String PROJECT_PATTERN = "\\S+";
    private static final String DATASET_PATTERN = "\\w+";
    // Allow all non-whitespace beside ':' and '.'.
    // These confuse the qualified table parsing.
    private static final String TABLE_PATTERN = "[\\S&&[^.:]]+";

    /**
     * Regex for an optionally fully qualified table.
     *
     * <p>Must match 'project.dataset.table' OR the legacy 'project:dataset.table' OR 'dataset.table'
     * OR 'table'.
     */
    private static final Pattern QUALIFIED_TABLE_REGEX =
        Pattern.compile(
            format("^(((%s)[:.])?(%s)\\.)?(%s)$$", PROJECT_PATTERN, DATASET_PATTERN, TABLE_PATTERN));

    Matcher matcher = QUALIFIED_TABLE_REGEX.matcher(rawTable);
    if (!matcher.matches()) {
      throw new IllegalArgumentException(
          format("Invalid Table ID '%s'. Must match '%s'", rawTable, QUALIFIED_TABLE_REGEX));
    }

Since I passed a SQL query instead of a fully qualified table name, I got the mentioned error: the connector treats the entire string given to load() as a table ID and validates it against this regex.
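
To sanity-check this, here is a minimal Python approximation of the connector's pattern (Python's re module has no character-class intersection, so Java's [\S&&[^.:]] is rewritten as [^\s.:]; the table IDs are the ones from the error above):

    import re

    # Approximate Python port of QUALIFIED_TABLE_REGEX from BigQueryUtil.
    PROJECT_PATTERN = r"\S+"
    DATASET_PATTERN = r"\w+"
    TABLE_PATTERN = r"[^\s.:]+"  # non-whitespace except '.' and ':'
    QUALIFIED_TABLE_REGEX = re.compile(
        r"^((({0})[:.])?({1})\.)?({2})$".format(
            PROJECT_PATTERN, DATASET_PATTERN, TABLE_PATTERN))

    print(bool(QUALIFIED_TABLE_REGEX.match("proj-164408.schema.mytable")))  # True
    print(bool(QUALIFIED_TABLE_REGEX.match("proj-164408:schema.mytable")))  # True (legacy form)
    print(bool(QUALIFIED_TABLE_REGEX.match(
        "select col1 from `proj-164408.schema.mytable` limit 2")))          # False

Any string containing whitespace, such as a SQL query, can never match, which is exactly the IllegalArgumentException above.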


Solution

  • As of now, we are using the BigQuery client along with Spark to read part of the table into a DataFrame: run the query with the BigQuery client first, then read the query job's destination (results) table with Spark.

    import json
    import base64 as bs
    from google.oauth2 import service_account
    from google.cloud import bigquery

    project_id = ""
    schema = ""
    table_name = ""
    credentials_dict = {}

    schema_table_name = str(project_id) + "." + str(schema) + "." + str(table_name)

    # The Spark connector expects the service-account JSON base64-encoded
    # in the "credentials" option.
    s = json.dumps(credentials_dict)
    res = bs.b64encode(s.encode('utf-8'))
    ans = res.decode("utf-8")
    credentials = service_account.Credentials.from_service_account_info(credentials_dict)

    query = "SELECT * FROM `{}` where col_1 > 50;".format(schema_table_name)

    # Run the query with the BigQuery client and wait for it to finish;
    # BigQuery writes the result to a temporary destination table.
    client = bigquery.Client(credentials=credentials, project=project_id)
    query_job = client.query(query)
    query_job.result()

    # Point Spark at the destination (results) table, which is a plain
    # table ID and therefore passes the connector's validation.
    df = spark.read.format('bigquery') \
        .option("credentials", ans) \
        .option("parentProject", project_id) \
        .option("project", project_id) \
        .option('dataset', query_job.destination.dataset_id) \
        .load(query_job.destination.table_id)
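
  • Note: query_job.destination points at the temporary (anonymous) results table, which BigQuery keeps for roughly 24 hours, so Spark reads the already-materialized result instead of re-running the query.

  • Alternatively, newer releases of the spark-bigquery-connector can run a query directly when view materialization is enabled. A sketch only, assuming a connector version that supports the "query" option (the connector bundled with DBR 7.3 may be too old):

    # Needs a connector version with "query" support; "materializationDataset"
    # names a dataset where the connector may materialize intermediate results.
    _data = spark.read.format("bigquery") \
        .option("credentials", ans) \
        .option("parentProject", project_id) \
        .option("viewsEnabled", "true") \
        .option("materializationDataset", schema) \
        .option("query", _query) \
        .load()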