Search code examples
javarestasynchronousspring-bootspring-web

ConcurrentModificationException with MockRestService when restTemplate used via async executors


Trying to mock out rest responses with spring's MockRestService for integration tests, but AbstractRequestExpectationManager keeps running into ConcurrentModificationException when the actual code uses rest template asynchronously.

Test pseudocode snippets:

@Autowired
RestTemplate restTemplate;
MockRestServiceServer mockRestServiceServer;

@Test
public test() {
    // given
    mockRestServiceServer = MockRestServiceServer
            .bindTo( restTemplate )
            .ignoreExpectOrder()  // supported from spring 4.3
            .build();
    prepareRestResponse( "/resource/url", "mock json content".getBytes() );
    // when
    myservice.refreshPricesForProductGroup( 2 );

    // then
    // assertions
}

private void prepareRestResponse( final String urlTail, final byte[] responseContent ) {
    mockRestServiceServer
            .expect( requestTo( endsWith( urlTail ) ) )
            .andExpect( method( HttpMethod.GET ) )
            .andRespond( withSuccess()
                    .body( responseContent )
                    .contentType( APPLICATION_JSON_UTF8 ) );
}

Actual code accessing the rest template:

@Autowired
Executor executor
@Autowired
PriceRestClient priceClient
@Autowired
ProductRestClient productClient

/../

private void refreshPricesForProductGroup( final int groupId ) {

    List<Product> products = productClient.findAllProductsForGroup( groupId );

    products.forEach( p ->
            executor.execute( () -> {
                final Price price = priceClient.getPrice( p.getId() );
                priceRepository.updatePrice( price );
            } )
    );
}

The PriceRestClient.getPrice() performs simple rest call:

Price getPrice( String productId ) {

    try {
        ResponseEntity<byte[]> entity = restTemplate.exchange(
                restUtil.getProductPriceDataUrl(),
                HttpMethod.GET,
                restUtil.createGzipEncodingRequestEntity(),
                byte[].class,
                productId );

        if ( entity.getStatusCode() == HttpStatus.OK ) {
            String body = restUtil.unmarshalGzipBody( entity.getBody() );
            return priceEntityParser.parse( body );
        }

    } catch ( HttpClientErrorException e ) {
        // TODO
    } catch ( ResourceAccessException e ) {
        // TODO
    } catch ( IOException e ) {
        // TODO
    }

    return null;
}

thrown exception:

Exception in thread "AsyncExecutor-2" java.util.ConcurrentModificationException
    at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:711)
    at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:734)
    at org.springframework.test.web.client.AbstractRequestExpectationManager$RequestExpectationGroup.findExpectation(AbstractRequestExpectationManager.java:167)
    at org.springframework.test.web.client.UnorderedRequestExpectationManager.validateRequestInternal(UnorderedRequestExpectationManager.java:42)
    at org.springframework.test.web.client.AbstractRequestExpectationManager.validateRequest(AbstractRequestExpectationManager.java:71)
    at org.springframework.test.web.client.MockRestServiceServer$MockClientHttpRequestFactory$1.executeInternal(MockRestServiceServer.java:286)
    at org.springframework.mock.http.client.MockClientHttpRequest.execute(MockClientHttpRequest.java:93)
    at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:93)
    at com.mycompany.myproduct.web.client.HttpRequestInterceptorLoggingClient.interceptReq(HttpRequestInterceptorLoggingClient.java:32)
    at org.springframework.http.client.InterceptingClientHttpRequest$InterceptingRequestExecution.execute(InterceptingClientHttpRequest.java:85)
    at org.springframework.http.client.InterceptingClientHttpRequest.executeInternal(InterceptingClientHttpRequest.java:69)
    at org.springframework.http.client.AbstractBufferingClientHttpRequest.executeInternal(AbstractBufferingClientHttpRequest.java:48)
    at org.springframework.http.client.AbstractClientHttpRequest.execute(AbstractClientHttpRequest.java:53)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:596)
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:557)
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:475)
    at com.mycompany.myproduct.rest.PriceRestClient.getPrice(PriceRestClient.java:48)
    at com.mycompany.myproduct.service.ProductPriceSourcingService.lambda$null$29(ProductPriceSourcingService.java:132)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Am I doing something wrong here, or it might be a bug with MockRestService?


Solution

  • "Fixed" this by creating a copy of UnorderedRequestExpectationManager:

    package cucumber.testbeans;
    
    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.LinkedHashSet;
    import java.util.Set;
    
    import org.springframework.http.client.ClientHttpRequest;
    import org.springframework.http.client.ClientHttpResponse;
    import org.springframework.test.web.client.*;
    
    /**
    * Essentially an {@link UnorderedRequestExpectationManager}, but overrides some implementations of {@link AbstractRequestExpectationManager}
    * in order to allow adding more expectations after the first request has been made.
    */
    public class RestRequestExpectationManager extends AbstractRequestExpectationManager {
    
        private final RequestExpectationGroup remainingExpectations = new RequestExpectationGroup();
    
        @Override
        public ResponseActions expectRequest( ExpectedCount count, RequestMatcher matcher ) {
            //        Assert.state(getRequests().isEmpty(), "Cannot add more expectations after actual requests are made.");
            RequestExpectation expectation = new DefaultRequestExpectation( count, matcher );
            getExpectations().add( expectation );
            return expectation;
        }
    
        @Override
        public ClientHttpResponse validateRequest( ClientHttpRequest request ) throws IOException {
            //        if (getRequests().isEmpty()) {
            afterExpectationsDeclared();
            //        }
            ClientHttpResponse response = validateRequestInternal( request );
            getRequests().add( request );
            return response;
        }
    
        @Override
        protected void afterExpectationsDeclared() {
            this.remainingExpectations.updateAll( getExpectations() );
        }
    
        @Override
        public ClientHttpResponse validateRequestInternal( ClientHttpRequest request ) throws IOException {
            RequestExpectation expectation = this.remainingExpectations.findExpectation( request );
            if ( expectation != null ) {
                ClientHttpResponse response = expectation.createResponse( request );
                this.remainingExpectations.update( expectation );
                return response;
            }
            throw createUnexpectedRequestError( request );
        }
    
        /**
        * Same as {@link AbstractRequestExpectationManager.RequestExpectationGroup}, but synchronizes operations on the {@code expectations}
        * set, so async operation would be possible.
        */
        private static class RequestExpectationGroup {
    
            private final Set<RequestExpectation> expectations = Collections.synchronizedSet( new LinkedHashSet<>() );
    
            public Set<RequestExpectation> getExpectations() {
                return this.expectations;
            }
    
            public void update( RequestExpectation expectation ) {
                if ( expectation.hasRemainingCount() ) {
                    getExpectations().add( expectation );
                } else {
                    getExpectations().remove( expectation );
                }
            }
    
            public void updateAll( Collection<RequestExpectation> expectations ) {
                for ( RequestExpectation expectation : expectations ) {
                    update( expectation );
                }
            }
    
            public RequestExpectation findExpectation( ClientHttpRequest request ) throws IOException {
                synchronized ( this.expectations ) {
                    for ( RequestExpectation expectation : getExpectations() ) {
                        try {
                            expectation.match( request );
                            return expectation;
                        } catch ( AssertionError error ) {
                            // Ignore
                        }
                    }
                    return null;
                }
            }
        }
    }
    

    There are 2 notable things going on:

    • first the commented lines in RestRequestExpectationManager, that makes us able to add expectations after the first request has been processed (not related to the ConcurrentModificationException issue at hand);
    • then the expectations synchronization in RestRequestExpectationManager.RequestExpectationGroup in order to support async operations. Appears to be working for me.

    Initialise the MockRestServiceServer as follows:

    MockRestServiceServer mockRestServiceServer = MockRestServiceServer
                .bindTo( restTemplate )
                .build( new RestRequestExpectationManager() );