Search code examples
javaspringspring-integrationspring-integration-ip

Got a java.lang.IllegalArgumentException when sending a Java object by the dynamic TCP/IP integration flow?


Gary Russell helped me some time ago with the following 'DynamicTcpServer' flow (see Building a TCP/IP server with SI's dynamic flow registration) having now a message service injected which gets the message to send as soon as a client connects:

public class DynamicTcpServer implements TcpServer {

    @Autowired
    private IntegrationFlowContext flowContext;

    @Autowired
    private ApplicationContext appContext;

    private final Map<String, IntegrationFlowRegistration> registrations = new HashMap<>();

    private final Map<String, String> clients = new ConcurrentHashMap<>();

    private final Map<String, TcpServerSpec> sockets;

    private final MessageService messenger;

    @Autowired
    public DynamicTcpServer(MessageService messenger, Map<String, TcpServerSpec> sockets) {
        this.messenger = messenger;
        this.sockets = sockets;
    }

    @Override
    public void start(String context) {
        start(context, sockets.get(context).getPort());
    }

    @Override
    public void start(String context, int port) {
        if (this.registrations.containsKey(context)) {
            /* already running */
        }
        else {
            TcpServerConnectionFactorySpec server = Tcp.netServer(port).id(context).serializer(TcpCodecs.lf());
            server.get().registerListener(msg -> false); // dummy listener so the accept thread doesn't exit
            IntegrationFlow flow = f -> f.handle(Tcp.outboundAdapter(server));
            this.registrations.put(context, flowContext.registration(flow).register());
        }
    }

    @Override
    public Set<String> running() {
        return registrations.keySet();
    }

    @Override
    public void stop(String context) {
        IntegrationFlowRegistration registration = this.registrations.remove(context);
        if (registration != null) {
            registration.destroy();
        }
    }

    @EventListener
    public void connect(TcpConnectionOpenEvent event) {
        String connectionId = event.getConnectionId();
        this.clients.put(connectionId, event.getConnectionFactoryName());
    }

    @EventListener
    public void closed(TcpConnectionCloseEvent event) {
        this.clients.remove(event.getConnectionId());
    }

    @EventListener
    public void listening(TcpConnectionServerListeningEvent event) {
    }

    @Scheduled(
            fixedDelayString = "${com.harry.potter.scheduler.fixed-delay}",
            initialDelayString = "${com.harry.potter.scheduler.initial-delay}"
    )
    public void sender() {
        this.clients.forEach((connectId, context) -> {
            IntegrationFlowRegistration register = registrations.get(context);
            if (register != null) {
                try {
                    while (true) {
                        List<ServerMessage> msgs = messenger.getMessagesToSend(sockets.get(context));
                        msgs.stream().forEach(msg -> 
                                register.getMessagingTemplate().send(
                                        MessageBuilder.withPayload(msg)
                                                .setHeader(IpHeaders.CONNECTION_ID, connectId).build()));       
                    }
                }
                catch (NoMessageToSendException nm) {
                    appContext.getBean(context, TcpNetServerConnectionFactory.class)
                            .closeConnection(connectId);    
                }
            }
        });
    }
}

The message service returns a Java object 'com.harry.potter.entity.ServerMessage' to be sent. So I assume I have to add some other kind of converter at '.serializer(TcpCodecs.lf())' because I got an exception saying:

2022-04-17 04:00:45.729 DEBUG [] --- [pool-283-thread-1]  c.l.c.c.cas.service.DynamicTcpServer     : sender: send 1 messages to potter1
2022-04-17 04:00:45.738 DEBUG [] --- [pool-283-thread-1]  c.l.c.c.c.service.DynamicTcpServer     : closed event=TcpConnectionCloseEvent [source=TcpNetConnection:harry.potter.de:56746:17584:76adefe0-0881-4e4b-be2b-0ced47f950ae], [factory=potter1, connectionId=harry.potter.de:56746:17584:76adefe0-0881-4e4b-be2b-0ced47f950ae] **CLOSED**
2022-04-17 04:00:45.740 ERROR [] --- [pool-283-thread-1]  o.s.i.ip.tcp.TcpSendingMessageHandler    : Error sending message

org.springframework.messaging.MessagingException: Send Failed; nested exception is java.lang.IllegalArgumentException: When using a byte array serializer, the socket mapper expects either a byte array or String payload, but received: class com.harry.potter.entity.ServerMessage
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:118)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageAsServer(TcpSendingMessageHandler.java:119)
    at org.springframework.integration.ip.tcp.TcpSendingMessageHandler.handleMessageInternal(TcpSendingMessageHandler.java:103)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:99)
    at com.harry.potter.service.DynamicTcpServer.lambda$sender$2(DynamicTcpServer.java:125)
    at com.harry.potter.service.DynamicTcpServer$$Lambda$40600/0x000000006f511b08.accept(Unknown Source)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1625)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
    at com.harry.potter.service.DynamicTcpServer.lambda$sender$3(DynamicTcpServer.java:124)
    at com.harry.potter.service.DynamicTcpServer$$Lambda$40552/0x000000003344f3b0.accept(Unknown Source)
    at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
    at com.harry.potter.service.DynamicTcpServer.sender(DynamicTcpServer.java:115)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:884)
Caused by: java.lang.IllegalArgumentException: When using a byte array serializer, the socket mapper expects either a byte array or String payload, but received: class com.harry.potter.entity.ServerMessage
    at org.springframework.integration.ip.tcp.connection.TcpMessageMapper.getPayloadAsBytes(TcpMessageMapper.java:277)
    at org.springframework.integration.ip.tcp.connection.TcpMessageMapper.fromMessage(TcpMessageMapper.java:252)
    at org.springframework.integration.ip.tcp.connection.TcpNetConnection.send(TcpNetConnection.java:111)
    ... 34 common frames omitted

Which converter (serializer) do I have to use and how to plug it in my DynamicTcpServer exactly?

EDIT 1

The message service messenger returns a Java object 'com.harry.potter.entity.ServerMessage' to be sent. The ServerMessage contains an int field holding the message length and a String field holding the message text:

public class ServerMessage implements Serializable {
    private static final long serialVersionUID = -1L;

    private int len;
    private String message;

    /* getters & setters */
}

I am trying to migrate from a C/C++ function which writes the C Struct

struct C_MSG
{
    int len;                        /* Length field */
    char text[MAX_MSG_LEN];         /* Data field   */
} c_msg;

to a consumer using the C Socket library send function writing a given number of bytes (length of text + 4) from the given memory address to the given TCP/IP socket.

I am looking for a Transformer to prepare the same binary content for the message consumer. Otherwise the consumer will not be able to cope with the message.

Following the comments and looking at the GenericTransformer<S, T> the transformation could be done in a single Lambda expression. The source of the transformation would be an object of the ServerMessage? The result should be an array of Bytes using Spring's utility:

.transform(s -> SerializationUtils.serialize(s))

Will the Lambda expression be that one? Perhaps do I need a custom Transformer - to have more control over the serializing process in case my consumer expects Intel resp. Motorola byte order - with implementing a specific interface? Which one? Perhaps there is a much easier solution?


Solution

  • You need to think about .transform() before that .handle(Tcp.outboundAdapter(server)); to convert your ServerMessage to byte[] or String. That's what is expected in the TcpMessageMapper by default.

    Of course I could recommend you to look into the mapper(TcpMessageMapper mapper) option of the Tcp.netServer() and its bytesMessageMapper property, but the outcome would be just the same.