Search code examples
c++compressionboost-iostreams

Switching between compressed and uncompressed data


I wrote a simple filter for Boost iostreams that detects whether the input is gzip-compressed or plain text, and delegates to the gzip_decompressor when it is compressed.

The problem is that I seek back in my input stream to feed the data to the decompressor again. However, some streams don't support seeking and break with a violent thread exception.

Instead, I thought, OK, let's use a basic_array_source to feed the two characters, but this source doesn't support the read call!

So this works all the time:

// Multi-character input filter that forwards plain data untouched and routes
// gzip data through an embedded gzip_decompressor. The decision is made once,
// on the first read(), by init() (shown separately below).
struct gz_decompressor {
    using char_type = char;
    using category  = boost::iostreams::multichar_input_filter_tag;

    // Decompressor used when the input turns out to be gzip data.
    boost::iostreams::gzip_decompressor m_decompressor{15, backtest::GzReader::GZIP_BUFFER_SIZE};
    // Becomes true once the input format has been sniffed.
    bool m_initialized{false};
    // True when the sniffed header matched the gzip magic number.
    bool m_is_compressed{false};

    // Reads up to `n` characters into `s`, pulling from `src` either directly
    // or through the decompressor. Returns whatever the selected reader
    // returns.
    template<typename Source>
    std::streamsize read(Source& src, char* s, std::streamsize n) {
        // Format detection happens lazily, exactly once.
        if (!m_initialized) {
            init(src, s, n);
        }

        return m_is_compressed ? m_decompressor.read(src, s, n)
                               : boost::iostreams::read(src, s, n);
    }
};

The part that I can't figure out is:

    // Sniffs the first two bytes of `src` for the gzip magic number
    // (0x1f 0x8b), records the outcome in m_is_compressed and marks the filter
    // as initialized. It then tries to replay the two consumed bytes through an
    // in-memory array source so they are not lost.
    //
    // src: upstream source the header bytes are taken from.
    // s/n: destination buffer and capacity handed down from read().
    template<typename Source>
    void init(Source& src, char* s, std::streamsize n) {
        char header[2];
        header[0] = boost::iostreams::get(src);
        header[1] = boost::iostreams::get(src);
        // The second magic byte lives at index 1; indexing header[2] would read
        // past the end of the two-element array (undefined behaviour).
        m_is_compressed = header[0] == static_cast<char>(0x1f) && header[1] == static_cast<char>(0x8b);
        m_initialized = true;

        // Wrap the two sniffed bytes so they can be fed back in.
        boost::iostreams::basic_array_source<char> source(header);

        if (m_is_compressed) {
            m_decompressor.read(source, s, n); // Nope, is not allowed!
        }
        else {
            boost::iostreams::read(source, s, n);
        }
    }

Any clue about how to do this properly, i.e. without seeking back?


Solution

  • I have an imperfect solution, by reusing code that gzip_decompressor uses (the peekable_source):

    using namespace boost::iostreams;
    
    // Source adapter that layers a small put-back buffer in front of another
    // source. Reads drain the put-back buffer first and then fall through to
    // the wrapped source.
    template<typename Source>
    struct PeekableSource {
        using char_type = char;
        struct category : source_tag, peekable_tag { };

        // Wraps `src`; `putback` seeds the buffer of characters to be served
        // before anything is read from `src`.
        explicit PeekableSource(Source& src, const std::string& putback = "")
                : src_(src),
                  putback_(putback),
                  offset_(0)
        { }

        // Reads up to `n` characters into `s`. Returns the number of characters
        // delivered, or -1 when nothing was delivered and the wrapped source
        // reported -1.
        std::streamsize read(char* s, std::streamsize n)
        {
            std::streamsize copied = 0;

            // Serve buffered characters first.
            const std::streamsize buffered =
                    static_cast<std::streamsize>(putback_.size()) - offset_;
            if (buffered > 0) {
                copied = (std::min)(n, buffered);
                BOOST_IOSTREAMS_CHAR_TRAITS(char)::copy(
                        s, putback_.data() + offset_, copied);
                offset_ += copied;
                if (copied == n) {
                    return copied;
                }
            }

            // Top up the remainder from the wrapped source.
            const std::streamsize fetched =
                    boost::iostreams::read(src_, s + copied, n - copied);
            if (fetched != -1) {
                return copied + fetched;
            }
            return copied != 0 ? copied : -1;
        }

        // Pushes a single character back in front of the read position.
        // Throws bad_putback when no previously read slot is available.
        bool putback(char c)
        {
            if (!offset_) {
                boost::throw_exception(
                        boost::iostreams::detail::bad_putback());
            }
            putback_[--offset_] = c;
            return true;
        }

        // Replaces the already-consumed prefix of the buffer with `s` and
        // rewinds, so `s` is served next, followed by any unread remainder.
        void putback(const std::string& s)
        {
            putback_.replace(0, offset_, s);
            offset_ = 0;
        }

        // Returns true if some characters have been put back but not re-read.
        bool has_unconsumed_input() const
        {
            return static_cast<std::streamsize>(putback_.size()) > offset_;
        }

        // Returns the characters that have been put back but not re-read.
        std::string unconsumed_input() const
        {
            return putback_.substr(offset_);
        }

        Source&          src_;      // wrapped upstream source
        std::string      putback_;  // put-back buffer
        std::streamsize  offset_;   // read position inside putback_
    };
    
    struct gzDecompressor {
        typedef char              char_type;
        typedef multichar_input_filter_tag  category;
    
        gzip_decompressor m_decompressor;
        bool m_initialized{false};
        bool m_is_compressed{false};
        std::string m_putback;
    
        template<typename Source>
        void init(Source& src) {
            std::string data;
            data.push_back(get(src));
            data.push_back(get(src));
            m_is_compressed = data[0] == static_cast<char>(0x1f) && data[1] == static_cast<char>(0x8b);
            src.putback(data);
            m_initialized = true;
        }
    
        template<typename Source>
        std::streamsize read(Source& src, char* s, std::streamsize n) {
            PeekableSource<Source> peek(src, m_putback);
            if (!m_initialized) {
                init(peek);
            }
    
            if (m_is_compressed) {
                return m_decompressor.read(peek, s, n);
            }
    
            return boost::iostreams::read(peek, s, n);
        }
    };
    

    It's not that great because there are now two intermediate sources that can cache data, but at least the bulk of the work should be done through the read interface and not byte by byte, so this should alleviate any performance concern.