Am able to get Google Datalab (Notebooks) running in Google Chrome with the correct TCP firewall permissions. Using the simple script, this launches the most current spark cluster (1 master with 3 workers using Dataproc). First we test the below code in spark-submit
, then after launching DataLab I'm not sure how to fix the below error.
First step: Launch Dataproc Cluster from Cloud Shell
gcloud dataproc clusters create cluster1021 \
--subnet default --zone us-west1-a \
--master-machine-type n1-standard-2 \
--master-boot-disk-size 30 --num-workers 2 \
--worker-machine-type n1-standard-2 \
--worker-boot-disk-size 30 --image-version 1.3-deb9 \
--project bigdata-228217 \
--initialization-actions 'gs://dataproc-initialization-actions/datalab/datalab.sh','gs://dataproc-initialization-actions/connectors/connectors.sh' \
--metadata 'gcs-connector-version=1.9.11' \
--metadata 'bigquery-connector-version=0.13.11'
After successfully launching I tested to see that the Bigquery connector is working with spark-submit wordcount.py
example from Google here.
Second step: Include this code in the master home directory as wordcount.py
with touch wordcount.py
, then paste in below code from nano wordcount.py
and save.
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
from __future__ import absolute_import
import json
import pprint
import subprocess
import pyspark
from pyspark.sql import SQLContext
sc = pyspark.SparkContext()
# Use the Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {
# Input Parameters.
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'publicdata',
'mapred.bq.input.dataset.id': 'samples',
'mapred.bq.input.table.id': 'shakespeare',
}
# Output Parameters.
output_dataset = 'wordcount_dataset'
output_table = 'wordcount_output'
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',
conf=conf)
# Perform word count.
word_counts = (
table_data
.map(lambda record: json.loads(record[1]))
.map(lambda x: (x['word'].lower(), int(x['word_count'])))
.reduceByKey(lambda x, y: x + y))
# Display 10 results.
pprint.pprint(word_counts.take(10))
# Stage data formatted as newline-delimited JSON in Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
output_files = output_directory + '/part-*'
sql_context = SQLContext(sc)
(word_counts
.toDF(['word', 'word_count'])
.write.format('json').save(output_directory))
# Shell out to bq CLI to perform BigQuery import.
subprocess.check_call(
'bq load --source_format NEWLINE_DELIMITED_JSON '
'--replace '
'--autodetect '
'{dataset}.{table} {files}'.format(
dataset=output_dataset, table=output_table, files=output_files
).split())
# Manually clean up the staging_directories, otherwise BigQuery
# files will remain indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)
output_path = sc._jvm.org.apache.hadoop.fs.Path(output_directory)
output_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(
output_path, True)
Now, from the shell, The output from spark-submit
here are the results -- showing that the BigQuery connector works.
spark-submit wordcount.py
...
(pinnace,3)
(bone,21)
(lug,2)
(vailing,2)
(bombast,3)
(gaping,11)
(hem,5)
('non,1)
(stinks,1)
(forsooth,48)
Step 3 Set up Firewall to allow for TCP DataLab view in Browser
Create firewall rule for DataLab
On the setup page, you'll create a name for the DataLab firewall rule, and allow for the below TCP ports, along with "/32" immediately after your network's IP address -- which you can find here.
Step 4: Launch DataLab in Google Chrome with <YOUR IP>:8080
and you should see the DataLab Notebook
You'll see this. Now open a new notebook, and in the first cell I stopped the spark context and pasted the above Shakespeare code into the second cell.
Here is the output. Question, what do I need to do to get the Bigquery Connector to work with Pyspark in Datalab??
Py4JJavaErrorTraceback (most recent call last)
<ipython-input-4-62761a09a7c5> in <module>()
36 'org.apache.hadoop.io.LongWritable',
37 'com.google.gson.JsonObject',
---> 38 conf=conf)
39
40 # Perform word count.
/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py in newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, conf, batchSize)
735 jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
736 valueClass, keyConverter, valueConverter,
--> 737 jconf, batchSize)
738 return RDD(jrdd, self)
739
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD.
: java.lang.ClassNotFoundException: com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:239)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDDFromClassNames(PythonRDD.scala:313)
at org.apache.spark.api.python.PythonRDD$.newAPIHadoopRDD(PythonRDD.scala:296)
at org.apache.spark.api.python.PythonRDD.newAPIHadoopRDD(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Judging from this line, Datalab init action mounts BQ and GCS connectors into Docker container.
Because Dataproc 1.3 does not come with BQ connector by default and because you specified Connectors init action, that installs BQ connector on the cluster, after DataLab init action, Docker can not mount BQ connector into Datalab container during Datalab init action execution.
To fix this issue you need to change order of init actions:
gcloud dataproc clusters create \
. . .
--initialization-actions=gs://dataproc-initialization-actions/datalab/connectors.sh,gs://dataproc-initialization-actions/connectors/datalab.sh
As a minor improvement, you don't need to specify GCS connector version (--metadata 'gcs-connector-version=1.9.11'
) at the moment, because latest Dataproc 1.3 image already has GCS connector 1.9.11 pre-installed.