
How can I use IoT telemetry events on a Pub/Sub topic?


I read the documentation for the Google Cloud IoT API and wrote a simple application for Android Things, based on this Google library. My application connects to the IoT platform successfully, and I have sent test data.

My application code:

ConnectionParams connectionParams = new ConnectionParams.Builder()
            .setProjectId("my_pid")
            .setRegistry("my_reg", "my_server")
            .setDeviceId("my_device_name")
            .build();
IotCoreClient client = new IotCoreClient.Builder()
            .setConnectionParams(connectionParams)
            .setKeyPair(keys)
            .setTelemetryQueue(new LinkedTransferQueue<TelemetryEvent>())
            .build();

client.connect();

client.publishDeviceState("Test data\n".getBytes());

client.publishTelemetry(new TelemetryEvent("Sonata".getBytes(), null,TelemetryEvent.QOS_AT_LEAST_ONCE));

There is also a method for sending device sensor data to the IoT platform, publishTelemetry(params...):

client.publishTelemetry(new TelemetryEvent("Sonata".getBytes(), null,TelemetryEvent.QOS_AT_LEAST_ONCE));

This code works, but I can't find the "Sonata" data anywhere in Google Cloud Platform, and I don't understand how to use telemetry events on a Pub/Sub topic.

Updated

I found the solution. First, I added a subscription to the topic. For example, for the topic projects/my-project-id/topics/firstTop, the subscription (fsub is the subscription name) is projects/my-project-id/subscriptions/fsub. Then I wrote a simple subscriber in Java and sent a message from the Android Things device, and I received the telemetry data.

This is the subscriber code in Java:

import com.google.api.gax.core.CredentialsProvider;
import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.collect.Lists;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class SubscriberExample {
    private static final String PROJECT_ID = "my-project-id";
    private static final BlockingQueue<PubsubMessage> messages = new LinkedBlockingDeque<>();

    static class MessageReceiverExample implements MessageReceiver {
        @Override
        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            messages.offer(message);
            consumer.ack();
        }
    }

    public static void main(String... args) throws Exception {
        String subscriptionId = "YOUR_SUBSCRIBER_ID";
        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(PROJECT_ID, subscriptionId);
        Subscriber subscriber = null;
        try {
            // "~" is not expanded by Java, so resolve the home directory explicitly.
            String keyPath = System.getProperty("user.home") + "/google_cloud_pubsub-Project-0b66ab8c5060.json";
            GoogleCredentials credentials = GoogleCredentials
                    .fromStream(new FileInputStream(keyPath)) // you can get the key file here: https://cloud.google.com/docs/authentication/getting-started
                    .createScoped(Lists.newArrayList("https://www.googleapis.com/auth/cloud-platform"));
            subscriber = Subscriber.newBuilder(subscriptionName, new MessageReceiverExample())
                    .setCredentialsProvider(new CredentialsProvider() {
                        @Override
                        public Credentials getCredentials() throws IOException {
                            return credentials;
                        }
                    }).build();
            subscriber.startAsync().awaitRunning();
            while (true) {
                PubsubMessage message = messages.take();
                System.out.println("Message Id: " + message.getMessageId());
                System.out.println("Data: " + message.getData().toStringUtf8());
            }
        } finally {
            if (subscriber != null) {
                subscriber.stopAsync();
            }
        }
    }
}
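
The topic and subscription mentioned in the update follow fixed resource-name patterns, and ProjectSubscriptionName.of(...) in the subscriber above produces the same kind of string. As a minimal sketch using only the standard library, the patterns can be written out like this. The class PubSubNames and its method names are hypothetical and exist only for illustration; they are not part of the Pub/Sub client library.

```java
// Hypothetical helper, not part of the Google client library.
final class PubSubNames {
    private PubSubNames() {}

    // Full resource name of a topic: projects/{project}/topics/{topic}
    static String topicName(String projectId, String topicId) {
        return String.format("projects/%s/topics/%s", projectId, topicId);
    }

    // Full resource name of a subscription: projects/{project}/subscriptions/{subscription}
    static String subscriptionName(String projectId, String subscriptionId) {
        return String.format("projects/%s/subscriptions/%s", projectId, subscriptionId);
    }

    public static void main(String[] args) {
        // Uses the example IDs from the post.
        System.out.println(topicName("my-project-id", "firstTop"));
        System.out.println(subscriptionName("my-project-id", "fsub"));
    }
}
```

Only the short ID (for example fsub) is passed as subscriptionId in the subscriber code; the project ID is supplied separately.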

Solution

  • First, to check that it's working and the connection really is going to your project, the easiest way is to look at the Google Cloud Platform console for your project. On the device page inside the IoT Core section (IoT Core -> registry -> device), there's a tab called "Configuration & state history", where you should see "Test data" (as set with the publishDeviceState call). That should confirm that at least everything else is working as expected.

    Assuming that's the case, now you'll want to look at Pub/Sub documentation to start to get a handle on what you can do with Pub/Sub. Here is the main doc page. My recommendation is to look into Google Cloud Functions as a place to get up and running quickly. Depending on what you want to do, Cloud Dataflow might also be a good option to look into.

    Each of those products is triggered by messages published to Cloud Pub/Sub. So as soon as you call publishTelemetry, it sends the telemetry to IoT Core, which then publishes the message to the Pub/Sub topic you specified in the IoT Core registry when you created it. The triggered products (Cloud Functions and Dataflow) then receive the Pub/Sub message, from which you can get the telemetry data. There are examples in the docs on how to do it.
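
As a rough illustration of the last step: when a Pub/Sub message reaches a triggered function or a push endpoint, the payload normally arrives as a base64-encoded data field with a map of string attributes, and IoT Core is expected to attach attributes such as deviceId. The sketch below uses only the Java standard library and simulates the incoming message instead of wiring up a real trigger. TelemetryDecoder and its methods are hypothetical names, and the deviceId attribute is an assumption to verify against the IoT Core documentation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;

// Hypothetical helper showing how a triggered consumer might read telemetry.
final class TelemetryDecoder {
    private TelemetryDecoder() {}

    // Decodes the base64 "data" field of a Pub/Sub message into UTF-8 text.
    static String decodePayload(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Combines the (assumed) deviceId attribute with the decoded payload.
    static String describe(String base64Data, Map<String, String> attributes) {
        String deviceId = attributes.getOrDefault("deviceId", "unknown");
        return deviceId + ": " + decodePayload(base64Data);
    }

    public static void main(String[] args) {
        // Simulate the message that publishTelemetry("Sonata") would produce.
        String data = Base64.getEncoder()
                .encodeToString("Sonata".getBytes(StandardCharsets.UTF_8));
        Map<String, String> attributes =
                Collections.singletonMap("deviceId", "my_device_name");
        System.out.println(describe(data, attributes));
    }
}
```

With the pull-based Java client shown in the question's update, this decoding step is not needed, because message.getData() already returns the raw bytes.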