Tags: apache-spark, cassandra, pyspark, apache-nifi, kylo

How do I transfer my Cassandra data to PySpark using the QueryCassandra and ExecutePySpark NiFi processors?


I'm querying a Cassandra table with the QueryCassandra processor, but what I don't understand is how to pass the JSON output file to the ExecutePySpark processor as input; later I also need to pass my Spark output data to Hive. Please help me with this, thanks.

My QueryCassandra properties:


My ExecutePySpark properties:


Solution

  • Consider a flow that uses the four processors below:

    QueryCassandra -> UpdateAttribute -> PutFile -> ExecutePySpark

    Step 1: QueryCassandra processor: Execute a CQL query against Cassandra and write the result to a flow file.
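    For example, assuming a placeholder keyspace and table, the processor's CQL select query property could hold something like the following, with the Output Format property set to JSON so that the flow file contains JSON:

    • property: CQL select query
    • value: SELECT * FROM mykeyspace.mytable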

    Step 2: UpdateAttribute processor: Set the filename property to a value naming a temporary file on disk that will hold the query results. Use NiFi Expression Language to generate the file name so that it differs for each run. Also create a result_directory property and set it to a folder on disk that NiFi has write permission to.

    • property: filename
    • value: cassandra_result_${now():toNumber()}

    • property: result_directory
    • value: /tmp
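    The expression ${now():toNumber()} evaluates to the current time as epoch milliseconds, so each run yields a distinct name such as cassandra_result_1717000000000.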


    Step 3: PutFile processor: Set the Directory property to ${result_directory}, the attribute populated in Step 2.
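    With the values above, each run writes the query results to a path like /tmp/cassandra_result_1717000000000.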


    Step 4: ExecutePySpark processor: Pass the file's full path as an argument to the PySpark application via the PySpark App Args processor property. The application can then read the data from the file on disk, process it, and write it to Hive; see the sketch after the property list below.

    • property: PySpark App Args
    • value: ${result_directory}/${filename}
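    Below is a minimal sketch of such a PySpark application, assuming QueryCassandra was configured for JSON output; the application file name (cassandra_to_hive.py) and the Hive table (mydb.cassandra_results) are placeholder names, and the exact JSON layout QueryCassandra produces may require an extra flattening step:

        # cassandra_to_hive.py
        import sys

        from pyspark.sql import SparkSession

        # NiFi passes the result file's path via the PySpark App Args property
        input_path = sys.argv[1]

        spark = (SparkSession.builder
                 .appName("CassandraToHive")
                 .enableHiveSupport()  # required for writing to Hive tables
                 .getOrCreate())

        # QueryCassandra wrote the results as JSON, so read them back as JSON;
        # depending on the output structure you may need to flatten nested fields
        df = spark.read.json(input_path)

        # ... any transformations go here ...

        # Placeholder database/table; adjust to your Hive schema
        df.write.mode("append").saveAsTable("mydb.cassandra_results")

        spark.stop()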


    Additionally, you could set more attributes in Step 2 (UpdateAttribute) and pass them as arguments in Step 4 (ExecutePySpark) for the PySpark application to use when writing to Hive (for example, the Hive database and table names).
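    For instance, if UpdateAttribute also set hypothetical attributes hive_db and hive_table, and PySpark App Args were extended to ${result_directory}/${filename} ${hive_db} ${hive_table}, the sketch above could pick them up by replacing the corresponding lines:

        # Three arguments arrive from PySpark App Args instead of one
        input_path, hive_db, hive_table = sys.argv[1:4]

        # Write to the database/table named by the NiFi attributes
        df.write.mode("append").saveAsTable("{}.{}".format(hive_db, hive_table))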