Search code examples
javaredislettuce

Redis / Lettuce / Stream - how to send my own entry id in xadd


I'm implementing a price feed with Java as feeder and python as consumer. I'm more python than Java. I need to send my own entry id (timestamp) from java, but I don't find examples and clear documentation (I'm an entry level Java) on how to do it.

This is the java code which is working now, but sending the default Redis entry id, equivalent to send : XADD stream_id * key_1 value_1 key_2 value_2.

public void onTick(Instrument instrument, ITick tick) throws JFException {

        String _streamName = instrument.name() + ":tick";
        long _tickDate = tick.getTime();
        String _sTickDate = Long.toString(_tickDate);

        double _ask = tick.getAsk();
        double _bid = tick.getBid();

        Map<String, String> _messageBody = new HashMap<>();
        _messageBody.put( "ask", Double.toString(_ask) );
        _messageBody.put( "bid", Double.toString(_bid) );

        console.getOut().println("Sending to " + _streamName + " -> " + tick);

        this.sync.xadd(_streamName, _messageBody);    
    }

In python is very simple to do it with redis-py.

def send_tick(self, instrument, bar_size, tick):
        _stream_id = f"{instrument}:{bar_size}"
        self.client.xadd(_stream_id, {"bid": tick[self.BID_KEY], "ask": tick[self.ASK_KEY]}, id=tick[self.TS_KEY])

It is sent in the id= named prameter.

I would appreciate any help

Thanks


Solution

  • After taking some fresh air, found the answer, based on the description of XAddArgs found here.

    https://www.codota.com/code/java/classes/io.lettuce.core.XAddArgs

    import io.lettuce.core.RedisClient;
    import io.lettuce.core.api.StatefulRedisConnection;
    import io.lettuce.core.api.sync.RedisCommands;
    import io.lettuce.core.XAddArgs;
    
    .
    .
    .
        public static XAddArgs id(long tsId) {
            return new XAddArgs().id(Long.toString(tsId));
        }
    
        @Override
        public void onTick(Instrument instrument, ITick tick) throws JFException {
    
            String _streamName = instrument.name() + ":tick";
            long _tickDate = tick.getTime();
            String _sTickDate = Long.toString(_tickDate);
    
            double _ask = tick.getAsk();
            double _bid = tick.getBid();
    
            Map<String, String> _messageBody = new HashMap<>();
            _messageBody.put( "ask", Double.toString(_ask) );
            _messageBody.put( "bid", Double.toString(_bid) );
            _messageBody.put( "time", _sTickDate );
    
            console.getOut().println("Sending to " + _streamName + " -> " + tick);
    
            this.sync.xadd(_streamName, id(_tickDate),  _messageBody);    
        }