cassandra apache-nifi kylo

Unable to transfer MySQL table data to Cassandra using NiFi


I'm trying to migrate MySQL table data into Cassandra using NiFi. I've attached a screenshot of the flow I have built so far. I'm stuck at the PutCassandraQL processor, which throws the error shown in the screenshot. Please help me with this, as I need to add more steps after it.

[Screenshot of the NiFi flow and the PutCassandraQL error]


Solution

  • PutCassandraQL requires the flow file content to be a CQL statement. In your flow you are passing it the Avro records returned by QueryDatabaseTable, or whatever remains of them after you modify the content with ReplaceText, and neither is a CQL statement.

    Try the following:

    QueryDatabaseTable (QDT) -> SplitAvro -> ConvertAvroToJSON -> ConvertJSONToSQL -> ReplaceText (to change SQL to CQL) -> PutCassandraQL

    EDIT: PutCassandraQL expects its parameters in attributes named cql.args.N.type and cql.args.N.value, where N is a positive integer giving the position of the value to be inserted. ConvertJSONToSQL, however, writes attributes named sql.args.N.type and sql.args.N.value, so you'll need to rename them using UpdateAttribute or ExecuteScript. As of NiFi 1.5.0 (not yet released at the time of this writing), NIFI-4684 will let you specify the prefix of the attributes coming out of ConvertJSONToSQL, so you can set that property to cql.

    You may not need ReplaceText at all if the statement generated by ConvertJSONToSQL is already valid CQL. If it is not, use ReplaceText to rewrite the SQL statement as valid CQL.
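
The attribute renaming described in the EDIT above can be sketched in plain Python. This is only an illustration of the logic, not a ready-made NiFi script: the function name rename_sql_args_to_cql and the JDBC_TO_CQL table are hypothetical. The table also assumes something the answer does not state, namely that ConvertJSONToSQL writes JDBC type codes into sql.args.N.type while PutCassandraQL wants CQL type names. Check both processors' documentation before relying on it, and treat the table as a small, incomplete example.

```python
import re

# Matches attribute names such as "sql.args.1.type" or "sql.args.12.value".
SQL_ARG_PATTERN = re.compile(r"^sql\.args\.(\d+)\.(type|value)$")

# Assumption: a partial, illustrative mapping from java.sql.Types codes
# to CQL type names. Extend or correct it for your own schema.
JDBC_TO_CQL = {
    "4": "int",         # java.sql.Types.INTEGER
    "-5": "bigint",     # java.sql.Types.BIGINT
    "8": "double",      # java.sql.Types.DOUBLE
    "12": "text",       # java.sql.Types.VARCHAR
    "16": "boolean",    # java.sql.Types.BOOLEAN
    "93": "timestamp",  # java.sql.Types.TIMESTAMP
}


def rename_sql_args_to_cql(attributes):
    """Return a copy of `attributes` with sql.args.N.* renamed to cql.args.N.*.

    Type values found in JDBC_TO_CQL are translated; unknown type values and
    all other attributes are copied unchanged.
    """
    renamed = {}
    for name, value in attributes.items():
        match = SQL_ARG_PATTERN.match(name)
        if match is None:
            renamed[name] = value
            continue
        position, kind = match.groups()
        if kind == "type":
            value = JDBC_TO_CQL.get(value, value)
        renamed["cql.args.%s.%s" % (position, kind)] = value
    return renamed


if __name__ == "__main__":
    # Hypothetical attributes for a two-column insert.
    example = {
        "sql.args.1.type": "12",
        "sql.args.1.value": "alice",
        "sql.args.2.type": "4",
        "sql.args.2.value": "30",
        "filename": "row-1",
    }
    for name, value in sorted(rename_sql_args_to_cql(example).items()):
        print(name, "=", value)
```

Inside ExecuteScript the same logic would be applied to the flow file's attributes through the script's session object. That wiring is left out here so the sketch can be run and checked outside NiFi.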