
Initiating and reading from multiple streams with the BigQuery Storage API (Beta)


The BigQuery Storage API (https://googleapis.github.io/google-cloud-python/latest/bigquery_storage/gapic/v1beta1/api.html) is incredibly useful for reading data from a BigQuery table almost 10x faster than the standard BigQuery API. To make it even faster, it supports multiple read streams, each of which reads a dynamically allocated set of rows from the relevant table.

My problem is this: although you may request a number of streams, the number of streams actually allocated is not within your control. As a result, I have not been able to get more than 1 stream.

The data I'm reading consists of 3 columns and 6 million rows, as shown below. I print the total number of streams created to the console.

from google.cloud import bigquery_storage_v1beta1

project_id = 'myproject'
client = bigquery_storage_v1beta1.BigQueryStorageClient()

table_ref = bigquery_storage_v1beta1.types.TableReference()
table_ref.project_id = "bigquery-public-data"
table_ref.dataset_id = "usa_names"
table_ref.table_id = "usa_1910_current"

read_options = bigquery_storage_v1beta1.types.TableReadOptions()
read_options.selected_fields.append("year")
read_options.selected_fields.append("name")
read_options.selected_fields.append("number")

# I request 3 streams to be created!
requested_streams = 3  

parent = "projects/{}".format(project_id)
session = client.create_read_session(
    table_ref, parent, read_options=read_options,
    requested_streams=requested_streams
)

# Request additional streams for the session; any newly created streams
# come back in response.streams (session.streams is not mutated).
response = client.batch_create_read_session_streams(session, requested_streams)

# I see only 1 stream being created.
print("Streams created: " + str(len(session.streams)))
print("Stream names array: " + str(session.streams))


# Read rows from the first (and, here, only) stream.
reader = client.read_rows(
    bigquery_storage_v1beta1.types.StreamPosition(stream=session.streams[0])
)

rows = reader.rows(session)

names = set()

import time
start = time.time()
#---------------------------------------------------
i = 0
for row in rows:
    i += 1
    names.add(row["name"])
    if i > 6000000:
        break
#---------------------------------------------------    
end = time.time()
print(end - start)
print("Got {} unique names and {} total rows.".format(len(names), i))

I have a few questions:

1) Am I only seeing 1 stream because the multi-stream implementation isn't complete (the API is in beta)?

2) Am I seeing only 1 stream because the data is relatively "small" for the stream allocation algorithm? 6 million rows already seems sizeable to me.

3) If I were to start seeing multiple streams created, the API documentation doesn't describe how to read from them in parallel. Any thoughts on how to do this? I've sketched below what I had in mind.
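For illustration, this is the kind of fan-out I was imagining — a sketch only, since I've never actually received more than one stream. It reuses the `client` and `session` objects from the code above, and assumes the client is safe to share across threads (gRPC stubs generally are):

from concurrent.futures import ThreadPoolExecutor

from google.cloud import bigquery_storage_v1beta1


def read_stream(client, session, stream):
    # Each stream is an independent cursor over its share of the session's rows.
    position = bigquery_storage_v1beta1.types.StreamPosition(stream=stream)
    reader = client.read_rows(position)
    return sum(1 for _ in reader.rows(session))


# One worker per stream; `client` and `session` come from the code above.
with ThreadPoolExecutor(max_workers=len(session.streams)) as executor:
    futures = [
        executor.submit(read_stream, client, session, stream)
        for stream in session.streams
    ]
    total_rows = sum(future.result() for future in futures)

print("Total rows across {} streams: {}".format(len(session.streams), total_rows))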


Solution

  • The issue is that the table you're reading from has only a single input file available. While it has 6 million rows, the data is highly compressible, so there is only a single backing columnar file for the data. Currently, the storage API will not split data more granularly than this.

    You would see the same thing (there's only a single input) if you examine the query plan of a query that SELECTs from this table.
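    If you want to check this yourself, one way is to run a query against the table with the standard google-cloud-bigquery client and print each stage's parallel inputs — a minimal sketch, assuming the client library exposes the `parallel_inputs` field (mirroring `parallelInputs` in the REST API's query plan):

        from google.cloud import bigquery

        bq_client = bigquery.Client(project=project_id)
        job = bq_client.query(
            "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_current`"
        )
        job.result()  # wait for the job so the plan statistics are populated

        # The first (read) stage's parallel inputs reflect how many backing
        # files feed the table scan; for this table you should see 1.
        for stage in job.query_plan:
            print(stage.name, stage.parallel_inputs)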