Search code examples
javaazureazure-logic-appsazure-iot-hubazure-iot-sdk

Azure IoTHub DeviceMessage, route filter on message body not working


I am creating IoT remote monitoring and notifications with Azure Logic Apps as per the instructions stated in https://learn.microsoft.com/en-us/azure/iot-hub/iot-hub-monitoring-notifications-with-azure-logic-apps.

Telemetry Simulator (Java using com.microsoft.azure.sdk.iot -> iot-device-client -> 1.14.0 version)

public class SimulatedDevice {
    // The device connection string to authenticate the device with your IoT hub.
    // Using the Azure CLI:
    // az iot hub device-identity show-connection-string --hub-name {YourIoTHubName}
    // --device-id MyJavaDevice --output table
    private static String connString = "#ConnectionString";

    private static IotHubClientProtocol protocol = IotHubClientProtocol.AMQPS;
    private static DeviceClient client;

    // Specify the telemetry to send to your IoT hub.
    private static class TelemetryDataPoint {
        public double temperature;
        public double humidity;
        public String isTrue = "true";

        // Serialize object to JSON format.
        public String serialize() {
            Gson gson = new Gson();
            return gson.toJson(this);
        }
    }

    // Print the acknowledgement received from IoT Hub for the telemetry message
    // sent.
    private static class EventCallback implements IotHubEventCallback {
        public void execute(IotHubStatusCode status, Object context) {
            System.out.println("IoT Hub responded to message with status: " + status.name());

            if (context != null) {
                synchronized (context) {
                    context.notify();
                }
            }
        }
    }

    private static class MessageSender implements Runnable {
        public void run() {
            try {
                // Initialize the simulated telemetry.
                double minTemperature = 20;
                double minHumidity = 60;
                Random rand = new Random();
                int i = 0;

                while (i < 100000) {
                    // Simulate telemetry.
                    double currentTemperature = minTemperature + rand.nextDouble() * 15;
                    double currentHumidity = minHumidity + rand.nextDouble() * 20;
                    TelemetryDataPoint telemetryDataPoint = new TelemetryDataPoint();
                    telemetryDataPoint.temperature = currentTemperature;
                    telemetryDataPoint.humidity = currentHumidity;

                    // Add the telemetry to the message body as JSON.
                    String msgStr = telemetryDataPoint.serialize();

                    byte[] bodyClone = msgStr.getBytes(StandardCharsets.UTF_8);
                    Message msg = new Message(bodyClone);

                    // Add a custom application property to the message.
                    // An IoT hub can filter on these properties without access to the message body.
                    msg.setProperty("temperatureAlert", (currentTemperature > 30) ? "true" : "false");
                    msg.setMessageType(MessageType.DEVICE_TELEMETRY);

                    System.out.println("Sending message string: " + msgStr);
                    System.out.println("Sending message: " + msg);

                    Object lockobj = new Object();

                    // Send the message.
                    EventCallback callback = new EventCallback();
                    client.sendEventAsync(msg, callback, lockobj);

                    synchronized (lockobj) {
                        lockobj.wait();
                    }
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                System.out.println("Finished.");
            }
        }
    }

    public static void main(String[] args) throws IOException, URISyntaxException {

        // Connect to the IoT hub.
        client = new DeviceClient(connString, protocol);
        client.open();

        // Create new thread and start sending messages
        MessageSender sender = new MessageSender();
        ExecutorService executor = Executors.newFixedThreadPool(1);
        executor.execute(sender);

        // Stop the application.
        System.out.println("Press ENTER to exit.");
        System.in.read();
        executor.shutdownNow();
        client.closeNow();
    }
}

For QueryString - temperatureAlert = "true" - everything is working fine. But for query string - $body.temperature > 30 - then I don't receive any messages.


Solution

  • In order for IoT Hub to know whether the message can be routed based on its body contents, the message must contain specific headers which describe the content and encoding of its body. In particular, messages must have both these headers for routing on message body to work:

    1. Content type of "application/json"
    2. Content encoding must match one of:
      • "utf-8"
      • "utf-16"
      • "utf-32"

    Here add below two lines just below the syntax to create Message object:

    msg.setContentEncoding("utf-8");
    msg.setContentType("application/json");