I'm querying a Cassandra table using the QueryCassandra processor, but what I don't understand is how to pass its JSON output file into the ExecutePySpark processor as an input file; later I also need to pass my Spark output data to Hive. Please help me with this. Thanks.
Consider this flow that uses 4 processors as below:
QueryCassandra -> UpdateAttribute -> PutFile -> ExecutePySpark
Step 1: QueryCassandra processor: Execute a CQL query against Cassandra and write the result to a flow file.
Step 2: UpdateAttribute processor: Assign the property filename a value containing a name for a temporary file on disk that will hold the query results. Use NiFi Expression Language to generate the file name so that it is different for each run. Also create a property result_directory and assign it a folder on disk that NiFi has write permission to.

property: filename
value: cassandra_result_${now():toNumber()}

property: result_directory
value: /tmp
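As a rough illustration (this is Python, not NiFi code), the `${now():toNumber()}` expression evaluates to the current epoch time in milliseconds, so the generated file name behaves like this sketch:

```python
import time

def make_result_filename() -> str:
    # Mimics the NiFi expression cassandra_result_${now():toNumber()},
    # which appends the current epoch time in milliseconds so each
    # run of the flow produces a distinct file name.
    return f"cassandra_result_{int(time.time() * 1000)}"

name = make_result_filename()
print(name)  # e.g. cassandra_result_1712345678901
```

Because the suffix changes every millisecond, successive flow runs never overwrite each other's result files.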
Step 3: PutFile processor: Configure the Directory property with the value ${result_directory} populated in Step 2.
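In effect, Steps 2 and 3 together land the query results on disk at a predictable path. A minimal Python sketch of that hand-off (the JSON payload and the /tmp directory here are illustrative assumptions):

```python
import json
import os
import time

# Stand-in for the flow file content produced by QueryCassandra.
flowfile_content = json.dumps({"results": [{"id": 1, "name": "alice"}]})

# Values assigned by UpdateAttribute in Step 2.
result_directory = "/tmp"
filename = f"cassandra_result_{int(time.time() * 1000)}"

# PutFile writes the flow file content to ${result_directory}/${filename}.
path = os.path.join(result_directory, filename)
with open(path, "w") as f:
    f.write(flowfile_content)

print(path)
```

The full path is now known to the flow (it is just the two attributes joined), which is what makes Step 4 possible.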
Step 4: ExecutePySpark processor: Pass the file name with its location as an argument to the PySpark application via the PySpark App Args processor property. The application can then read the data from the file on disk, process it, and write it to Hive.

property: PySpark App Args
value: ${result_directory}/${filename}
Additionally, you could configure more attributes in Step 2 (UpdateAttribute) and pass them as further arguments in Step 4 (ExecutePySpark), to be used by the PySpark application when writing to Hive (for example, the Hive database and table names).
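A skeleton of such a PySpark application might look like the following. The argument layout (input path, Hive database, Hive table) is an assumption to match the attributes suggested above, and `parse_args` is a hypothetical helper; the Hive write uses the standard DataFrameWriter `saveAsTable` API:

```python
import sys

def parse_args(argv):
    # Hypothetical argument layout: <input_path> <hive_db> <hive_table>.
    # These would arrive via the PySpark App Args property, e.g.
    #   ${result_directory}/${filename} my_db my_table
    if len(argv) != 4:
        raise SystemExit(f"usage: {argv[0]} <input_path> <hive_db> <hive_table>")
    return argv[1], argv[2], argv[3]

def main(argv):
    input_path, hive_db, hive_table = parse_args(argv)

    # pyspark is imported lazily so parse_args stays testable without Spark.
    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("cassandra-to-hive")
             .enableHiveSupport()
             .getOrCreate())

    # Read the JSON file that PutFile wrote to disk in Step 3.
    df = spark.read.json(input_path)

    # ... transform df as needed ...

    # Write the result to the Hive table named by the arguments.
    df.write.mode("append").saveAsTable(f"{hive_db}.{hive_table}")

    spark.stop()

# Entry point when submitted by ExecutePySpark: main(sys.argv)
```

When ExecutePySpark runs the app, NiFi expands `${result_directory}/${filename}` (and any extra attribute-backed args) before handing them to the script, so the application sees plain strings.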