Search code examples
spring-bootspring-integrationzeromqmessagingspring-messaging

ZeroMQ with Spring (spring-integration-zeromq)


I'm using spring-integration-zeromq and I'm trying to set up with authentication settings.

    @Bean
    ZeroMqChannel zeroMqPubSubChannel(ZContext context, ObjectMapper objectMapper) {
        ZeroMqChannel channel = new ZeroMqChannel(context, true);
        channel.setConnectUrl("tcp://localhost:6001:6002");
        channel.setConsumeDelay(Duration.ofMillis(100));
        channel.setMessageConverter(new GenericMessageConverter());
        channel.setSendSocketConfigurer(socket -> {
            socket.setZAPDomain("global".getBytes());
            socket.setCurveServer(true);
            socket.setCurvePublicKey("my_public_key".getBytes());
            socket.setCurveSecretKey("my_secret_key".getBytes());
        });
        EmbeddedJsonHeadersMessageMapper mapper = new EmbeddedJsonHeadersMessageMapper(objectMapper);
        channel.setMessageMapper(mapper);
        channel.afterPropertiesSet();

        channel.subscribe(m -> System.out.println(m));
        return channel;
    }

However it seems like results of setSendSocketConfigurer are ignored. In the org.springframework.integration.zeromq.channel.ZeroMqChannel sendSocketConnectionConfigurer is inited as an empty lambda and passed as such into prepareSendSocketMono; so me calling setSendSocketConfigurer consequently seem to have no effect as it only replaces a property in ZeroMqChannel instance, but not being applied to an already created by then socket mono. How do I set up authentication properly? Am I missing something?


UPD1

After the fix provided by Artem Bilan, the socket configurers seem to have started being applied to the channel, but communication stopped working. I have applied recommendations and tried out setting up ZeroMqProxy in hope it'll provide me with a workaround, but still no success: even my logging subscription in the same config is not coming through the authentication (though it works if I remove the socket configurers calls)

@Configuration
public class ZeroMQConfig {
    @Bean
    ZeroMqProxy zeroMqProxy(ZContext context, @Value("${zmq.channel.port.frontend}") int frontendPort,
                            @Value("${zmq.channel.port.backend}") int backendPort) {
        ZeroMqProxy proxy = new ZeroMqProxy(context, ZeroMqProxy.Type.SUB_PUB);
        proxy.setExposeCaptureSocket(true);
        proxy.setFrontendPort(frontendPort);
        proxy.setBackendPort(backendPort);
        ZCert cert = new ZCert();
        proxy.setFrontendSocketConfigurer(socket -> {
            socket.setCurvePublicKey(cert.getPublicKey());
            socket.setCurveSecretKey(cert.getSecretKey());
            socket.setCurveServerKey(Z85.decode("my_server_public_key"));
        });
        proxy.setBackendSocketConfigurer(socket -> {
            socket.setCurvePublicKey(cert.getPublicKey());
            socket.setCurveSecretKey(cert.getSecretKey());
            socket.setCurveServerKey(Z85.decode("my_server_public_key"));
        });
        return proxy;
    }

    @Bean
    public ZContext zContext() {
        ZContext context = new ZContext();
        ZAuth auth = new ZAuth(context);
        auth.configureCurve(ZAuth.CURVE_ALLOW_ANY);
        auth.setVerbose(true);
        return context;
    }

    @Bean(name = "zeroMqPubChannel")
    ZeroMqChannel zeroMqPubChannel(ZContext context, ObjectMapper objectMapper, ZeroMqProxy proxy){
        ZeroMqChannel channel = new ZeroMqChannel(context, true);
        channel.setZeroMqProxy(proxy);
        channel.setConsumeDelay(Duration.ofMillis(100));
        channel.setMessageConverter(new GenericMessageConverter());
        EmbeddedJsonHeadersMessageMapper mapper = new EmbeddedJsonHeadersMessageMapper(objectMapper);
        channel.setMessageMapper(mapper);
        return channel;
    }

    @Bean
    @ServiceActivator(inputChannel = "zeroMqPubChannel")
    public MessageHandler subscribe() {
        return message -> System.out.println(message);
    }
}

Solution

  • Yeah... I see you point. This a bug: we must postpone a his.sendSocketConfigurer usage until really an interaction happens with a socket. I'll fix that soon enough.

    For now on a couple remarks for your config:

    You must not call an afterPropertiesSet() yourself. Let the Spring application context to manage its callbacks for you!

    You must not subscribe into the MessageChannel in its bean definition. Instead consider to have a @ServiceActivator(inputChannel = "zeroMqPubSubChannel"). See more info in docs: https://docs.spring.io/spring-integration/reference/html/messaging-endpoints.html#service-activator

    Unfortunately there is no way to pass that customization into an internal ZMQ.Socket instance...

    UPDATE

    The working test with Curve auth in ZeroMQ:

    @Test
    void testPubSubWithCurve() throws InterruptedException {
        ZContext CONTEXT = new ZContext();
        new ZAuth(CONTEXT).configureCurve(ZAuth.CURVE_ALLOW_ANY).setVerbose(true);
    
        ZMQ.Curve.KeyPair frontendKeyPair = ZMQ.Curve.generateKeyPair();
        ZMQ.Curve.KeyPair backendKeyPair = ZMQ.Curve.generateKeyPair();
    
        ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
        proxy.setBeanName("subPubCurveProxy");
        proxy.setFrontendSocketConfigurer(socket -> {
            socket.setZAPDomain("global".getBytes());
            socket.setCurveServer(true);
            socket.setCurvePublicKey(frontendKeyPair.publicKey.getBytes());
            socket.setCurveSecretKey(frontendKeyPair.secretKey.getBytes());
        });
        proxy.setBackendSocketConfigurer(socket -> {
            socket.setZAPDomain("global".getBytes());
            socket.setCurveServer(true);
            socket.setCurvePublicKey(backendKeyPair.publicKey.getBytes());
            socket.setCurveSecretKey(backendKeyPair.secretKey.getBytes());
        });
        proxy.afterPropertiesSet();
        proxy.start();
    
        ZeroMqChannel channel = new ZeroMqChannel(CONTEXT, true);
        channel.setZeroMqProxy(proxy);
        channel.setBeanName("testChannelWithCurve");
        channel.setSendSocketConfigurer(socket -> {
            ZCert clientCert = new ZCert();
            socket.setCurvePublicKey(clientCert.getPublicKey());
            socket.setCurveSecretKey(clientCert.getSecretKey());
            socket.setCurveServerKey(frontendKeyPair.publicKey.getBytes());
        });
        channel.setSubscribeSocketConfigurer(socket -> {
                    ZCert clientCert = new ZCert();
                    socket.setCurvePublicKey(clientCert.getPublicKey());
                    socket.setCurveSecretKey(clientCert.getSecretKey());
                    socket.setCurveServerKey(backendKeyPair.publicKey.getBytes());
                }
        );
        channel.setConsumeDelay(Duration.ofMillis(10));
        channel.afterPropertiesSet();
    
        BlockingQueue<Message<?>> received = new LinkedBlockingQueue<>();
    
        channel.subscribe(received::offer);
        channel.subscribe(received::offer);
    
        await().until(() -> proxy.getBackendPort() > 0);
    
        // Give it some time to connect and subscribe
        Thread.sleep(1000);
    
        GenericMessage<String> testMessage = new GenericMessage<>("test1");
        assertThat(channel.send(testMessage)).isTrue();
    
        Message<?> message = received.poll(10, TimeUnit.SECONDS);
        assertThat(message).isNotNull().isEqualTo(testMessage);
        message = received.poll(10, TimeUnit.SECONDS);
        assertThat(message).isNotNull().isEqualTo(testMessage);
    
        channel.destroy();
        proxy.stop();
        CONTEXT.close();
    }