
How to read a NiFi FlowFile as one string, or how to read each line in a Processor


I need to read each line, or the full content, of a FlowFile as a string.

I know it is possible to read each Record value by field name,

but I need to read the entire file content, or an entire line.

We can read Records like this:

    try (InputStream is = session.read(flowFile);
         RecordReader reader = readerFactory.createRecordReader(flowFile, is, getLogger())) {

        Record record;
        while ((record = reader.nextRecord()) != null) {
            // ... read individual field values, e.g. record.getValue("fieldName")
        }
    }

But the Record interface has no method that returns the raw line.

And the FlowFile class has no method to read the whole content.

So I think I could read the InputStream and build a string from it. Would that work? And if so, is it the best way to read a file or a line as a string in NiFi?
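The idea described above, reading the whole InputStream into a String, can be sketched in plain Java as follows. This assumes the content is UTF-8; in a processor the stream would be the one returned by `session.read(flowFile)`. The class name and the `maxBytes` cap are hypothetical (not a NiFi setting): the cap is there because without it a very large FlowFile would be loaded entirely onto the heap.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

class ReadWholeContent {

    /**
     * Reads the entire stream into a UTF-8 String, failing if it is larger than maxBytes
     * (hypothetical safeguard so a huge FlowFile cannot exhaust the heap).
     */
    static String readAsString(InputStream in, int maxBytes) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        long total = 0;
        int n;
        while ((n = in.read(chunk)) != -1) {
            total += n;
            if (total > maxBytes) {
                throw new IOException("Content exceeds " + maxBytes + " bytes");
            }
            buffer.write(chunk, 0, n);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for session.read(flowFile); try-with-resources closes the stream
        try (InputStream in = new ByteArrayInputStream("a,b\nc,d\n".getBytes(StandardCharsets.UTF_8))) {
            System.out.print(readAsString(in, 1024)); // prints the two lines unchanged
        }
    }
}
```

On Java 9+, `new String(in.readAllBytes(), StandardCharsets.UTF_8)` is the one-line equivalent without the size cap.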

If not, please suggest a solution.


P.S.

Search terms from my Google Chrome history that I used (to make this question easier to find): nifi read all file content, nifi read line from file


Solution

  • Thanks @daggett

    As a variant, we can use a BufferedReader.
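A minimal plain-Java sketch of the BufferedReader variant, assuming UTF-8 text. In a processor the InputStream would come from `session.read(flowFile)`; the class and method names are illustrative only. Collecting the lines into a List is just for demonstration: in a processor you would handle each line inside the loop. Note that, unlike StreamDemarcator, `readLine()` has no maximum line length, so a single huge line is still read fully into memory.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class ReadLines {

    /** Reads the stream line by line; line terminators (\n, \r\n, \r) are stripped. */
    static List<String> readLines(InputStream in) throws IOException {
        List<String> lines = new ArrayList<>();
        // Closing the reader also closes the underlying stream
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line); // replace with your per-line logic
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("first\r\nsecond\n\nlast".getBytes(StandardCharsets.UTF_8));
        System.out.println(readLines(in)); // [first, second, , last]
    }
}
```

Blank lines come back as empty strings, and both `\n` and `\r\n` endings are removed.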

    But another answer, better suited to NiFi, is to use a memory-bounded splitting reader called StreamDemarcator, like this:

    private void process(final FlowFile flowFile, final ProcessContext context, final ProcessSession session) {
        final byte[] demarcatorBytes = context.getProperty(TEXT_DELIMITER).evaluateAttributeExpressions().getValue()
                                              .getBytes(StandardCharsets.UTF_8);
        final int maxMessageSize = context.getProperty(MAX_TEXT_SIZE).asDataSize(DataUnit.B).intValue();
        FlowFile outFile = session.create(flowFile);
        try {
            outFile = session.write(outFile, out -> {
                // Open the original content inside the callback so the stream is always closed
                try (final InputStream flowFileContent = session.read(flowFile);
                     final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                    byte[] messageContent;
                    // nextToken() returns the next token (e.g. a line) without the delimiter, or null at end of stream
                    while ((messageContent = demarcator.nextToken()) != null) {
                        // Replace with your own logic, e.g. new String(messageContent, StandardCharsets.UTF_8)
                        out.write(messageContent);
                        out.write(demarcatorBytes);
                    }
                }
            });
        } catch (Exception e) {
            session.remove(outFile);
            throw e;
        }
        session.transfer(outFile, REL_SUCCESS);
        // Every FlowFile in the session must be transferred or removed before commit
        session.remove(flowFile);
    }
    

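    And the StreamDemarcator class. It is a copy of NiFi's own class, which NiFi ships in the nifi-utils module as org.apache.nifi.stream.io.util.StreamDemarcator, so you can usually depend on that module and import it instead of copying it: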

    import java.io.*;
    
    import org.apache.nifi.stream.io.exception.TokenTooLargeException;
    
    /**
     * The <code>StreamDemarcator</code> class takes an input stream and demarcates
     * it so it could be read (see {@link #nextToken()}) as individual byte[]
     * demarcated by the provided delimiter (see 'delimiterBytes'). If delimiter is
     * not provided the entire stream will be read into a single token which may
     * result in {@link OutOfMemoryError} if stream is too large. The 'maxDataSize'
     * controls the maximum size of the buffer that accumulates a token.
     * <p>
     * NOTE: Not intended for multi-thread usage hence not Thread-safe.
     * </p>
     */
    public class StreamDemarcator extends AbstractDemarcator {
        private final byte[] delimiterBytes;
        private boolean skipLargeTokens = false;
    
        /**
         * Constructs a new instance
         *
         * @param is             instance of {@link InputStream} representing the data
         * @param delimiterBytes byte array representing delimiter bytes used to split the
         *                       input stream. Can be 'null'. NOTE: the 'null' is allowed only
         *                       for convenience and consistency since without delimiter this
         *                       class is no different than BufferedReader which reads the
         *                       entire stream into a byte array and there may be more
         *                       efficient ways to do that (if that is the case).
         * @param maxDataSize    maximum size of data derived from the input stream. This means
         *                       that neither {@link InputStream} nor its individual tokens (if
         *                       delimiter is used) can ever be greater than this size.
         */
        public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize) {
            this(is, delimiterBytes, maxDataSize, INIT_BUFFER_SIZE);
        }
    
        /**
         * Constructs a new instance
         *
         * @param is                instance of {@link InputStream} representing the data
         * @param delimiterBytes    byte array representing delimiter bytes used to split the
         *                          input stream. Can be 'null'. NOTE: the 'null' is allowed only
         *                          for convenience and consistency since without delimiter this
         *                          class is no different than BufferedReader which reads the
         *                          entire stream into a byte array and there may be more
         *                          efficient ways to do that (if that is the case).
         * @param maxDataSize       maximum size of data derived from the input stream. This means
         *                          that neither {@link InputStream} nor its individual tokens (if
         *                          delimiter is used) can ever be greater than this size.
         * @param initialBufferSize initial size of the buffer used to buffer {@link InputStream}
         *                          or its parts (if delimiter is used) to create its byte[]
         *                          representation. Must be a positive integer. The buffer will grow
         *                          automatically as needed up to Integer.MAX_VALUE.
         */
        public StreamDemarcator(InputStream is, byte[] delimiterBytes, int maxDataSize, int initialBufferSize) {
            super(is, maxDataSize, initialBufferSize);
            this.validate(delimiterBytes);
            this.delimiterBytes = delimiterBytes;
        }
    
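        /**
         * When set to true, tokens larger than 'maxDataSize' are silently skipped
         * instead of causing a {@link TokenTooLargeException}.
         *
         * @return this instance, for call chaining
         */
        public StreamDemarcator skipLargeTokens(boolean value) {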
            skipLargeTokens = value;
            return this;
        }
    
        /**
         * Reads the next data token (without the delimiter) from the {@link InputStream},
         * returning null when it reaches the end of the stream. Empty tokens, e.g. between
         * two consecutive delimiters, are skipped.
         *
         * @throws IOException if unable to read from the stream
         */
        public byte[] nextToken() throws IOException {
            byte[] token = null;
            int j = 0;
    nextTokenLoop:
            while (token == null && this.availableBytesLength != -1) {
                if (this.index >= this.availableBytesLength) {
                    this.fill();
                }
                if (this.availableBytesLength != -1) {
                    byte byteVal;
                    int i;
                    for (i = this.index; i < this.availableBytesLength; i++) {
                        byteVal = this.buffer[i];
    
                        boolean delimiterFound = false;
                        if (this.delimiterBytes != null && this.delimiterBytes[j] == byteVal) {
                            if (++j == this.delimiterBytes.length) {
                                delimiterFound = true;
                            }
                        } else {
                            j = 0;
                        }
    
                        if (delimiterFound) {
                            this.index = i + 1;
                            int size = this.index - this.mark - this.delimiterBytes.length;
                            try {
                                token = this.extractDataToken(size);
                            } catch (TokenTooLargeException e) {
                                if (!skipLargeTokens) {
                                    throw e;
                                } else {
                                    token = null;
                                }
                            }
                            this.mark = this.index;
                            j = 0;
                            if (token != null) {
                                break nextTokenLoop;
                            }
                        }
                    }
                    this.index = i;
                } else {
                    try {
                        token = this.extractDataToken(this.index - this.mark);
                    } catch (TokenTooLargeException e) {
                        if (!skipLargeTokens) {
                            throw e;
                        }
                    }
                }
            }
            return token;
        }
    
        /**
         * Validates prerequisites for constructor arguments
         */
        private void validate(byte[] delimiterBytes) {
            if (delimiterBytes != null && delimiterBytes.length == 0) {
                throw new IllegalArgumentException("'delimiterBytes' is an optional argument, but when provided its length must be > 0");
            }
        }
    }
    

    And its parent class, AbstractDemarcator. It is package-private, so it must live in the same package as StreamDemarcator:

    import java.io.*;
    import java.nio.BufferOverflowException;
    
    import org.apache.nifi.stream.io.exception.TokenTooLargeException;
    
    /**
     * Base class for implementing streaming demarcators.
     * <p>
     * NOTE: Not intended for multi-thread usage hence not Thread-safe.
     * </p>
     */
    abstract class AbstractDemarcator implements Closeable {
    
        final static int INIT_BUFFER_SIZE = 8192;
    
        private final InputStream is;
    
        /*
         * The size of the initial buffer. Its value is also used when the buffer needs
         * to be expanded.
         */
        private final int initialBufferSize;
    
        /*
         * The maximum allowed size of the token. In the event such size is exceeded
         * TokenTooLargeException is thrown.
         */
        private final int maxDataSize;
    
        /*
         * Buffer into which the bytes are read from the provided stream. The size
         * of the buffer is defined by the 'initialBufferSize' provided in the
         * constructor or defaults to the value of INIT_BUFFER_SIZE constant.
         */
        byte[] buffer;
    
        /*
         * Current read position within the 'buffer', i.e. the next byte to be examined.
         */
        int index;
    
        /*
         * Starting offset of the demarcated token within the current 'buffer'. Keep
         * in mind that while most of the time it is the same as the 'index' it may
         * also have a value of 0 at which point it serves as a signal to the fill()
         * operation that the buffer needs to be expanded if the end of the token is not reached
         * (see fill() operation for more details).
         */
        int mark;
    
        /*
         * Starting offset (from the beginning of the stream) of the demarcated
         * token.
         */
        long offset;
    
        /*
         * The length of the bytes valid for reading. It is different from the
         * buffer length, since this number may be smaller (e.g., at the end of the
         * stream) than the actual buffer length. It is set by the fill() operation
         * every time more bytes are read into the buffer.
         */
        int availableBytesLength;
    
        /**
         * Constructs an instance of demarcator with provided {@link InputStream}
         * and max buffer size. Each demarcated token must fit within max buffer
         * size, otherwise the exception will be raised.
         */
        AbstractDemarcator(InputStream is, int maxDataSize) {
            this(is, maxDataSize, INIT_BUFFER_SIZE);
        }
    
        /**
         * Constructs an instance of demarcator with provided {@link InputStream}
         * and max buffer size and initial buffer size. Each demarcated token must
         * fit within max buffer size, otherwise the exception will be raised.
         */
        AbstractDemarcator(InputStream is, int maxDataSize, int initialBufferSize) {
            this.validate(is, maxDataSize, initialBufferSize);
            this.is = is;
            this.initialBufferSize = initialBufferSize;
            this.buffer = new byte[initialBufferSize];
            this.maxDataSize = maxDataSize;
        }
    
        @Override
        public void close() throws IOException {
            this.is.close();
        }
    
        /**
         * Will fill the current buffer from the current 'index' position, expanding it
         * and/or shuffling it if necessary. If the expanded buffer would exceed
         * Integer.MAX_VALUE a {@link BufferOverflowException} is thrown (the token
         * size limit itself is enforced by extractDataToken()).
         *
         * @throws IOException if unable to read from the stream
         */
        void fill() throws IOException {
            if (this.index >= this.buffer.length) {
                if (this.mark == 0) { // expand
                    long expandedSize = this.buffer.length + this.initialBufferSize;
                    if (expandedSize > Integer.MAX_VALUE) {
                        throw new BufferOverflowException(); // will probably OOM before this will ever happen, but just in case.
                    }
                    byte[] newBuff = new byte[(int) expandedSize];
                    System.arraycopy(this.buffer, 0, newBuff, 0, this.buffer.length);
                    this.buffer = newBuff;
                } else { // shuffle
                    int length = this.index - this.mark;
                    System.arraycopy(this.buffer, this.mark, this.buffer, 0, length);
                    this.index = length;
                    this.mark = 0;
                }
            }
    
            int bytesRead;
            /*
             * The do/while pattern is used here similar to the way it is used in
             * BufferedReader essentially protecting from assuming the EOS until it
             * actually is since not every implementation of InputStream guarantees
             * that bytes are always available while the stream is open.
             */
            do {
                bytesRead = this.is.read(this.buffer, this.index, this.buffer.length - this.index);
            } while (bytesRead == 0);
            this.availableBytesLength = bytesRead != -1 ? this.index + bytesRead : -1;
        }
    
        /**
         * Will extract data token of the provided length from the current buffer
         * starting at the 'mark'.
         */
        byte[] extractDataToken(int length) throws IOException {
            if (length > this.maxDataSize) {
                throw new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + this.maxDataSize + " bytes.");
            }
            byte[] data = null;
            if (length > 0) {
                data = new byte[length];
                System.arraycopy(this.buffer, this.mark, data, 0, data.length);
            }
            return data;
        }
    
        /**
         * Validates prerequisites for constructor arguments
         */
        private void validate(InputStream is, int maxDataSize, int initialBufferSize) {
            if (is == null) {
                throw new IllegalArgumentException("'is' must not be null");
            } else if (maxDataSize <= 0) {
                throw new IllegalArgumentException("'maxDataSize' must be > 0");
            } else if (initialBufferSize <= 0) {
                throw new IllegalArgumentException("'initialBufferSize' must be > 0");
            }
        }
    }
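To make the contract of nextToken() visible without the NiFi dependency (TokenTooLargeException), here is a simplified standalone sketch. It is not NiFi's implementation: the class name SimpleDemarcator is hypothetical, it supports only a single-byte delimiter, it does not reuse a buffer, and it throws a plain IOException for oversized tokens. Like StreamDemarcator, it strips the delimiter, skips empty tokens, rejects tokens longer than maxDataSize, and returns null at the end of the stream.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical, simplified stand-in for StreamDemarcator (single-byte delimiter only). */
class SimpleDemarcator {
    private final InputStream in;
    private final int delimiter;
    private final int maxDataSize;
    private boolean endOfStream = false;

    SimpleDemarcator(InputStream in, byte delimiter, int maxDataSize) {
        this.in = new BufferedInputStream(in); // avoid one underlying read per byte
        this.delimiter = delimiter & 0xFF;     // InputStream.read() returns 0..255
        this.maxDataSize = maxDataSize;
    }

    /** Returns the next non-empty token without the delimiter, or null at end of stream. */
    byte[] nextToken() throws IOException {
        while (!endOfStream) {
            ByteArrayOutputStream token = new ByteArrayOutputStream();
            int b;
            while ((b = in.read()) != -1 && b != delimiter) {
                if (token.size() == maxDataSize) {
                    throw new IOException("Token exceeds " + maxDataSize + " bytes");
                }
                token.write(b);
            }
            if (b == -1) {
                endOfStream = true;
            }
            if (token.size() > 0) {
                return token.toByteArray();
            }
            // Empty token (consecutive or trailing delimiter): keep reading
        }
        return null;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "first\n\nsecond\nthird".getBytes(StandardCharsets.UTF_8);
        SimpleDemarcator demarcator = new SimpleDemarcator(new ByteArrayInputStream(data), (byte) '\n', 1024);
        byte[] token;
        while ((token = demarcator.nextToken()) != null) {
            System.out.println(new String(token, StandardCharsets.UTF_8)); // first, second, third
        }
    }
}
```

Note the empty-line behavior: if blank lines matter to you, StreamDemarcator (and this sketch) will silently drop them, while BufferedReader.readLine() returns them as empty strings.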