Search code examples
wso2-api-managerstream-processingwso2-streaming-integrator

Wso2 Stream Processor : Error occurred while processing eventByteBufferQueue


I have two nodes of wso2-am analytics server (2.6.0) which is Wso2 Stream processors. I see following error on passive node of cluster. The active node is fine and I don't see any error. Analytics result has no impact for users who is viewing data on API Publisher or Store. however there is an error in passive node.

please advise what is causing following issue..

2019-02-26 17:06:09,513] ERROR {org.wso2.carbon.stream.processor.core.ha.tcp.EventSyncServer} - Error occurred while processing eventByteBufferQueue null java.nio.BufferUnderflowException


Solution

  • Just meet the same issue, here is my problem and solution. 1) Using the WSO2 SP HA deployment. 2) When Event come in active node and according the source mapping of the streaming, some fields are NULL 3) Active Node would like sync this event to passive node 4) passive node pick up the event data from the 'eventByteBufferQueue' to meet the standby-take over mechanism 5) passive node cannot parse the data from active node and reports error exception.

    the root cause is SP only support NULL String by default, when NULL with LONG, INTEGER.. the error occurred. but for me, Long fields have NULL is the normal case, you can change data type to string.

    here is my solution: org.wso2.carbon.stream.processor.core_2.0.478.jar Add logic to support NULL BinaryMessageConverterUtil.java for sending event data from active node

    public final class BinaryMessageConverterUtil {
    
    public static int getSize(Object data) {
        if (data instanceof String) {
            return 4 + ((String) data).length();
        } else if (data instanceof Integer) {
            return 4;
        } else if (data instanceof Long) {
            return 8;
        } else if (data instanceof Float) {
            return 4;
        } else if (data instanceof Double) {
            return 8;
        } else if (data instanceof Boolean) {
            return 1;
        } else if (data == null) {
            return 0;
        }else {
            //TODO
            return 4;
        }
    }
    
    public static EventDataMetaInfo getEventMetaInfo(Object data) {
        int eventSize;
        Attribute.Type attributeType;
        if (data instanceof String) {
            attributeType = Attribute.Type.STRING;
            eventSize = 4 + ((String) data).length();
        } else if (data instanceof Integer) {
            attributeType = Attribute.Type.INT;
            eventSize = 4;
        } else if (data instanceof Long) {
            attributeType = Attribute.Type.LONG;
            eventSize = 8;
        } else if (data instanceof Float) {
            attributeType = Attribute.Type.FLOAT;
            eventSize = 4;
        } else if (data instanceof Double) {
            attributeType = Attribute.Type.DOUBLE;
            eventSize = 8;
        } else if (data instanceof Boolean) {
            attributeType = Attribute.Type.BOOL;
            eventSize = 1;
        } else if (data == null){
            attributeType = Attribute.Type.OBJECT;
            eventSize = 0; //'no content between the HA nodes for NULL fields'
        } else {
            //TODO
            attributeType = Attribute.Type.OBJECT;
            eventSize = 1;
        }
        return new EventDataMetaInfo(eventSize, attributeType);
    }
    
    public static void assignData(Object data, ByteBuffer eventDataBuffer) throws IOException {
        if (data instanceof String) {
            eventDataBuffer.putInt(((String) data).length());
            eventDataBuffer.put((((String) data).getBytes(Charset.defaultCharset())));
        } else if (data instanceof Integer) {
            eventDataBuffer.putInt((Integer) data);
        } else if (data instanceof Long) {
            eventDataBuffer.putLong((Long) data);
        } else if (data instanceof Float) {
            eventDataBuffer.putFloat((Float) data);
        } else if (data instanceof Double) {
            eventDataBuffer.putDouble((Double) data);
        } else if (data instanceof Boolean) {
            eventDataBuffer.put((byte) (((Boolean) data) ? 1 : 0));
        } else if (data == null){
            //put nothing into he Buffer
        } else {
            eventDataBuffer.putInt(0);
        }
    }
    
    public static String getString(ByteBuf byteBuf, int size) throws UnsupportedEncodingException {
        byte[] bytes = new byte[size];
        byteBuf.readBytes(bytes);
        return new String(bytes, Charset.defaultCharset());
    }
    
    public static String getString(ByteBuffer byteBuf, int size) throws UnsupportedEncodingException {
        byte[] bytes = new byte[size];
        byteBuf.get(bytes);
        return new String(bytes, Charset.defaultCharset());
    }
    

    }

    SiddhiEventConverter.java for processing event data at passive node

    static Object[] toObjectArray(ByteBuffer byteBuffer,
                                  String[] attributeTypeOrder) throws UnsupportedEncodingException {
        if (attributeTypeOrder != null) {
            Object[] objects = new Object[attributeTypeOrder.length];
            for (int i = 0; i < attributeTypeOrder.length; i++) {
                switch (attributeTypeOrder[i]) {
                    case "INT":
                        objects[i] = byteBuffer.getInt();
                        break;
                    case "LONG":
                        objects[i] = byteBuffer.getLong();
                        break;
                    case "STRING":
                        int stringSize = byteBuffer.getInt();
                        if (stringSize == 0) {
                            objects[i] = null;
                        } else {
                            objects[i] = BinaryMessageConverterUtil.getString(byteBuffer, stringSize);
                        }
                        break;
                    case "DOUBLE":
                        objects[i] = byteBuffer.getDouble();
                        break;
                    case "FLOAT":
                        objects[i] = byteBuffer.getFloat();
                        break;
                    case "BOOL":
                        objects[i] = byteBuffer.get() == 1;
                        break;
                    case "OBJECT":
                        //for NULL fields
                        objects[i] = null;
                        break;
                    default:
                        // will not occur
                }
            }
            return objects;
        } else {
            return null;
        }
    }