Hi everyone! I had the idea of using Protobuf-generated classes (the same code generation that gRPC uses) as the data-layer API, instead of POJOs, over the RSocket protocol.
Here is the proto definition:
syntax = "proto3";

import "google/protobuf/wrappers.proto";

option java_package = "me.some.protoapi";
option java_multiple_files = true;

message ValidationTaskRequest {
    int64 id = 1;
    string name = 2;
}

message ValidationTaskResponse {
    int64 id = 1;
    ValidationStatus status = 2;
    ValidationError error = 3;
}

message ValidationError {
    string reason = 1;
}

enum ValidationStatus {
    PASSED = 0;
    DECLINED = 1;
}
The RSocket configuration:

@Configuration
public class RSocketConfiguration {

    @Bean
    public RSocket rSocket(@Value("${rsocket.client.port}") int port) {
        return RSocketFactory
                .connect()
                .mimeType(MimeTypeUtils.ALL_VALUE, MimeTypeUtils.ALL_VALUE)
                .frameDecoder(PayloadDecoder.ZERO_COPY)
                .transport(TcpClientTransport.create(port))
                .start()
                .retry()
                .block();
    }

    @Bean
    public RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(
                rSocket,
                MimeTypeUtils.ALL,
                MimeTypeUtils.ALL,
                rSocketStrategies
        );
    }
}
And the service itself:

@Service
public class ValidationServiceImpl implements ValidationService {

    private final Logger logger = LoggerFactory.getLogger(ValidationServiceImpl.class);
    private final TaskService taskService;
    private final ReactiveRedisTemplate<String, Task> redis;
    private final RSocketRequester rSocketRequester;
    private final RedisTopicHelper redisHelper;

    @Value("${rsocket.routes.validation}")
    private String rSocketValidationRoute;

    @Value("${validation.interval}")
    private Optional<Integer> validationInterval;

    public ValidationServiceImpl(TaskService taskService,
                                 ReactiveRedisTemplate<String, Task> redis,
                                 RSocketRequester rSocketRequester,
                                 RedisTopicHelper redisHelper) {
        this.taskService = taskService;
        this.redis = redis;
        this.rSocketRequester = rSocketRequester;
        this.redisHelper = redisHelper;
    }

    @Override
    public void startValidationProcess() {
        logger.info("validation listener started");
        // Poll Redis on a fixed interval and send every unverified task for validation.
        Flux.interval(Duration.ofMillis(validationInterval.orElse(1000)))
                .flatMap(i -> redis.keys(redisHelper.topicAllKeys()))
                .flatMap(redis.opsForValue()::get)
                .filter(it -> !it.isVerified())
                .flatMap(this::requestValidation)
                .log()
                .metrics()
                .subscribe(result -> {
                    if (result.hasError()) {
                        // Generated getters never return null, so check field presence with hasError().
                        logger.error(result.getError().getReason());
                    } else {
                        taskService.markTaskAsVerified(result.getId(), result.getStatus());
                        // delete() returns a Mono, so it does nothing until it is subscribed to.
                        redis.opsForValue()
                                .delete(redisHelper.specifiedTopicWithId(result.getId()))
                                .subscribe();
                    }
                });
    }

    private Mono<ValidationTaskResponse> requestValidation(Task task) {
        return rSocketRequester
                .route(rSocketValidationRoute)
                .data(
                        ValidationTaskRequest
                                .newBuilder()
                                .setId(task.getId())
                                .setName(task.getName())
                                .build() // send the built message, not the builder
                )
                .retrieveMono(ValidationTaskResponse.class);
    }
}
But when the Spring Boot service starts, I get this exception:
java.lang.IllegalArgumentException: No decoder for me.some.protoapi.ValidationTaskResponse
at org.springframework.messaging.rsocket.RSocketStrategies.decoder(RSocketStrategies.java:92) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveMono(DefaultRSocketRequester.java:274) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at org.springframework.messaging.rsocket.DefaultRSocketRequester$DefaultRequestSpec.retrieveMono(DefaultRSocketRequester.java:258) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
at me.some.test.api.services.validation.ValidationServiceImpl.requestValidation(ValidationServiceImpl.java:73) ~[main/:na]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:378) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:107) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:530) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:972) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:355) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:107) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.2.RELEASE.jar:3.3.2.RELEASE]
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:888) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:281) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.RedisPublisher$SubscriptionCommand.complete(RedisPublisher.java:756) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:59) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.protocol.CommandHandler.complete(CommandHandler.java:654) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:614) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:565) ~[lettuce-core-5.2.1.RELEASE.jar:5.2.1.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
As you can see, the problem is in decoding the proto models on the RSocket side. I know that Protobuf (which gRPC uses) is a binary serialization format and does some magic with binary descriptors under the hood. Has anyone tried to combine these two technologies? Any ideas would be very helpful. Thanks.
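For context on the binary format mentioned above: a Protobuf payload is a sequence of tagged fields and carries no description of its own type, so the receiving side needs a decoder that knows the target message class. The following is a minimal hand-written sketch of how ValidationTaskRequest would look on the wire. The class ProtoWireSketch and its methods are hypothetical and only illustrate the encoding; the real generated classes do this for you.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration class, not part of the generated Protobuf API.
class ProtoWireSketch {

    // Writes an unsigned base-128 varint, least significant 7-bit group first.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80)); // set the continuation bit
            value >>>= 7;
        }
        out.write((int) value);
    }

    // Hand-encodes: message ValidationTaskRequest { int64 id = 1; string name = 2; }
    static byte[] encodeRequest(long id, String name) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        if (id != 0) {                       // proto3 omits default values
            writeVarint(out, (1 << 3) | 0);  // field 1, wire type 0 (varint)
            writeVarint(out, id);
        }
        if (!name.isEmpty()) {
            byte[] utf8 = name.getBytes(StandardCharsets.UTF_8);
            writeVarint(out, (2 << 3) | 2);  // field 2, wire type 2 (length-delimited)
            writeVarint(out, utf8.length);
            out.write(utf8, 0, utf8.length);
        }
        return out.toByteArray();
    }

    // Renders the bytes as lowercase hex for inspection.
    static String hex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // id = 150, name = "ab"
        System.out.println(hex(encodeRequest(150L, "ab"))); // prints 08960112026162
    }
}
```

Nothing in those bytes names the message type, which is consistent with the exception: the requester has to be given a decoder that can map them to ValidationTaskResponse.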
I have never faced this problem, but I've configured my receiver and requester for a JSON payload. Could you try setting an octet-stream MIME type, for example?
@Bean
RSocket rSocket() {
    return RSocketFactory
            .connect()
            .metadataMimeType("message/x.rsocket.composite-metadata.v0")
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .dataMimeType(MimeTypeUtils.APPLICATION_JSON_VALUE)
            .transport(TcpClientTransport.create(ztlServerHostname, port))
            .start()
            .block();
}

@Bean
RSocketRequester rSocketRequester(RSocket rSocket, RSocketStrategies rSocketStrategies) {
    return RSocketRequester.wrap(rSocket, MimeTypeUtils.APPLICATION_JSON,
            MimeTypeUtils.parseMimeType("message/x.rsocket.composite-metadata.v0"),
            rSocketStrategies);
}
The full code is here
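If you go the octet-stream route, the "No decoder" error suggests that RSocketStrategies also needs a Protobuf codec registered. Here is a rough, untested sketch of how that might look. It assumes Spring Boot 2.2+ with spring-web and protobuf-java on the classpath, which the question does not confirm. ProtobufEncoder and ProtobufDecoder come from spring-web; the class name ProtobufRSocketConfiguration, the bean name, and the PROTOBUF constant are made up for illustration.

```java
import org.springframework.boot.rsocket.messaging.RSocketStrategiesCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.codec.protobuf.ProtobufDecoder;
import org.springframework.http.codec.protobuf.ProtobufEncoder;
import org.springframework.util.MimeType;

// Hypothetical configuration class; names are illustrative only.
@Configuration
public class ProtobufRSocketConfiguration {

    // Assumed data MIME type; pass it to RSocketRequester.wrap(...) instead of MimeTypeUtils.ALL.
    public static final MimeType PROTOBUF = new MimeType("application", "x-protobuf");

    // Adds the Protobuf codecs to the RSocketStrategies that Spring Boot auto-configures.
    @Bean
    public RSocketStrategiesCustomizer protobufCodecs() {
        return strategies -> strategies
                .encoder(new ProtobufEncoder())
                .decoder(new ProtobufDecoder());
    }
}
```

The responder side would need the same codecs and a matching data MIME type, otherwise it will fail to decode the request in the same way.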