Search code examples
apache-nifi

Reading AMQP routing key with Apache Nifi


I am trying to read off a RabbitMQ broker using apache nifi. It is important that I can retrieve the routing key associated with the message and write the payload and routing key to a file.

I've used python to read the routing key so I'm positive that it is present.

I'm using the ConsumeAMQP processor which links to a PutFile processor. All that gets written is the payload, not the routing key.


Solution

  • The ConsumeAMQP processor parses the incoming message and forms it into an Apache NiFi flowfile. The flowfile structure includes a key/value pair list called attributes and the arbitrary bytes of the content. According to the "Additional Details" section of the ConsumeAMQP documentation:

    This processor does two things. It constructs FlowFile by extracting information from the consumed AMQP message (both body and attributes). Once message is consumed a FlowFile is constructed. The message body is written to a FlowFile and its com.rabbitmq.client.AMQP.BasicProperties are transfered into the FlowFile as attributes. AMQP attribute names are prefixed with amqp$ prefix.

    AMQP Properties The following is the list of available standard AMQP properties which may come with the message: ("amqp$contentType", "amqp$contentEncoding", "amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo", "amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId", "amqp$clusterId")

    If the attribute you are looking for is not contained here (and unless the naming convention is unusual, it does not appear to be), you should check to see if it is contained in com.rabbitmq.client.AMQP.BasicProperties (it does not appear to be). I am not RabbitMQ expert, but from this link it looks like the routing key is some attribute on a message that an exchange registers in order to route the incoming messages. See also: RabbitMQ "AMQP 0-9-1 Model Explained"

    I would check the amqp$headers attribute on your incoming messages (you can pause the consuming PutFile processor to queue up these flowfiles in the connection and examine them in realtime) to see if you can extract the routing key using NiFi Expression Language. An UpdateAttribute processor with a dynamic property routingKey and an expression like (not tested) ${amqp$headers.routing_key} would result in a new flowfile attribute called routingKey with the value you're looking for. If it is not present there, it would be on the com.rabbitmq.client.Envelope object, accessible via Envelope.getRoutingKey(), but I do not believe NiFi exposes this object to the processor at this time. A change would need to be made in ConsumeAMQP.java @ L101. You can file a feature request via Jira.

    Another thing to keep in mind is that even if you extract the routing key to an attribute, PutFile very clearly documents that it only prints the flowfile content to the file, not the attributes. If you need to modify the content of the flowfile to contain attributes, use a ReplaceText processor to insert the attributes in a map structure or some other desired format.