I have created a Cloudera 5.x cluster with the Spark option set:
I would like to run a simple test using PySpark to read data from one Datatap and write it to another Datatap.
What are the steps for doing this with PySpark?
For this example, I'm going to use the TenantStorage DTAP that is created by default for my Tenant.
I've uploaded a dataset from https://raw.githubusercontent.com/fivethirtyeight/data/master/airline-safety/airline-safety.csv
Next, locate the controller node and ssh into it:
Because the tenant is setup with the default Cluster Superuser Privileges (Site Admin and Tenant Admin), I can download the tenant ssh key from the cluster page and use that to ssh into the controller node:
ssh [email protected] -p 10007 -i ~/Downloads/BD_Demo\ Tenant.pem
x.x.x.x
for me is the public IP address of my BlueData gateway.
Note that we are connecting to port 10007 which is the port of the controller.
Run pyspark:
$ pyspark --master yarn --deploy-mode client --packages com.databricks:spark-csv_2.10:1.4.0
Access the datafile and retrieve the first record:
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('dtap://TenantStorage/airline-safety.csv')
>>> df.take(1)
The results are:
[Row(airline=u'Aer Lingus', avail_seat_km_per_week=320906734, incidents_85_99=2, fatal_accidents_85_99=0, fatalities_85_99=0, incidents_00_14=0, fatal_accidents_00_14=0, fatalities_00_14=0)]
If you want to read the data from one Datatap, process it and save it to another Datatap it would look something like this:
>>> df_filtered = df.filter(df.incidents_85_99 == 0)
>>> df_filtered.write.parquet('dtap://OtherDataTap/airline-safety_zero_incidents.parquet')