Search code examples
spring, spring-data-couchbase

Spring Batch with 2 Couchbase clusters


I'm trying to create a Spring Batch job in order to copy Couchbase data from one cluster to another. I'm using Spring Data Couchbase.

But I can't figure out how to have 2 different repositories on 2 different clusters.

I've found many examples on how to have multiple buckets on the same cluster based on OperationsMapping, but it doesn't meet my needs because in my case the Data Model is the same.

When I use the second repository (the target where the bucket is empty) to test if the document from the source repository exists, I get true. So it seems the test is made in the same repository (the source).

I tried to declare custom beans with custom names for CouchbaseTemplate, CouchbaseClientFactory, CouchbaseClusterEnvironment and CouchbaseCluster with no luck.

Thanks for help !

Here is the example of one of my 2 configuration classes (the second is the same with 'source' replaced by 'target') :

/**
 * Spring configuration for the "source" Couchbase cluster, as posted in the question.
 *
 * <p>Extends {@code AbstractCouchbaseConfiguration} and enables Couchbase repositories that are
 * pointed at the {@code sourceCouchbaseTemplate} bean through {@code couchbaseTemplateRef}.
 * Every bean declared here carries a "source" prefix in its name.
 *
 * <p>NOTE(review): {@code repositoryBaseClass} is set to {@code SourceDocumentRepository.class};
 * that attribute normally names the base implementation class for repositories, not a
 * repository interface — confirm this is what was intended.
 *
 * <p>NOTE(review): the parent class presumably also registers its own default beans (template,
 * client factory, cluster, ...) under fixed names; with two subclasses in one context those
 * defaults may collide — verify against the Spring Data Couchbase version in use.
 */
@Configuration
@EnableCouchbaseRepositories(repositoryBaseClass = SourceDocumentRepository.class, couchbaseTemplateRef = "sourceCouchbaseTemplate")
public class SourceCouchbaseConfiguration extends AbstractCouchbaseConfiguration {

  // Placeholder kept from the original post: the overridden configuration getters are elided.
  ... getters overriden : connectionString, user, pwd, bucket, scope

  /**
   * Connects to the cluster at {@code getConnectionString()} using {@code authenticator()} and
   * the injected environment. Registered as "sourceCouchbaseCluster"; {@code disconnect} is the
   * declared destroy method.
   */
  @Bean(name = "sourceCouchbaseCluster", destroyMethod = "disconnect")
  public Cluster sourceCouchbaseCluster(ClusterEnvironment sourceCouchbaseClusterEnvironment) {
    return Cluster.connect(getConnectionString(), ClusterOptions.clusterOptions(authenticator()).environment(sourceCouchbaseClusterEnvironment));
  }

  /**
   * Builds the cluster environment: installs a Jackson JSON serializer backed by
   * {@code couchbaseObjectMapper()}, then hands the builder to {@code configureEnvironment}.
   * Registered as "sourceCouchbaseClusterEnvironment"; {@code shutdown} is the declared destroy method.
   */
  @Bean(name = "sourceCouchbaseClusterEnvironment", destroyMethod = "shutdown")
  public ClusterEnvironment sourceCouchbaseClusterEnvironment() {
    ClusterEnvironment.Builder builder = ClusterEnvironment.builder();
    builder.jsonSerializer(JacksonJsonSerializer.create(couchbaseObjectMapper()));
    configureEnvironment(builder);
    return builder.build();
  }
  
  /**
   * Creates the client factory from the connection string, authenticator, bucket name and scope name.
   *
   * <p>NOTE(review): this does not take the "sourceCouchbaseCluster" bean declared in this class;
   * the factory is built from the connection string instead, so the named cluster and environment
   * beans are not what the templates end up using — confirm whether that is intended.
   */
  @Bean("sourceCouchbaseClientFactory")
  public CouchbaseClientFactory sourceCouchbaseClientFactory() {
    return new SimpleCouchbaseClientFactory(getConnectionString(), authenticator(), getBucketName(), getScopeName());
  }

  /**
   * Template referenced by {@code couchbaseTemplateRef} on this class. Built from
   * {@code sourceCouchbaseClientFactory()} and a newly constructed {@code MappingCouchbaseConverter}.
   *
   * <p>NOTE(review): the converter is created with its no-arg constructor here rather than
   * injected — verify it carries the mapping context / custom conversions the entities need.
   */
  @Bean("sourceCouchbaseTemplate")
  public CouchbaseTemplate sourceCouchbaseTemplate() {
    return new CouchbaseTemplate(sourceCouchbaseClientFactory(), new MappingCouchbaseConverter());
  }

  /**
   * Reactive counterpart of the template above, built the same way (client factory from
   * {@code sourceCouchbaseClientFactory()}, fresh {@code MappingCouchbaseConverter}).
   */
  @Bean("sourceReactiveCouchbaseTemplate")
  public ReactiveCouchbaseTemplate sourceReactiveCouchbaseTemplate() {
    return new ReactiveCouchbaseTemplate(sourceCouchbaseClientFactory(), new MappingCouchbaseConverter());
  }
  
  /**
   * Repository operations mapping whose default template is the injected source template.
   */
  @Bean("sourceCouchbaseRepositoryOperationsMapping")
  public RepositoryOperationsMapping sourceCouchbaseRepositoryOperationsMapping(CouchbaseTemplate sourceCouchbaseTemplate) {
    return new RepositoryOperationsMapping(sourceCouchbaseTemplate);
  }
}

Solution

  • I ended up creating one repository for my source based on AbstractCouchbaseConfiguration with my custom findBy with pagination.

    And for the target, I created a simple configuration class with custom names for the beans :

    /**
     * Stand-alone Spring configuration for the "target" Couchbase cluster.
     *
     * <p>Does not extend any Spring Data Couchbase base configuration; it declares its own
     * environment, cluster, client factory and template beans, each registered under a
     * "target"-prefixed name. Connection settings come from the injected
     * {@code CopyCouchbaseDataProperties}.
     */
    @Configuration
    public class TargetCouchbaseConfiguration {

      @Autowired
      private CopyCouchbaseDataProperties properties;

      @Autowired
      public MappingCouchbaseConverter mappingCouchbaseConverter;

      /** Builds a password authenticator from the target username and password properties. */
      private Authenticator authenticator() {
        String username = properties.getTarget().getUsername();
        String password = properties.getTarget().getPassword();
        return PasswordAuthenticator.create(username, password);
      }

      /**
       * Creates the Jackson mapper used for the SDK JSON serializer: unknown properties do not
       * fail deserialization, and {@code JsonValueModule} is registered.
       */
      public ObjectMapper couchbaseObjectMapper() {
        return new ObjectMapper()
            .configure(com.fasterxml.jackson.databind.DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
            .registerModule(new JsonValueModule());
      }

      /**
       * Builds the cluster environment for the target cluster.
       *
       * <p>The Jackson serializer is always installed. Only when a key store location is
       * configured are external network resolution and the TLS security settings added.
       */
      @Bean(name = "targetCouchbaseClusterEnvironment", destroyMethod = "shutdown")
      public ClusterEnvironment targetCouchbaseClusterEnvironment() {
        ClusterEnvironment.Builder environment = ClusterEnvironment.builder();
        environment.jsonSerializer(JacksonJsonSerializer.create(couchbaseObjectMapper()));

        if (StringUtils.isNotEmpty(properties.getKeyStoreLocation())) {
          SecurityConfig.Builder tls = tlsSecurityConfig();
          environment.ioConfig(IoConfig.networkResolution(NetworkResolution.EXTERNAL));
          environment.securityConfig(tls);
        }

        return environment.build();
      }

      /**
       * Assembles the TLS settings: TLS on, trust store loaded from the configured key store
       * location (no password, JKS trust store type constant).
       *
       * <p>NOTE(review): hostname verification is switched off here — confirm that is acceptable
       * for the target deployment.
       */
      private SecurityConfig.Builder tlsSecurityConfig() {
        return SecurityConfig.builder()
            .enableHostnameVerification(false)
            .enableTls(true)
            .trustStore(Paths.get(properties.getKeyStoreLocation()), null, Optional.of(Constants.JKS_TRUSTSTORE_TYPE));
      }

      /**
       * Connects to the target cluster with the target connection string, the password
       * authenticator and the injected target environment.
       */
      @Bean(name = "targetCouchbaseCluster", destroyMethod = "disconnect")
      public Cluster targetCouchbaseCluster(ClusterEnvironment targetCouchbaseClusterEnvironment) {
        String connectionString = properties.getTarget().getConnectionString();
        ClusterOptions options = ClusterOptions.clusterOptions(authenticator())
            .environment(targetCouchbaseClusterEnvironment);
        return Cluster.connect(connectionString, options);
      }

      /** Creates the client factory on top of the injected target cluster, bucket and scope. */
      @Bean("targetCouchbaseClientFactory")
      public CouchbaseClientFactory targetCouchbaseClientFactory(Cluster targetCouchbaseCluster) {
        String bucket = properties.getTarget().getBucket();
        String scope = properties.getTarget().getScope();
        return new SimpleCouchbaseClientFactory(targetCouchbaseCluster, bucket, scope);
      }

      /** Creates the template for the target cluster, reusing the injected mapping converter. */
      @Bean("targetCouchbaseTemplate")
      public CouchbaseTemplate targetCouchbaseTemplate(CouchbaseClientFactory targetCouchbaseClientFactory) {
        CouchbaseTemplate template = new CouchbaseTemplate(targetCouchbaseClientFactory, mappingCouchbaseConverter);
        return template;
      }
    }
    

    This way, I can use my sourceRepository and my targetCouchbaseTemplate in my app :

      // Repository for the source side, injected by type.
      @Autowired
      private SourceDocumentRepository sourceRepository;
    
      // Template for the target side. The field name equals the "targetCouchbaseTemplate" bean name.
      // NOTE(review): if several CouchbaseTemplate beans exist in the context, injection presumably
      // relies on Spring matching the field name to the bean name; an explicit @Qualifier would
      // make that dependency visible — confirm.
      @Autowired
      private CouchbaseTemplate targetCouchbaseTemplate;