
Weirdness ensues when running integration tests with Spring Cloud Stream, Spring Data Cassandra and Cassandra-Unit-Spring


I'm using Cassandra as an event store for my app, with Spring Cloud Stream receiving the events and Spring Data Cassandra saving them.

To handle different events, I've created a ClientEvent entity with two user-defined type fields, client and product (both classes are annotated with @UserDefinedType):

@Table
public class ClientEvent {
    @PrimaryKey
    @CassandraType(type = DataType.Name.UUID)
    private String id; // created with UUIDs.timeBased().toString()
    @CassandraType(type = DataType.Name.UDT, userTypeName = "client")
    private Client client; // has an email field
    @CassandraType(type = DataType.Name.UDT, userTypeName = "product")
    private Product product; // has name and cost fields
    // remaining columns (name, createdat), getters and setters omitted
}

My EventConsumer looks like this, and works fine for different events:

@EnableBinding(Sink.class)
public class EventConsumer {

    @Inject
    private EventRepository eventRepository;

    @StreamListener(Sink.INPUT)
    public void clientEvent(ClientEvent event) {
        eventRepository.save(event);
    }
}

I also have a controller that returns all events for a given client (code for said controller is very standard, so I won't post it here).

To test all this, I've set up integration tests with spring-cloud-contract-stub-runner, plus cassandra-unit-spring to run an embedded Cassandra. The only configuration I have is the following, in my application.yml:

spring:
  profiles: test
  data:
    cassandra:
      keyspace-name: testkeyspace
      contact-points: localhost
      port: 9142

Here's the Spock test:

@SpringBootTest(classes = EventsApplication.class, webEnvironment = RANDOM_PORT)
@AutoConfigureStubRunner(ids = ["example.com:clients-service",
                         "example.com:products-service"], stubsMode = StubsMode.LOCAL)
@TestExecutionListeners(listeners = [
    CassandraUnitDependencyInjectionTestExecutionListener.class,
    DependencyInjectionTestExecutionListener.class
])
@CassandraDataSet(keyspace = "testkeyspace", value = "dataset.cql")
@EmbeddedCassandra(timeout = 60000L)
@ActiveProfiles("test")
class EventsIntegrationTests extends Specification {
    @Inject
    StubTrigger stubTrigger

    @Inject
    private TestRestTemplate restTemplate

    def "client created"() {
        given: "that a client has been created"
        stubTrigger.trigger("client_created") //stub defined in clients-service

        when: "I fetch the events for the current client"
        def response = restTemplate.getForEntity("/", ClientEvent[])

        then: "it should be a clientCreated event"
        ClientEvent event = response.body[0]
        event.name == "clientCreated"
        event.client == new Client("test") // hashCode and equals implemented
    }

    def "product bought"() {
        given: "that a product has been bought"
        stubTrigger.trigger("product_bought") //stub defined in products-service

        when: "I fetch the events for the current client"
        def response = restTemplate.getForEntity("/", ClientEvent[])

        then: "it should be a productBought event"
        ClientEvent event = response.body[0]
        event.name == "productBought"
        event.client == new Client("test") // hashCode and equals implemented
        event.product == new Product("Lamp", 100D)
    }
}

The dataset.cql file creates the testkeyspace keyspace, the client and product types, and then the ClientEvent table. So far so good. The problem is that, while the first test runs fine, the second test fails with this NullPointerException when the EventConsumer saves the event:

org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.events.consumers.EventConsumer#clientEvent[1 args]; nested exception is java.lang.NullPointerException
[...]
Caused by: java.lang.NullPointerException: null
at org.springframework.data.cassandra.core.mapping.SimpleUserTypeResolver.resolveType(SimpleUserTypeResolver.java:63) ~[spring-data-cassandra-2.0.6.RELEASE.jar:2.0.6.RELEASE]

I've found that if I step through with a debugger or wait one second between tests (with a Thread.sleep(1000) in a setup() method), the tests pass. I've also noticed that cassandra-unit-spring drops and recreates the keyspace between tests. It seems the second test runs before the user-defined types have been recreated, which is why it throws a NullPointerException:

2018-05-31 22:43:43.552  INFO 14976 --- [port-Requests-1] o.a.cassandra.service.MigrationManager   : Drop Keyspace 'testkeyspace'
2018-05-31 22:43:53.445  INFO 14976 --- [igrationStage:1] o.a.cassandra.utils.memory.BufferPool    : Global buffer pool is enabled, when pool is exhausted (max is 512.000MiB) it will allocate on heap
2018-05-31 22:43:53.511  INFO 14976 --- [port-Requests-3] o.a.cassandra.service.MigrationManager   : Create new Keyspace: KeyspaceMetadata{name=testkeyspace, params=KeyspaceParams{durable_writes=false, replication=ReplicationParams{class=org.apache.cassandra.locator.SimpleStrategy, replication_factor=1}}, tables=[], views=[], functions=[], types=[]}
2018-05-31 22:43:53.603  INFO 14976 --- [port-Requests-2] o.a.cassandra.service.MigrationManager   : Create new table: org.apache.cassandra.config.CFMetaData@1f5471ca[cfId=3d9ed530-653d-11e8-b838-27af117b0453,ksName=testkeyspace,cfName=clientevent,flags=[COMPOUND],params=TableParams{comment=, read_repair_chance=0.0, dclocal_read_repair_chance=0.1, bloom_filter_fp_chance=0.01, crc_check_chance=1.0, gc_grace_seconds=864000, default_time_to_live=0, memtable_flush_period_in_ms=0, min_index_interval=128, max_index_interval=2048, speculative_retry=99PERCENTILE, caching={'keys' : 'ALL', 'rows_per_partition' : 'NONE'}, compaction=CompactionParams{class=org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy, options={min_threshold=4, max_threshold=32}}, compression=org.apache.cassandra.schema.CompressionParams@57a6e29, extensions={}, cdc=false},comparator=comparator(),partitionColumns=[[] | [client createdat name product]],partitionKeyColumns=[id],clusteringColumns=[],keyValidator=org.apache.cassandra.db.marshal.TimeUUIDType,columnMetadata=[product, id, client, name, createdat],droppedColumns={},triggers=[],indexes=[]]
2018-05-31 22:43:53.619  INFO 14976 --- [igrationStage:1] o.apache.cassandra.db.ColumnFamilyStore  : Initializing testkeyspace.clientevent
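
For comparison, the fixed Thread.sleep(1000) could be replaced by polling until the schema is visible again. This is only a minimal sketch of that workaround: SchemaWait and its until method are hypothetical names, and the condition shown in the usage comment (asking the driver's keyspace metadata for the client type) is an assumption I have not verified against this setup.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: waits for a condition instead of sleeping a fixed time. */
final class SchemaWait {

    private SchemaWait() {}

    /**
     * Polls the condition until it returns true or the timeout elapses.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean until(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition becoming true
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }
}

// Hypothetical usage in a setup() method, assuming a DataStax driver 3.x Cluster
// named "cluster" is available to the test (it is not in the test shown above):
//   SchemaWait.until(() -> {
//       KeyspaceMetadata ks = cluster.getMetadata().getKeyspace("testkeyspace");
//       return ks != null && ks.getUserType("client") != null;
//   }, 5000, 50);
```

Polling returns as soon as the condition holds, so it would only cost the time the schema actually takes to come back, but it still treats the symptom rather than the drop-and-recreate cycle itself.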

Am I missing something in my configuration? How do I avoid having to wait a second between tests?


Solution

  • OK, I made some changes following this post on CassandraUnit. They not only fixed my problem but also made my tests run much faster, since the database is now created only once!

    I've made the following changes to my EventsIntegrationTests class:

    @SpringBootTest(classes = EventsApplication.class, webEnvironment = RANDOM_PORT)
    @AutoConfigureStubRunner(ids = ["example.com:clients-service",
                         "example.com:products-service"], stubsMode = StubsMode.LOCAL)
    // no more TestExecutionListeners and cassandra annotations
    @ActiveProfiles("test")
    class EventsIntegrationTests extends Specification {
    
        @Shared
        private static Cluster cluster
    
        @Shared
        private static Session session
    
        @Inject
        StubTrigger stubTrigger
    
        @Inject
        private TestRestTemplate restTemplate
    
        // runs only once before all tests, like JUnit 5's @BeforeAll
        def setupSpec() {
            // the same code from the post, creates the DB.
            if (session == null) {
                try {
                    EmbeddedCassandraServerHelper.startEmbeddedCassandra()
    
                    cluster = new Cluster.Builder().addContactPoint("localhost").withPort(9142).build()
                    session = cluster.connect()
                    CQLDataLoader loader = new CQLDataLoader(session)
                    ClassPathCQLDataSet dataSet = new ClassPathCQLDataSet("dataset.cql", true, true, "testkeyspace")
                    loader.load(dataSet)
                } catch (Exception e) {
                    throw new RuntimeException("Could not start cassandra server or obtain a valid session.", e);
                }
            }
        }
    
        // same tests
    
        // runs after every test, like JUnit 5's @AfterEach
        def cleanup() {
            // truncates the tables
            Collection<TableMetadata> tables = cluster.getMetadata().getKeyspace("testkeyspace").getTables()
            // Groovy closure, slightly different syntax from a Java lambda
            tables.forEach({session.execute(QueryBuilder.truncate(it))})
        }
    }
    

    Of course, since I only have one integration test class, all the code lives in that class. If you have more than one integration test class involving Cassandra, I'd recommend moving these methods into a shared base class.
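
The "start once, reuse everywhere" idea behind such a base class can be sketched in plain Java. This is a minimal sketch, not what cassandra-unit provides: SharedResource is a hypothetical name, and in the real tests the factory would presumably wrap the EmbeddedCassandraServerHelper.startEmbeddedCassandra() and CQLDataLoader calls from setupSpec() above.

```java
import java.util.function.Supplier;

/** Hypothetical holder for an expensive resource that should be created only once per JVM. */
final class SharedResource<T> {

    private final Supplier<T> factory;
    private T instance; // guarded by "this"

    SharedResource(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Creates the resource on first call and returns the same instance afterwards. */
    synchronized T get() {
        if (instance == null) {
            // Same guard as the "if (session == null)" check in setupSpec()
            instance = factory.get();
        }
        return instance;
    }
}
```

A base class could keep one such holder in a static field, so every test class that extends it gets the same session, and only the per-test truncation in cleanup() runs repeatedly.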