Search code examples
javagoogle-cloud-platformgoogle-cloud-pubsub

GCP pub/sub, my pulled message is always empty


I have two Google Cloud functions, one that publishes and another that subscribes.

This is the code

    public void publisXeroTeannt(XeroTenant xeroTenant) {
        Gson gson = new Gson();
        String xeroTenantMessage = gson.toJson(xeroTenant);
        ByteString byteString = ByteString.copyFrom(xeroTenantMessage, StandardCharsets.UTF_8);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                .setData(byteString)
                .build();
        log.info("Data being published in string utf 8 is {}", pubsubMessage.getData().toStringUtf8());

        publisher.publish(pubsubMessage);
    }

When I publish, I can clearly see the log The tenant Id being published is f837ea57-feae-4965-92c5-bc1b7cd6b2a0"

I have a subscriber

@Override
    public void accept(CloudEvent event) throws Exception {
        try {
            String cloudEventData = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
            Gson gson = new Gson();
            PubsubMessage pubsubMessage = gson.fromJson(cloudEventData, PubsubMessage.class);
            String encodedData = pubsubMessage.getData().toStringUtf8();
            String decodedData =
                    new String(Base64.getDecoder().decode(encodedData), StandardCharsets.UTF_8);
            log.info("Decoded message is {}", decodedData);


        } catch (Exception e) {
            log.error("Error in processing message", e);
        }
    }

But the log always says it's an empty message INFO com.demo.SubscriberFunction - The decoded message is

What's wrong here?


Solution

  • Okay, the mistake was, after deserializing into PubsubMessage, I had to get the message and from message, I had to get the data as string, which then helped me translate to my object.

    Here's the code change, I just used JsonELement rather than going with PubsubMessage, this proved successful.

    public void accept(CloudEvent event) throws Exception {
            try {
                String cloudEventData = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
                log.info("Cloud event data is {}", cloudEventData);
    
                Gson gson = new Gson();
    
                JsonElement je = gson.fromJson(cloudEventData, JsonElement.class);
                String tenantEncoded = je.getAsJsonObject().get("message").getAsJsonObject().get("data").getAsString();
                String tenantJson = new String(Base64.getDecoder().decode(xeroTenantEncoded), StandardCharsets.UTF_8);
                log.info("Xero tenant json is {}", tenantJson);
                Tenant tenant = gson.fromJson(tenantJson, Tenant.class);
    
                log.info("Decoded message is {}", xeroTenant);
                if (xeroTenant != null) {
                    String tenantId = tenant.getTenantId();
                    if (StringUtils.isNotBlank(tenantId)) {
                        log.info("Tenant Id is {}", tenantId);
                    } else {
                        log.warn("The tenant Id is empty");
                    }
                } else {
                    log.warn("Xero tenant from the topic is null");
                }
            } catch (Exception e) {
                log.error("Error in processing message", e);
            }
        }