Search code examples
javasocketsapache-camelmina

Sending to Apache Camel Mina2 using Socket


I am trying to use Apache Camel Mina as my socket server to receive byte streams. I am using Apache Camel 2.12.1 and this is my simple route:

<route id="retriever">
  <from uri="mina2:tcp://127.0.0.1:5555?sync=false" />
  <convertBodyTo type="java.lang.String" />
  <to uri="file:temp/out" />
</route>

I can perfectly start the route and send data using telnet. My problem comes when I am using a simple Java test client to send data:

byte[] myData = {0x34, 0x12, 0x25, 0x34};
Socket socket = new Socket("127.0.0.1", 5555);
OutputStream os = socket.getOutputStream();
os.write(myData, 0, myData.length);
os.flush();
socket.close(); 

When using this client I am not getting any exceptions anywhere but the data does not enter in the camel route. I have been trying to implement my own codec and I check that MINA is receiving the data, but I am not sure if for this simple case I need a special codec. I just want to retrieve the byte array and save it.

So my question is: what I am doing wrong? Why doesn't the default mina2 codec work for my scenario? Am I missing any special option in the mina endpoint to allow this?

Thanks!


Solution

  • If you mean TextLineCodecFactory then you should check the source code. The decoder of this codec uses a delimiter (new line of the OS the the framework is running).

    TextLineCodecFactory , TextLineDecoder , LineDelimiter

    Check the decoder. In particular check this part

    226     private void decodeAuto(Context ctx, IoSession session, IoBuffer in, ProtocolDecoderOutput out)
    227             throws CharacterCodingException, ProtocolDecoderException {
    228         int matchCount = ctx.getMatchCount();
    229 
    230         // Try to find a match
    231         int oldPos = in.position();
    232         int oldLimit = in.limit();
    233 
    234         while (in.hasRemaining()) {
    235             byte b = in.get();
    236             boolean matched = false;
    237 
    238             switch (b) {
    239             case '\r':
    240                 // Might be Mac, but we don't auto-detect Mac EOL
    241                 // to avoid confusion.
    242                 matchCount++;
    243                 break;
    244 
    245             case '\n':
    246                 // UNIX
    247                 matchCount++;
    248                 matched = true;
    249                 break;
    250 
    251             default:
    252                 matchCount = 0;
    253             }
    254 
    255             if (matched) {
    256                 // Found a match.
    257                 int pos = in.position();
    258                 in.limit(pos);
    259                 in.position(oldPos);
    260 
    261                 ctx.append(in);
    262 
    263                 in.limit(oldLimit);
    264                 in.position(pos);
    265 
    266                 if (ctx.getOverflowPosition() == 0) {
    267                     IoBuffer buf = ctx.getBuffer();
    268                     buf.flip();
    269                     buf.limit(buf.limit() - matchCount);
    270 
    271                     try {
    272                         byte[] data = new byte[buf.limit()];
    273                         buf.get(data);
    274                         CharsetDecoder decoder = ctx.getDecoder();
    275 
    276                         CharBuffer buffer = decoder.decode(ByteBuffer.wrap(data));
    277                         String str = new String(buffer.array());
    278                         writeText(session, str, out);
    279                     } finally {
    280                         buf.clear();
    281                     }
    282                 } else {
    283                     int overflowPosition = ctx.getOverflowPosition();
    284                     ctx.reset();
    285                     throw new RecoverableProtocolDecoderException("Line is too long: " + overflowPosition);
    286                 }
    287 
    288                 oldPos = pos;
    289                 matchCount = 0;
    290             }
    291         }
    292 
    293         // Put remainder to buf.
    294         in.position(oldPos);
    295         ctx.append(in);
    296 
    297         ctx.setMatchCount(matchCount);
    298     }
    

    and this part

    180     public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
    181         Context ctx = getContext(session);
    182 
    183         if (LineDelimiter.AUTO.equals(delimiter)) {
    184             decodeAuto(ctx, session, in, out);
    185         } else {
    186             decodeNormal(ctx, session, in, out);
    187         }
    188     }
    

    When it's filling the IoBuffer it uses a new line as the delimiter, so if you don't add one then it will continue to wait for it. Haven't tested this but I'm confident that that is the problem. Just try to send a string with a new line in the end. Convert it into bytes and see what happens.

    If you want to transfer data you will have to use some kind of protocol that will set the the rules that the transmitter and the receiver will use in order to set and get the end of communication.