I have a typical Spring Boot web application (names simplified) with HelloController, HelloService, and HelloRepository. HelloRepository uses JPA to fetch data from Cloud SQL (Postgres). The application is deployed to Cloud Run and works as expected.
@Repository
public interface HelloRepository extends JpaRepository<Hello, UUID> {
}
I am trying to consume messages from a Pub/Sub topic using a pull subscription and process them in the same application.
I have the following utility to help create the subscription:
@Scope("prototype")
public class PubSubSubscriber implements IPubSubScriber, DisposableBean {
private Subscriber subscriber;
private IPubSubMessageSubscriber pubSubMessageSubscriber;
final MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
if (StringUtils.isBlank(message.getData().toStringUtf8())) {
logger.severe("Empty message cannot be processed ");
consumer.ack();
return;
}
// logger.info("Received message, message Id: " + message.getMessageId());
logger.info("Received message, messageId:" + message.getMessageId() + " ,message: " + message.getData().toStringUtf8());
try {
pubSubMessageSubscriber.onPubSubMessage(message.getMessageId(), message.getData().toStringUtf8());
consumer.ack();
logger.info("Processed message, messageId:" + message.getMessageId());
} catch (Exception e) {
throw new RuntimeException(e);
}
};
@Override
public void runAsyncSubscriberService(final String projectId, final String subscriptionId, IPubSubMessageSubscriber pubSubMessageSubscriber) {
try {
this.pubSubMessageSubscriber = pubSubMessageSubscriber;
final ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
subscriber =
Subscriber.newBuilder(subscriptionName, receiver)
//.setMaxAckExtensionPeriod(Duration.ofSeconds(600))
// .setMinDurationPerAckExtension(Duration.ofSeconds(10))
//.setParallelPullCount(1)
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1L)
.build())
.build();
subscriber.startAsync().awaitRunning();
logger.info("Listening for pub/sub messages on subscription: " + subscriptionName);
// subscriber.awaitTerminated();
// logger.info("logger - stopped consumer");
} catch (final Exception exception) {
logger.severe("Exception occurred while processing message --> " + exception.getMessage() +
" cause --> " + exception.getCause());
subscriber.stopAsync();
}
}
@Override
public void destroy() {
logger.info("PubSub Consumer Stopped");
if (!Objects.isNull(subscriber)) {
subscriber.stopAsync();
logger.info("Stopped PubSubConsumer subscriber");
} else {
logger.info("Stop command is ignored - PubSubConsumer's subscriber is not running");
}
}
}
public interface IPubSubMessageSubscriber {
void onPubSubMessage(String messageId, final String pubSubMessage) throws Exception;
}
public interface IPubSubScriber {
void runAsyncSubscriberService(String projectId, String subscriptionId, IPubSubMessageSubscriber pubSubMessageSubscriber);
}
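The receiver above rethrows when processing fails. One alternative is to nack explicitly, which makes the redelivery intent visible in the code. Below is a minimal JDK-only sketch of that ack/nack decision. `AckReply`, `MessageHandler`, and `AckingReceiver` are hypothetical names introduced for illustration; `AckReply` only stands in for the client's `AckReplyConsumer` so the sketch compiles without the Pub/Sub dependency.

```java
// Hypothetical stand-in for com.google.cloud.pubsub.v1.AckReplyConsumer,
// declared here only so the sketch compiles without the Pub/Sub client.
interface AckReply {
    void ack();
    void nack();
}

// Same shape as IPubSubMessageSubscriber in the question.
interface MessageHandler {
    void onPubSubMessage(String messageId, String payload) throws Exception;
}

final class AckingReceiver {
    private final MessageHandler handler;

    AckingReceiver(MessageHandler handler) {
        this.handler = handler;
    }

    // Acks blank payloads (retrying them cannot help), acks after successful
    // processing, and nacks on failure so the message can be redelivered.
    void receive(String messageId, String payload, AckReply reply) {
        if (payload == null || payload.trim().isEmpty()) {
            reply.ack();
            return;
        }
        try {
            handler.onPubSubMessage(messageId, payload);
            reply.ack();
        } catch (Exception e) {
            reply.nack();
        }
    }
}
```

In the real subscriber the same branching would sit inside the `MessageReceiver` lambda, calling `consumer.nack()` in the catch block.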
HelloTopicSubscriber is the service that consumes the messages:
@Service
public class HelloTopicSubscriber implements IPubSubMessageSubscriber {
private final IPubSubScriber pubSubScriber;
public HelloTopicSubscriber(
IPubSubScriber pubSubScriber) {
this.pubSubScriber = pubSubScriber;
...
start();
}
public void start() {
pubSubScriber.runAsyncSubscriberService(
gcpServiceOptions.getDefaultProjectId(), "HelloSub1", this);
}
@Override
public void onPubSubMessage(String messageId, String message) {
// Process the message, get data from database etc.
}
}
With the above setup, I am able to read incoming messages and ack them properly.
Since I need to fetch data from the Hello table, I tried injecting HelloService into HelloTopicSubscriber. When I do, the application becomes unstable and throws JDBC connection errors of various types. I wonder whether this is caused by thread violations due to the subscriber's async nature, i.e.
public HelloTopicSubscriber(
IPubSubScriber pubSubScriber,
HelloService helloService) { // <-- will this cause any issue?
this.pubSubScriber = pubSubScriber;
this.helloService = helloService;
}
@Override
public void onPubSubMessage(String messageId, String message) {
// Process the message, get data from database etc.
helloService.get(..)
}
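On the threading question: the receiver callback runs on the Subscriber's own executor threads, not on a web request thread. Spring keeps transaction state in thread-local storage, so the callback starts without any context from other threads. Calling a singleton service from it should be fine as long as the service method opens its own transaction (for example via `@Transactional`). Here is a minimal JDK-only sketch of that thread-local behavior; `ThreadBoundContextDemo` and `CONTEXT` are hypothetical names, and the `ThreadLocal` merely stands in for a thread-bound transaction context.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ThreadBoundContextDemo {
    // Stand-in for thread-bound state such as a transaction context.
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Sets the context on the calling thread, then runs a callback on a
    // separate worker thread and returns {worker thread name, context seen there}.
    static String[] observeFromWorker() {
        CONTEXT.set("request-context");
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Future<String[]> seen = worker.submit(() -> new String[] {
                Thread.currentThread().getName(),
                String.valueOf(CONTEXT.get())
            });
            return seen.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            CONTEXT.remove();
        }
    }
}
```

The worker thread sees no value for `CONTEXT`, which is why work done in the callback must not rely on state that was bound to another thread.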
In the interim, I am using WebClient to call HelloController to help with the processing.
I am also using @Scope("prototype") to create multiple subscribers that fetch data from different topics. Do you see any issue with this approach?
Some of the JDBC exceptions are: "Unable to acquire JDBC Connection", "Failed to update metadata for Cloud SQL Instance" and a certificate error, an SQL Exception with Error 0, and java.net.SocketException: Broken pipe.
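We were running this service on GCP Cloud Run, and most of these issues were resolved after we disabled CPU throttling. Presumably this is because, with throttling enabled, Cloud Run allocates CPU only while a request is being handled, so background threads such as the Pub/Sub subscriber are starved between requests.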