Search code examples
javacachingnetflix-zuul

Netflix Zuul pre filter for caching is not working for a small number of compressed responses


I'd like to use Zuul to cache some requests. The cache is stored in Redis as a POJO and contains plaintext (not gzip-compressed data).

For normal tests and integration tests, everything works pretty well. With a JMeter load test, some of the requests fail with

java.util.zip.ZipException: Not in GZIP format (from jmeter)

We figured out that, at this point, Zuul is returning an empty response.

My PreFilter:

public class CachePreFilter extends CacheBaseFilter {

    private static DynamicIntProperty INITIAL_STREAM_BUFFER_SIZE = DynamicPropertyFactory.getInstance().getIntProperty(ZuulConstants.ZUUL_INITIAL_STREAM_BUFFER_SIZE, 8192);

    @Autowired
    CounterService counterService;

    public CachePreFilter(RedisCacheManager redisCacheManager, Properties properties) {
        super(redisCacheManager, properties);
    }

    @Override
    public Object run() {
        RequestContext ctx = RequestContext.getCurrentContext();

        CachedResponse data = getFromCache(ctx);
        if (null != data) {

            counterService.increment("counter.cached");

            HttpServletResponse response = ctx.getResponse();

            response.addHeader("X-Cache", "HIT");

            if (null != data.getContentType()) {
                response.setContentType(data.getContentType());
            }

            if (null != data.getHeaders()) {
                for (Entry<String, String> header : data.getHeaders().entrySet()) {
                    if (!response.containsHeader(header.getKey())) {
                        response.addHeader(header.getKey(), header.getValue());
                    }
                }
            }

            OutputStream outStream = null;
            try {
                outStream = response.getOutputStream();

                boolean isGzipRequested = ctx.isGzipRequested();

                if (null != data.getBody()) {
                    final String requestEncoding = ctx.getRequest().getHeader(ZuulHeaders.ACCEPT_ENCODING);

                    if (requestEncoding != null && HTTPRequestUtils.getInstance().isGzipped(requestEncoding)) {
                        isGzipRequested = true;
                    }

                    ByteArrayOutputStream byteArrayOutputStream = null;
                    ByteArrayInputStream is = null;
                    try {

                        if (isGzipRequested) {
                            byteArrayOutputStream = new ByteArrayOutputStream();
                            GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
                            gzipOutputStream.write(data.getBody().getBytes(StandardCharsets.UTF_8));
                            gzipOutputStream.flush();
                            gzipOutputStream.close();

                            ctx.setResponseGZipped(true);

                            is = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
                            logger.debug(String.format("Send gzip content %s", data.getBody()));

                            response.setHeader(ZuulHeaders.CONTENT_ENCODING, "gzip");
                        } else {
                            logger.debug(String.format("Send content %s", data.getBody()));

                            is = new ByteArrayInputStream(data.getBody().getBytes(StandardCharsets.UTF_8));
                        }
                        writeResponse(is, outStream);

                    } catch (Exception e) {
                        logger.error("Error at sending response " + e.getMessage(), e);
                        throw new RuntimeException("Failed to send content", e);
                    } finally {
                        if (null != byteArrayOutputStream) {
                            byteArrayOutputStream.close();
                        }

                        if (null != is) {
                            is.close();
                        }
                    }
                }

                ctx.setSendZuulResponse(false);
            } catch (IOException e) {
                logger.error("Cannot read from Stream " + e.getMessage(), e.getMessage());
            } finally {
                // don't close the outputstream
            }

            ctx.set(CACHE_HIT, true);

            return data;
        } else {
            counterService.increment("counter.notcached");
        }

        ctx.set(CACHE_HIT, false);
        return null;
    }

    private ThreadLocal<byte[]> buffers = new ThreadLocal<byte[]>() {
        @Override
        protected byte[] initialValue() {
            return new byte[INITIAL_STREAM_BUFFER_SIZE.get()];
        }
    };

    private void writeResponse(InputStream zin, OutputStream out) throws Exception {
        byte[] bytes = buffers.get();
        int bytesRead = -1;
        while ((bytesRead = zin.read(bytes)) != -1) {
            out.write(bytes, 0, bytesRead);
        }
    }

    @Override
    public int filterOrder() {
        return 99;
    }

    @Override
    public String filterType() {
        return "pre";
    }
}

My Post Filter

public class CachePostFilter extends CacheBaseFilter {

    public CachePostFilter(RedisCacheManager redisCacheManager, Properties properties) {
        super(redisCacheManager, properties);
    }

    @Override
    public boolean shouldFilter() {
        RequestContext ctx = RequestContext.getCurrentContext();
        return super.shouldFilter() && !ctx.getBoolean(CACHE_HIT);
    }

    @Override
    public Object run() {
        RequestContext ctx = RequestContext.getCurrentContext();
        HttpServletRequest req = ctx.getRequest();
        HttpServletResponse res = ctx.getResponse();

        if (isSuccess(res, ctx.getOriginResponseHeaders())) {
            // Store only successful responses

            String cacheKey = cacheKey(req);
            if (cacheKey != null) {
                String body = null;

                if (null != ctx.getResponseBody()) {
                    body = ctx.getResponseBody();
                } else if (null != ctx.getResponseDataStream()) {
                    InputStream is = null;
                    try {
                        is = ctx.getResponseDataStream();

                        final Long len = ctx.getOriginContentLength();
                        if (len == null || len > 0) {

                            if (ctx.getResponseGZipped()) {
                                is = new GZIPInputStream(is);
                            }

                            StringWriter writer = new StringWriter();

                            IOUtils.copy(is, writer, "UTF-8");
                            body = writer.toString();

                            if (null != body && !body.isEmpty()) {
                                ctx.setResponseDataStream(new ByteArrayInputStream(body.getBytes()));
                                ctx.setResponseGZipped(false);
                                ctx.setOriginContentLength(String.valueOf(body.getBytes().length));
                            } else {
                                ctx.setResponseBody("{}");
                            }
                        }

                    } catch (IOException e) {
                        logger.error("Cannot read body " + e.getMessage(), e);
                    } finally {
                        if (null != is) {
                            try {
                                is.close();
                            } catch (IOException e) {

                            }
                        }
                    }

                    saveToCache(ctx, cacheKey, body);
                }
            }
        }
        return null;

    }

    @Override
    public int filterOrder() {
        return 1;
    }

    @Override
    public String filterType() {
        return "post";
    }

    private boolean isSuccess(HttpServletResponse res, List<Pair<String, String>> originHeaders) {
        if (res != null && res.getStatus() < 300) {
            if (null != originHeaders) {
                for (Pair<String, String> header : originHeaders) {
                    if (header.first().equals("X-CACHEABLE") && header.second().equals("1")) {
                        return true;
                    }
                }
            }
        }
        return false;
    }

We tested it without Redis (just storing the response in a local variable) and the behavior is still the same. We always logged the response from the cache (before gzip) and everything looks good.


Solution

  • (Posted on behalf of the question author).

    Solution

    We refactored our PostFilter so that it no longer changes much in the response for Zuul. After this change, we don't see any problems any more:

    Working Post Filter

    /**
     * Zuul "post" filter (order 1) that stores cacheable origin responses in the cache.
     *
     * <p>The filter only runs when the request was not served from the cache ({@code CACHE_HIT}
     * is not set). A response is stored when its status is 200 and the origin sent the header
     * {@code X-CACHEABLE: 1}. The response that Zuul sends to the client is not modified: the
     * origin stream is buffered, handed back to the context unchanged, and only a copy is
     * decoded for the cache.
     */
    public class CachePostFilter extends CacheBaseFilter {

        public CachePostFilter(RedisCacheManager redisCacheManager, Properties properties) {
            super(redisCacheManager, properties);
        }

        @Override
        public boolean shouldFilter() {
            RequestContext ctx = RequestContext.getCurrentContext();
            return super.shouldFilter() && !ctx.getBoolean(CACHE_HIT);
        }

        /**
         * Stores the body of a successful, cacheable response under the request's cache key.
         *
         * @return always {@code null}
         */
        @Override
        public Object run() {
            RequestContext ctx = RequestContext.getCurrentContext();
            HttpServletRequest req = ctx.getRequest();
            HttpServletResponse res = ctx.getResponse();

            // Store only successful responses
            if (!isSuccess(res, ctx.getOriginResponseHeaders())) {
                return null;
            }

            String cacheKey = cacheKey(req);
            if (cacheKey == null) {
                return null;
            }

            String body;
            if (null != ctx.getResponseBody()) {
                body = ctx.getResponseBody();
            } else if (null != ctx.getResponseDataStream()) {
                try {
                    body = readBodyKeepingResponse(ctx);
                } catch (IOException e) {
                    // Skip caching when the body could not be read or decompressed, so that
                    // an empty body is never stored for a non-empty response.
                    logger.error("Cannot read body " + e.getMessage(), e);
                    return null;
                }
            } else {
                return null;
            }

            saveToCache(ctx, cacheKey, body);
            return null;
        }

        /**
         * Reads the origin response as UTF-8 text without changing what is sent to the client.
         *
         * <p>Once the origin stream has been read, other filters cannot read it again and would
         * deliver no response; therefore the raw bytes are buffered and put back into the
         * context as a fresh stream. The gzip flag and the response body are left untouched.
         *
         * @return the decoded body, or {@code null} when the origin reported a content length
         *         that is not positive (nothing is read in that case)
         * @throws IOException if the stream cannot be read or is flagged gzip but is not valid
         *         gzip
         */
        private String readBodyKeepingResponse(RequestContext ctx) throws IOException {
            // If the origin says it's gzipped but the content is zero bytes, don't try to
            // uncompress it.
            final Long len = ctx.getOriginContentLength();
            if (len != null && len <= 0) {
                return null;
            }

            byte[] rawData;
            try (InputStream rawInputStream = ctx.getResponseDataStream()) {
                rawData = IOUtils.toByteArray(rawInputStream);
            }
            ctx.setResponseDataStream(new ByteArrayInputStream(rawData));

            StringWriter writer = new StringWriter();
            try (InputStream decoded = ctx.getResponseGZipped()
                    ? new GZIPInputStream(new ByteArrayInputStream(rawData))
                    : new ByteArrayInputStream(rawData)) {
                IOUtils.copy(decoded, writer, "UTF-8");
            }
            return writer.toString();
        }

        @Override
        public int filterOrder() {
            return 1;
        }

        @Override
        public String filterType() {
            return "post";
        }

        /**
         * Tells whether the response may be cached: status 200 and an origin header
         * {@code X-CACHEABLE} with value {@code 1}. The header name is matched
         * case-insensitively.
         */
        private boolean isSuccess(HttpServletResponse res, List<Pair<String, String>> originHeaders) {
            if (res == null || res.getStatus() != HttpServletResponse.SC_OK || originHeaders == null) {
                return false;
            }
            for (Pair<String, String> header : originHeaders) {
                if ("X-CACHEABLE".equalsIgnoreCase(header.first()) && "1".equals(header.second())) {
                    return true;
                }
            }
            return false;
        }
    }