I am trying to read a section of a BigQuery table using a query in Azure Databricks Spark.
table_id = str(project_id) + "." + str(schema) + "." + str(table_name)
I am able to read the complete table data using the following code.
_data = spark.read.format("bigquery").option("credentials", ans). \
option("parentProject", project_id). \
option("project", project_id). \
option("table", table_id). \
option("dataset", schema).load()
But when I try to do the same using a SQL query in the following way:
_query = """select * from `{}` limit 2""".format(table_id)
_data = spark.read.format("bigquery").option("credentials", ans). \
option("parentProject", project_id). \
option("project", project_id). \
option("dataset", schema). \
load(_query)
total = _data.count()
it fails with the following error:
IllegalArgumentException: Invalid Table ID 'select col1 from `proj-164408.schema.mytable` limit 2'. Must match '^(((\S+)[:.])?(\w+).)?([\S&&[^.:]]+)$$'
I tried different forms of the table ID, like proj-164408:schema.mytable and proj-164408:schema:mytable.
Attaching the stack trace information.
---------------------------------------------------------------------------
IllegalArgumentException Traceback (most recent call last)
<command-755248569207678> in <module>
88 option("parentProject", project_id). \
89 option("project", project_id).option("dataset", schema). \
---> 90 load(_query)
91
92 total = _data.count()
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
176 self.options(**options)
177 if isinstance(path, basestring):
--> 178 return self._df(self._jreader.load(path))
179 elif path is not None:
180 if type(path) != list:
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
131 # Hide where the exception came from that shows a non-Pythonic
132 # JVM exception message.
--> 133 raise_from(converted)
134 else:
135 raise
/databricks/spark/python/pyspark/sql/utils.py in raise_from(e)
Databricks Runtime version that I used: 7.3 LTS (includes Apache Spark 3.0.1, Scala 2.12)
There is already a question that discusses a similar issue, but I could not apply it to my case. Refer to BigQuery: Invalid table ID.
Update 1:
Found the source code that throws the mentioned error: com.google.cloud.bigquery.connector.common.BigQueryUtil, line 106.
It seems we can only give a fully qualified table name:
private static final String PROJECT_PATTERN = "\\S+";
private static final String DATASET_PATTERN = "\\w+";
// Allow all non-whitespace beside ':' and '.'.
// These confuse the qualified table parsing.
private static final String TABLE_PATTERN = "[\\S&&[^.:]]+";
/**
* Regex for an optionally fully qualified table.
*
* <p>Must match 'project.dataset.table' OR the legacy 'project:dataset.table' OR 'dataset.table'
* OR 'table'.
*/
private static final Pattern QUALIFIED_TABLE_REGEX =
    Pattern.compile(
        format("^(((%s)[:.])?(%s)\\.)?(%s)$$", PROJECT_PATTERN, DATASET_PATTERN, TABLE_PATTERN));
Matcher matcher = QUALIFIED_TABLE_REGEX.matcher(rawTable);
if (!matcher.matches()) {
  throw new IllegalArgumentException(
      format("Invalid Table ID '%s'. Must match '%s'", rawTable, QUALIFIED_TABLE_REGEX));
}
Since I gave a SQL query instead of a fully qualified table name, I am getting the mentioned error.
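For completeness, newer versions of the spark-bigquery-connector can read a query result directly via the query option, together with viewsEnabled and materializationDataset (the connector materializes the query result into a temporary table in that dataset). A minimal sketch, assuming the connector bundled with your runtime is recent enough to support these options:

# Sketch only: assumes a spark-bigquery-connector version that supports
# the "query" option; the result is materialized into materializationDataset.
_data = spark.read.format("bigquery") \
    .option("credentials", ans) \
    .option("parentProject", project_id) \
    .option("viewsEnabled", "true") \
    .option("materializationDataset", schema) \
    .option("query", _query) \
    .load()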
As of now, we are using the BigQuery client along with Spark to read the section of the table:
import json
import base64 as bs
from google.oauth2 import service_account
from google.cloud import bigquery

project_id = ""
schema = ""
table_name = ""
credentials_dict = {}

schema_table_name = str(project_id) + "." + str(schema) + "." + str(table_name)

# The connector expects the service account JSON as a base64-encoded string.
s = json.dumps(credentials_dict)
res = bs.b64encode(s.encode('utf-8'))
ans = res.decode("utf-8")

credentials = service_account.Credentials.from_service_account_info(credentials_dict)

# Run the query with the BigQuery client; BigQuery stores the result
# in a destination table.
query = "SELECT * FROM `{}` WHERE col_1 > 50;".format(schema_table_name)
client = bigquery.Client(credentials=credentials, project=project_id)
query_job = client.query(query)
query_job.result()  # wait for the query job to finish

# Point the connector at the destination table of the finished query job.
df = spark.read.format('bigquery') \
    .option("credentials", ans) \
    .option("parentProject", project_id) \
    .option("project", project_id) \
    .option('dataset', query_job.destination.dataset_id) \
    .load(query_job.destination.table_id)
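This works because query_job.result() blocks until the job finishes, and BigQuery writes every query result to a destination table (a temporary anonymous one if none is specified). query_job.destination then gives us a plain dataset ID and table ID, which satisfy the connector's QUALIFIED_TABLE_REGEX above.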