I'm trying to migrate mysql table data into cassandra using nifi. Attaching screenshot of what I have tried in nifi as I stuck at putCassandraQl command as it is throwing the error which is mentioned in attached screenshot. Please help me on this as i need to add more steps.
PutCassandraQL requires the flow file to contain a CQL statement, where you are passing in the Avro records you're getting from QueryDatabaseTable, or after trying to modify the contents with ReplaceText.
Try the following:
QDT -> SplitAvro -> ConvertAvroToJSON -> ConvertJSONToSQL -> ReplaceText (to change SQL to CQL) -> PutCassandraQL
EDIT: PutCassandraQL is expecting parameters in attributes of the form cql.args.N.type
and cql.args.N.value
where N is a positive integer corresponding to the position of the value to be inserted. However ConvertJSONToSQL outputs attributes of the form sql.args.N.type
and sql.args.N.value
. This means you'll need to change them using UpdateAttribute or ExecuteScript. As of NiFi 1.5.0 (not yet released at the time of this writing) via NIFI-4684, you'll be able to specify the prefix of the attributes coming out of ConvertJSONToSQL, so you can set that property to cql
.
You may not need a ReplaceText if what is generated by ConvertJSONToSQL is valid CQL. If it is not, you will need to use ReplaceText to change the SQL statement into valid CQL.