
"Value is never used as Publisher"


I am currently working on a Spring Boot project. I have connected to a Couchbase cluster and want to upsert a document into it. In my repository layer I use the upsert() method. This is my repository layer:

import com.couchbase.client.java.ReactiveCluster
import com.couchbase.client.java.ReactiveCollection
import com.trendyol.productstockapi.entity.ProductStock
import org.springframework.stereotype.Repository

@Repository
class ProductStockRepository (
    private val cluster: ReactiveCluster,
    private val productStockCollection: ReactiveCollection
){

    fun upsertProductStock(productStock: ProductStock){
        val result = productStockCollection.upsert(
            productStock.stockId,
            productStock
        )
    }

    fun deleteProductStock(productStockId: String) {
        val result = productStockCollection.remove(productStockId)
    }

}

This is my Couchbase configuration:

import com.couchbase.client.core.cnc.tracing.NoopRequestTracer
import com.couchbase.client.core.env.CompressionConfig
import com.couchbase.client.core.env.IoEnvironment
import com.couchbase.client.core.env.OrphanReporterConfig
import com.couchbase.client.core.env.TimeoutConfig
import com.couchbase.client.java.ClusterOptions
import com.couchbase.client.java.ReactiveCluster
import com.couchbase.client.java.ReactiveCollection
import com.couchbase.client.java.codec.JacksonJsonSerializer
import com.couchbase.client.java.env.ClusterEnvironment
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.time.Duration

@Configuration
@EnableConfigurationProperties(CouchbaseConfigurationProperties::class)
class CouchbaseConfiguration(
    private val couchbaseConfigurationProperties: CouchbaseConfigurationProperties,
    private val objectMapper: ObjectMapper?,
) {

    @Bean
    fun clusterEnvironment(): ClusterEnvironment {
        return ClusterEnvironment
            .builder()
            .jsonSerializer(JacksonJsonSerializer.create(objectMapper))
            .ioEnvironment(IoEnvironment.builder().eventLoopThreadCount(Runtime.getRuntime().availableProcessors()))
            .compressionConfig(CompressionConfig.builder().enable(true))
            .requestTracer(NoopRequestTracer.INSTANCE)
            .orphanReporterConfig(OrphanReporterConfig.builder().emitInterval(Duration.ofSeconds(60)))
            .timeoutConfig(
                TimeoutConfig.builder()
                    .kvTimeout(couchbaseConfigurationProperties.connection.kvTimeout)
                    .connectTimeout(couchbaseConfigurationProperties.connection.connectTimeout)
                    .queryTimeout(couchbaseConfigurationProperties.connection.queryTimeout)
            )
            .build()
    }

    @Bean
    fun cluster(clusterEnvironment: ClusterEnvironment): ReactiveCluster {
        val clusterOptions = ClusterOptions
            .clusterOptions(couchbaseConfigurationProperties.secrets.cbUsername, couchbaseConfigurationProperties.secrets.cbPassword)
            .environment(clusterEnvironment)

        return ReactiveCluster.connect(couchbaseConfigurationProperties.hosts.joinToString(","), clusterOptions)
    }

    @Bean
    fun productStockCollection(cluster: ReactiveCluster): ReactiveCollection {
        return cluster.bucket(couchbaseConfigurationProperties.productContentBucket).collection("stock")

    }


}

The problem is that when I hover over the upsert() or remove() calls, I get a warning stating:

Value is never used as Publisher

My Couchbase version is com.couchbase.client:java-client:3.2.4

I haven't been able to come up with any solution.


Solution

  • ReactiveCollection.upsert() returns a cold Mono: it only describes the operation, and nothing is sent to Couchbase until something subscribes to it. IntelliJ is warning you that you never subscribe to the Mono (which is a type of Publisher), so the upsert never runs. The same applies to remove().
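To make the cold behaviour concrete, here is a minimal sketch that uses plain Reactor rather than Couchbase. The names fakeUpsert and executions are hypothetical stand-ins for ReactiveCollection.upsert() and are not part of any SDK; the sketch assumes reactor-core is on the classpath (the Couchbase Java SDK already depends on it).

```kotlin
import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicInteger

// Counts how many times the simulated write has actually run.
val executions = AtomicInteger(0)

// Hypothetical stand-in for ReactiveCollection.upsert(): it only builds a cold Mono.
fun fakeUpsert(id: String): Mono<String> =
    Mono.fromCallable {
        executions.incrementAndGet()
        "stored:$id"
    }

fun main() {
    // Creating the Mono does not run the callable.
    val pending = fakeUpsert("stock-1")
    println(executions.get())          // prints 0

    // Subscribing is what triggers the work.
    pending.subscribe { println(it) }  // prints stored:stock-1
    println(executions.get())          // prints 1
}
```

Assigning the Mono to a val and letting the function return, as in the question, corresponds to stopping after the first println: the operation is described but never executed.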

    The simplest way to get your code working is to call Mono.block(), which subscribes to the Mono, and blocks the current thread until the Mono emits a value:

    fun upsertProductStock(productStock: ProductStock) {
        // block() subscribes and waits for the MutationResult
        productStockCollection.upsert(
            productStock.stockId,
            productStock
        ).block()
    }
    

    However, blocking like this is not efficient. (If you're willing to block the current thread, you might as well use the Couchbase SDK's blocking API instead of its reactive API.) Since you're using Kotlin, you can turn upsertProductStock into a suspend function, and suspend instead of blocking while the Mono does its work.

    For this trick, you'll need to add kotlinx-coroutines-reactive as a dependency of your project.

    <dependency>
        <groupId>org.jetbrains.kotlinx</groupId>
        <artifactId>kotlinx-coroutines-reactive</artifactId>
        <version>${kotlin.coroutines.version}</version>
    </dependency>
    

    Then, after importing kotlinx.coroutines.reactive.awaitSingle, you can write:

    suspend fun upsertProductStock(productStock: ProductStock) {
        productStockCollection.upsert(
            productStock.stockId,
            productStock
        ).awaitSingle()
    }
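As a rough illustration of how such a suspend function is called, the sketch below awaits a Mono from inside a coroutine. It again uses a hypothetical fakeUpsert in place of the real collection, and upsertDemo is an illustrative name; it assumes reactor-core and kotlinx-coroutines-reactive are on the classpath. runBlocking is used only to give the demo an entry point; in a Spring WebFlux application the caller would typically be another suspend function, such as a suspend controller method.

```kotlin
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

// Hypothetical stand-in for ReactiveCollection.upsert().
fun fakeUpsert(id: String): Mono<String> =
    Mono.fromCallable { "stored:$id" }

// awaitSingle() subscribes to the Mono and suspends the coroutine
// (rather than blocking the thread) until the value arrives.
suspend fun upsertDemo(id: String): String =
    fakeUpsert(id).awaitSingle()

fun main() = runBlocking {
    println(upsertDemo("stock-1"))  // prints stored:stock-1
}
```

Note that awaitSingle() throws NoSuchElementException if the publisher completes without emitting a value. That is fine for upsert(), which emits a MutationResult, but for a Mono that may be empty, awaitFirstOrNull() is the safer choice.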
    

    Finally, unless you need to use the Couchbase Java SDK for some reason (like maybe you're using Spring Data Couchbase and want to share the same connection), consider using the Couchbase Kotlin SDK instead of the Java SDK. The functions of the Couchbase Kotlin SDK are naturally suspend functions. With the Kotlin SDK, you would write:

    suspend fun upsertProductStock(productStock: ProductStock) {
        // In the Kotlin SDK, `upsert` is a suspend function
        productStockCollection.upsert(
            productStock.stockId,
            productStock
        )
    }