I want to subscribe to multiple Google Cloud PubSub projects in a Spring Boot application. After reading the related questions in How to wire/configure two pubsub gcp projects in one spring boot application with spring cloud?, How to use Spring Cloud GCP for multiple google projects and https://github.com/spring-cloud/spring-cloud-gcp/issues/1639 I tried it as following. However, since there is no proper documentation or sample code for this, I am not clear about how to implement this. I get the below given error which seems to be caused because credentials are not loaded.
PubSubConfig
Configurations for second PubSub project has been commented.
package com.dialog.chatboard.config;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.gcp.pubsub.core.PubSubTemplate;
import org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate;
import org.springframework.cloud.gcp.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
@Configuration
public class PubSubConfig {
DefaultSubscriberFactory genieFactory = new DefaultSubscriberFactory(() -> "XXXXX-projectId-01");
PubSubSubscriberTemplate genieSubscriberTemplate = new PubSubSubscriberTemplate(genieFactory);
// DefaultSubscriberFactory retailHubFactory = new DefaultSubscriberFactory(() -> "projectId-02");
// PubSubSubscriberTemplate retailHubSubscriberTemplate = new PubSubSubscriberTemplate(retailHubFactory);
@Bean
public MessageChannel genieInputChannel() {
return new DirectChannel();
}
@Bean
public PubSubInboundChannelAdapter genieChannelAdapter(
@Qualifier("genieInputChannel") MessageChannel inputChannel) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(genieSubscriberTemplate, "agent-genie-sub");
adapter.setOutputChannel(inputChannel);
return adapter;
}
// @Bean
// public MessageChannel retailHubInputChannel() {
// return new DirectChannel();
// }
//
// @Bean
// public PubSubInboundChannelAdapter retailHubChannelAdapter(
// @Qualifier("retailHubInputChannel") MessageChannel inputChannel) {
// PubSubInboundChannelAdapter adapter =
// new PubSubInboundChannelAdapter(retailHubSubscriberTemplate, "retail-hub-sub");
// adapter.setOutputChannel(inputChannel);
//
// return adapter;
// }
}
application.properties (For one ProjectId)
spring.cloud.gcp.project-id=XXXXX-projectId-01
spring.cloud.gcp.credentials.location=file:/home/XXXXXXXX/DialogFlow/XXXXXXXXXXXXX.json
Error
I have set GOOGLE_APPLICATION_CREDENTIALS for XXXXXXX-projectId-01 in Linux environment variable.
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'pubSubConfig' defined in file [/home/kabilesh/IdeaProjects/chatboard/target/classes/com/dialog/chatboard/config/PubSubConfig.class]: Instantiation of bean failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.dialog.chatboard.config.PubSubConfig$$EnhancerBySpringCGLIB$$8bcf7442]: Constructor threw exception; nested exception is java.lang.RuntimeException: Error creating the SubscriberStub
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateBean(AbstractAutowireCapableBeanFactory.java:1320) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1214) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:882) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:747) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) [spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at com.dialog.chatboard.ChatboardApplication.main(ChatboardApplication.java:28) [classes/:na]
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.dialog.chatboard.config.PubSubConfig$$EnhancerBySpringCGLIB$$8bcf7442]: Constructor threw exception; nested exception is java.lang.RuntimeException: Error creating the SubscriberStub
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:217) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:87) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateBean(AbstractAutowireCapableBeanFactory.java:1312) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 17 common frames omitted
Caused by: java.lang.RuntimeException: Error creating the SubscriberStub
at org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory.createSubscriberStub(DefaultSubscriberFactory.java:277) ~[spring-cloud-gcp-pubsub-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at org.springframework.cloud.gcp.pubsub.core.subscriber.PubSubSubscriberTemplate.<init>(PubSubSubscriberTemplate.java:100) ~[spring-cloud-gcp-pubsub-1.2.2.RELEASE.jar:1.2.2.RELEASE]
at com.dialog.chatboard.config.PubSubConfig.<init>(PubSubConfig.java:19) ~[classes/:na]
at com.dialog.chatboard.config.PubSubConfig$$EnhancerBySpringCGLIB$$8bcf7442.<init>(<generated>) ~[classes/:na]
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:1.8.0_212]
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:1.8.0_212]
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:1.8.0_212]
at java.lang.reflect.Constructor.newInstance(Constructor.java:423) ~[na:1.8.0_212]
at org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:204) ~[spring-beans-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 19 common frames omitted
Caused by: java.io.IOException: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
at com.google.auth.oauth2.DefaultCredentialsProvider.getDefaultCredentials(DefaultCredentialsProvider.java:134) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:119) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.auth.oauth2.GoogleCredentials.getApplicationDefault(GoogleCredentials.java:91) ~[google-auth-library-oauth2-http-0.20.0.jar:na]
at com.google.api.gax.core.GoogleCredentialsProvider.getCredentials(GoogleCredentialsProvider.java:67) ~[gax-1.54.0.jar:1.54.0]
at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:135) ~[gax-1.54.0.jar:1.54.0]
at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:263) ~[google-cloud-pubsub-1.103.0.jar:1.103.0]
at org.springframework.cloud.gcp.pubsub.support.DefaultSubscriberFactory.createSubscriberStub(DefaultSubscriberFactory.java:274) ~[spring-cloud-gcp-pubsub-1.2.2.RELEASE.jar:1.2.2.RELEASE]
... 27 common frames omitted
Disconnected from the target VM, address: '127.0.0.1:34223', transport: 'socket'
Process finished with exit code 1
In order to do that you need
first of all turn off GCP autoconfiguration for pubsub
@SpringBootApplication(exclude = {
GcpPubSubAutoConfiguration.class,
GcpPubSubReactiveAutoConfiguration.class
})
public class PubsubApplication {
public static void main(String[] args) {
SpringApplication.run(PubsubApplication.class, args);
}
}
then create config for first project
@Configuration
public class Project1Config {
private static final Logger LOGGER = LoggerFactory.getLogger(Project1Config.class);
@Bean(name = "project1_IdProvider")
public GcpProjectIdProvider project1_IdProvider() {
return new DefaultGcpProjectIdProvider() {
@Override
public String getProjectId() {
return "YOURPROJECTID";
}
};
}
@Bean(name = "project1_credentialsProvider")
public CredentialsProvider project1_credentialsProvider() throws IOException {
return new CredentialsProvider() {
@Override
public Credentials getCredentials() throws IOException {
return ServiceAccountCredentials.fromStream(
new ClassPathResource("YOURCREDENTIALS").getInputStream());
}
};
}
@Bean("project1_pubSubSubscriberTemplate")
public PubSubSubscriberTemplate pubSubSubscriberTemplate(
@Qualifier("project1_subscriberFactory") SubscriberFactory subscriberFactory) {
return new PubSubSubscriberTemplate(subscriberFactory);
}
@Bean("project1_publisherFactory")
public DefaultPublisherFactory publisherFactory(
@Qualifier("project1_IdProvider") GcpProjectIdProvider projectIdProvider,
@Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
final DefaultPublisherFactory defaultPublisherFactory = new DefaultPublisherFactory(projectIdProvider);
defaultPublisherFactory.setCredentialsProvider(credentialsProvider);
return defaultPublisherFactory;
}
@Bean("project1_subscriberFactory")
public DefaultSubscriberFactory subscriberFactory(
@Qualifier("project1_IdProvider") GcpProjectIdProvider projectIdProvider,
@Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
final DefaultSubscriberFactory defaultSubscriberFactory = new DefaultSubscriberFactory(projectIdProvider);
defaultSubscriberFactory.setCredentialsProvider(credentialsProvider);
return defaultSubscriberFactory;
}
@Bean(name = "project1_pubsubInputChannel")
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
@Bean(name = "project1_pubSubTemplate")
public PubSubTemplate project1_PubSubTemplate(
@Qualifier("project1_publisherFactory") PublisherFactory publisherFactory,
@Qualifier("project1_subscriberFactory") SubscriberFactory subscriberFactory,
@Qualifier("project1_credentialsProvider") CredentialsProvider credentialsProvider) {
if (publisherFactory instanceof DefaultPublisherFactory) {
((DefaultPublisherFactory) publisherFactory).setCredentialsProvider(credentialsProvider);
}
return new PubSubTemplate(publisherFactory, subscriberFactory);
}
@Bean(name = "project1_messageChannelAdapter")
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("project1_pubsubInputChannel") MessageChannel inputChannel,
@Qualifier("project1_pubSubTemplate") PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubSubTemplate, "YOURSUBSCRIPTIONNAME");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@Bean("project1_messageReceiver")
@ServiceActivator(inputChannel = "project1_pubsubInputChannel")
public MessageHandler messageReceiver() {
return message -> {
LOGGER.info("Message arrived! Payload: " + new String((byte[]) message.getPayload()));
LOGGER.info("Message headers {}", message.getHeaders());
BasicAcknowledgeablePubsubMessage originalMessage =
message
.getHeaders()
.get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
originalMessage.ack();
};
}
@Bean("project1_messageSender")
@ServiceActivator(inputChannel = "project1_pubsubOutputChannel")
public MessageHandler messageSender(
@Qualifier("project1_pubSubTemplate") PubSubTemplate pubsubTemplate) {
return new PubSubMessageHandler(pubsubTemplate, "YOURTOPICNAME");
}
}
Next - create config for project2
@Configuration
public class Project2Config {
private static final Logger LOGGER = LoggerFactory.getLogger(Project2Config.class);
@Bean(name = "project2_IdProvider")
public DefaultGcpProjectIdProvider project2_IdProvider() {
return new DefaultGcpProjectIdProvider() {
@Override
public String getProjectId() {
return "project-id-lksjfkalsdjfkl";
}
};
}
@Bean(name = "project2_credentialsProvider")
public CredentialsProvider project2_credentialsProvider() throws IOException {
return new CredentialsProvider() {
@Override
public Credentials getCredentials() throws IOException {
return ServiceAccountCredentials.fromStream(
new ClassPathResource("project2.json").getInputStream());
}
};
}
@Bean("project2_pubSubSubscriberTemplate")
public PubSubSubscriberTemplate pubSubSubscriberTemplate(
@Qualifier("project2_subscriberFactory") SubscriberFactory subscriberFactory) {
return new PubSubSubscriberTemplate(subscriberFactory);
}
@Bean("project2_publisherFactory")
public DefaultPublisherFactory publisherFactory(
@Qualifier("project2_IdProvider") GcpProjectIdProvider projectIdProvider,
@Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
final DefaultPublisherFactory defaultPublisherFactory = new DefaultPublisherFactory(projectIdProvider);
defaultPublisherFactory.setCredentialsProvider(credentialsProvider);
return defaultPublisherFactory;
}
@Bean("project2_subscriberFactory")
public DefaultSubscriberFactory subscriberFactory(
@Qualifier("project2_IdProvider") GcpProjectIdProvider projectIdProvider,
@Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
final DefaultSubscriberFactory defaultSubscriberFactory = new DefaultSubscriberFactory(projectIdProvider);
defaultSubscriberFactory.setCredentialsProvider(credentialsProvider);
return defaultSubscriberFactory;
}
@Bean(name = "project2_pubsubInputChannel")
public MessageChannel pubsubInputChannel() {
return new DirectChannel();
}
@Bean(name = "project2_pubSubTemplate")
public PubSubTemplate project2_PubSubTemplate(
@Qualifier("project2_publisherFactory") PublisherFactory publisherFactory,
@Qualifier("project2_subscriberFactory") SubscriberFactory subscriberFactory,
@Qualifier("project2_credentialsProvider") CredentialsProvider credentialsProvider) {
if (publisherFactory instanceof DefaultPublisherFactory) {
((DefaultPublisherFactory) publisherFactory).setCredentialsProvider(credentialsProvider);
}
return new PubSubTemplate(publisherFactory, subscriberFactory);
}
@Bean(name = "project2_messageChannelAdapter")
public PubSubInboundChannelAdapter messageChannelAdapter(
@Qualifier("project2_pubsubInputChannel") MessageChannel inputChannel,
@Qualifier("project2_pubSubTemplate") PubSubTemplate pubSubTemplate) {
PubSubInboundChannelAdapter adapter =
new PubSubInboundChannelAdapter(pubSubTemplate, "project2-testSubscription");
adapter.setOutputChannel(inputChannel);
adapter.setAckMode(AckMode.MANUAL);
return adapter;
}
@Bean("project2_messageReceiver")
@ServiceActivator(inputChannel = "project2_pubsubInputChannel")
public MessageHandler messageReceiver() {
return message -> {
LOGGER.info("Message Payload: " + new String((byte[]) message.getPayload()));
LOGGER.info("Message headers {}", message.getHeaders());
BasicAcknowledgeablePubsubMessage originalMessage =
message
.getHeaders()
.get(GcpPubSubHeaders.ORIGINAL_MESSAGE, BasicAcknowledgeablePubsubMessage.class);
originalMessage.ack();
};
}
@Bean(name = "project2_messageSender")
@ServiceActivator(inputChannel = "project2_pubsubOutputChannel")
public MessageHandler messageSender(
@Qualifier("project2_pubSubTemplate") PubSubTemplate pubsubTemplate) {
return new PubSubMessageHandler(pubsubTemplate, "project2-testTopic");
}
}
Create outbound gateway for project 1
project1_pubsubOutputChannel - specified in Project1Config
@Service
@MessagingGateway(defaultRequestChannel = "project1_pubsubOutputChannel")
public interface Project1PubsubOutboundGateway {
void sendToPubsub(String text);
}
Create outbound gateway for project 2
project2_pubsubOutputChannel - specified in Project2Config
@Service
@MessagingGateway(defaultRequestChannel = "project2_pubsubOutputChannel")
public interface Project2PubsubOutboundGateway {
void sendToPubsub(String text);
}
Now we are successfull:
@RestController
public class WebAppController {
// tag::autowireGateway[]
@Autowired private Project1PubsubOutboundGateway project1PubsubOutboundGateway;
@Autowired private Project2PubsubOutboundGateway project2PubsubOutboundGateway;
// end::autowireGateway[]
@PostMapping("/publishMessage")
public ResponseEntity<String> publishMessage(@RequestParam("message") String message) {
project1PubsubOutboundGateway.sendToPubsub(message);
project2PubsubOutboundGateway.sendToPubsub(message);
return ResponseEntity.ok("OK");
}
}
Check logs to see messaging is working
checkout git project for more details: https://github.com/olgmaks/spring-gcppubsub-multiproject