I have two Google Cloud functions, one that publishes and another that subscribes.
This is the publisher code:
public void publishXeroTenant(XeroTenant xeroTenant) {
    // Serialize the tenant to JSON and send it as the UTF-8 message body.
    Gson gson = new Gson();
    String xeroTenantMessage = gson.toJson(xeroTenant);
    ByteString byteString = ByteString.copyFrom(xeroTenantMessage, StandardCharsets.UTF_8);
    PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
            .setData(byteString)
            .build();
    log.info("Data being published in string utf 8 is {}", pubsubMessage.getData().toStringUtf8());
    publisher.publish(pubsubMessage);
}
When I publish, I can clearly see the payload in the log:
The tenant Id being published is f837ea57-feae-4965-92c5-bc1b7cd6b2a0
This is the subscriber:
@Override
public void accept(CloudEvent event) throws Exception {
    try {
        String cloudEventData = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
        Gson gson = new Gson();
        PubsubMessage pubsubMessage = gson.fromJson(cloudEventData, PubsubMessage.class);
        String encodedData = pubsubMessage.getData().toStringUtf8();
        String decodedData =
                new String(Base64.getDecoder().decode(encodedData), StandardCharsets.UTF_8);
        log.info("Decoded message is {}", decodedData);
    } catch (Exception e) {
        log.error("Error in processing message", e);
    }
}
But the log always shows an empty message:
INFO com.demo.SubscriberFunction - The decoded message is
What's wrong here?
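Okay, the mistake was treating the CloudEvent data as if it were the PubsubMessage itself. The event data is a JSON envelope: the message sits under the "message" key, and the published payload is a Base64-encoded string under that message's "data" key. Deserializing the envelope straight into PubsubMessage with Gson left the data empty. I had to get the message first, then read its data as a string and Base64-decode it, which gave me the JSON for my object.
Here's the code change. I used JsonElement rather than PubsubMessage, and this proved successful.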
@Override
public void accept(CloudEvent event) throws Exception {
    try {
        // The event data is a JSON envelope, not the raw published payload.
        String cloudEventData = new String(event.getData().toBytes(), StandardCharsets.UTF_8);
        log.info("Cloud event data is {}", cloudEventData);
        Gson gson = new Gson();
        JsonElement je = gson.fromJson(cloudEventData, JsonElement.class);
        // The published payload is Base64-encoded under message.data.
        String tenantEncoded = je.getAsJsonObject().get("message").getAsJsonObject().get("data").getAsString();
        String tenantJson = new String(Base64.getDecoder().decode(tenantEncoded), StandardCharsets.UTF_8);
        log.info("Xero tenant json is {}", tenantJson);
        Tenant tenant = gson.fromJson(tenantJson, Tenant.class);
        log.info("Decoded message is {}", tenant);
        if (tenant != null) {
            String tenantId = tenant.getTenantId();
            if (StringUtils.isNotBlank(tenantId)) {
                log.info("Tenant Id is {}", tenantId);
            } else {
                log.warn("The tenant Id is empty");
            }
        } else {
            log.warn("Xero tenant from the topic is null");
        }
    } catch (Exception e) {
        log.error("Error in processing message", e);
    }
}
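To look at the Base64 step in isolation, here is a minimal sketch that uses only the JDK. It assumes the "data" field in the event payload holds the Base64 encoding of the UTF-8 bytes the publisher passed to setData. The class name, method names, and the "tenantId" JSON field are hypothetical and only there for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PubSubDataRoundTrip {

    // Mimics what ends up in the "data" field of the event payload:
    // the published UTF-8 bytes, Base64-encoded (assumption for illustration).
    static String toEnvelopeData(String payloadJson) {
        return Base64.getEncoder()
                .encodeToString(payloadJson.getBytes(StandardCharsets.UTF_8));
    }

    // Reverses the step above: Base64-decode, then read the bytes as UTF-8.
    static String fromEnvelopeData(String base64Data) {
        return new String(Base64.getDecoder().decode(base64Data), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical tenant JSON; the real field names depend on the Tenant class.
        String tenantJson = "{\"tenantId\":\"f837ea57-feae-4965-92c5-bc1b7cd6b2a0\"}";
        String data = toEnvelopeData(tenantJson);
        System.out.println("data field: " + data);
        System.out.println("decoded:    " + fromEnvelopeData(data));
    }
}
```

The subscriber only needs the second method: once the string under message.data has been pulled out of the envelope, decoding it yields the same JSON the publisher logged.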