I'm using Cassandra as an Event Store for my app, with Spring Cloud Stream receiving the events and Spring Data Cassandra saving them.
In order to listen to different events, I've created a ClientEvent with two user-defined types, client and product (both classes are annotated with @UserDefinedType), as such:
@Table
public class ClientEvent {

    @PrimaryKey
    @CassandraType(type = DataType.Name.UUID)
    private String id; // created with UUIDs.timeBased().toString()

    @CassandraType(type = DataType.Name.UDT, userTypeName = "client")
    private Client client; // has an email field

    @CassandraType(type = DataType.Name.UDT, userTypeName = "product")
    private Product product; // has name and cost fields
}
My EventConsumer looks like this, and works fine for different events:
@EnableBinding(Sink.class)
public class EventConsumer {

    @Inject
    private EventRepository eventRepository;

    // invoked for every message arriving on the input channel
    @StreamListener(Sink.INPUT)
    public void clientEvent(ClientEvent event) {
        eventRepository.save(event);
    }
}
I also have a controller that returns all events for a given client (code for said controller is very standard, so I won't post it here).
In order to test all this, I've set up some integration tests with spring-cloud-contract-stub-runner and cassandra-unit-spring to run an embedded Cassandra. I have only the following configuration in my application.yml:
spring:
  profiles: test
  data:
    cassandra:
      keyspace-name: testkeyspace
      contact-points: localhost
      port: 9142
Here's the Spock test:
@SpringBootTest(classes = EventsApplication.class, webEnvironment = RANDOM_PORT)
@AutoConfigureStubRunner(ids = ["example.com:clients-service",
        "example.com:products-service"], stubsMode = StubsMode.LOCAL)
@TestExecutionListeners(listeners = [
        CassandraUnitDependencyInjectionTestExecutionListener.class,
        DependencyInjectionTestExecutionListener.class
])
@CassandraDataSet(keyspace = "testkeyspace", value = "dataset.cql")
@EmbeddedCassandra(timeout = 60000L)
@ActiveProfiles("test")
class EventsIntegrationTests extends Specification {

    @Inject
    StubTrigger stubTrigger

    @Inject
    private TestRestTemplate restTemplate

    def "client created"() {
        given: "that a client has been created"
        stubTrigger.trigger("client_created") // stub defined in clients-service

        when: "I fetch the events for the current client"
        def response = restTemplate.getForEntity("/", ClientEvent[])

        then: "it should be a clientCreated event"
        ClientEvent event = response.body[0]
        event.name == "clientCreated"
        event.client == new Client("test") // hashCode and equals implemented
    }
    def "product bought"() {
        given: "that a product has been bought"
        stubTrigger.trigger("product_bought") // stub defined in products-service

        when: "I fetch the events for the current client"
        def response = restTemplate.getForEntity("/", ClientEvent[])

        then: "it should be a productBought event"
        ClientEvent event = response.body[0]
        event.name == "productBought"
        event.client == new Client("test") // hashCode and equals implemented
        event.product == new Product("Lamp", 100D)
    }
}
The dataset.cql file creates the testkeyspace keyspace, the client and product types, and then the ClientEvent table. So far so good. The problem is that, while the first test runs fine, the second test gives me this NullPointerException when saving the event in the EventConsumer:
org.springframework.messaging.MessagingException: Exception thrown while invoking com.example.events.consumers.EventConsumer#clientEvent[1 args]; nested exception is java.lang.NullPointerException
[...]
Caused by: java.lang.NullPointerException: null
at org.springframework.data.cassandra.core.mapping.SimpleUserTypeResolver.resolveType(SimpleUserTypeResolver.java:63) ~[spring-data-cassandra-2.0.6.RELEASE.jar:2.0.6.RELEASE]
I've found that if I debug or wait one second between tests (with a Thread.sleep(1000) in a setup() method), my tests run fine! I've noticed that between tests, cassandra-unit-spring drops and recreates the database. It seems to me that the second test runs before the user-defined types are recreated, which is why it throws a NullPointerException:
2018-05-31 22:43:43.552 INFO 14976 --- [port-Requests-1] o.a.cassandra.service.MigrationManager : Drop Keyspace 'testkeyspace'
2018-05-31 22:43:53.445 INFO 14976 --- [igrationStage:1] o.a.cassandra.utils.memory.BufferPool : Global buffer pool is enabled, when pool is exhausted (max is 512.000MiB) it will allocate on heap
2018-05-31 22:43:53.511 INFO 14976 --- [port-Requests-3] o.a.cassandra.service.MigrationManager : Create new Keyspace: KeyspaceMetadata{name=testkeyspace, params=KeyspaceParams{durable_writes=false, replication=ReplicationParams{class=org.apache.cassandra.locator.SimpleStrategy, replication_factor=1}}, tables=[], views=[], functions=[], types=[]}
2018-05-31 22:43:53.603 INFO 14976 --- [port-Requests-2] o.a.cassandra.service.MigrationManager : Create new table: org.apache.cassandra.config.CFMetaData@1f5471ca[cfId=3d9ed530-653d-11e8-b838-27af117b0453,ksName=testkeyspace,cfName=clientevent,flags=[COMPOUND],params=TableParams{comment=, read_repair_chance=0.0, dclocal_read_repair_chance=0.1, bloom_filter_fp_chance=0.01, crc_check_chance=1.0, gc_grace_seconds=864000, default_time_to_live=0, memtable_flush_period_in_ms=0, min_index_interval=128, max_index_interval=2048, speculative_retry=99PERCENTILE, caching={'keys' : 'ALL', 'rows_per_partition' : 'NONE'}, compaction=CompactionParams{class=org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy, options={min_threshold=4, max_threshold=32}}, compression=org.apache.cassandra.schema.CompressionParams@57a6e29, extensions={}, cdc=false},comparator=comparator(),partitionColumns=[[] | [client createdat name product]],partitionKeyColumns=[id],clusteringColumns=[],keyValidator=org.apache.cassandra.db.marshal.TimeUUIDType,columnMetadata=[product, id, client, name, createdat],droppedColumns={},triggers=[],indexes=[]]
2018-05-31 22:43:53.619 INFO 14976 --- [igrationStage:1] o.apache.cassandra.db.ColumnFamilyStore : Initializing testkeyspace.clientevent
Am I missing something in my configuration? How do I avoid having to wait a second between tests?
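One option short of restructuring the test might be to poll until the schema is back instead of sleeping for a fixed second. The following is only a minimal sketch in plain Java: SchemaWait and awaitCondition are hypothetical names, not part of cassandra-unit or Spring, and the condition to poll is left to the caller.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls a condition instead of sleeping for a fixed time. */
final class SchemaWait {

    private SchemaWait() {
    }

    /**
     * Re-checks the condition every interval until it holds or the timeout elapses.
     * Returns true if the condition held, false on timeout or interruption.
     */
    static boolean awaitCondition(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // keep the interrupt visible to the caller and stop waiting
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In a setup() method, the condition could check that the driver's metadata reports the keyspace and the client type again, for example that cluster.getMetadata().getKeyspace("testkeyspace") is non-null and its getUserType("client") is non-null. That assumes the test has access to the same Cluster the application uses, which the test above does not show, so treat it as an untested idea rather than a confirmed fix.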
Ok, I made some changes following this post on CassandraUnit, which not only fixed my problem but made my tests run much faster, since it only creates the database once!
I've made the following changes to my EventsIntegrationTests:
@SpringBootTest(classes = EventsApplication.class, webEnvironment = RANDOM_PORT)
@AutoConfigureStubRunner(ids = ["example.com:clients-service",
        "example.com:products-service"], stubsMode = StubsMode.LOCAL)
// no more TestExecutionListeners and cassandra annotations
@ActiveProfiles("test")
class EventsIntegrationTests extends Specification {

    @Shared
    private static Cluster cluster

    @Shared
    private static Session session

    @Inject
    StubTrigger stubTrigger

    @Inject
    private TestRestTemplate restTemplate

    // runs only once before all tests, like JUnit 5's @BeforeAll
    def setupSpec() {
        // the same code from the post, creates the DB.
        if (session == null) {
            try {
                EmbeddedCassandraServerHelper.startEmbeddedCassandra()
                cluster = new Cluster.Builder().addContactPoint("localhost").withPort(9142).build()
                session = cluster.connect()
                CQLDataLoader loader = new CQLDataLoader(session)
                ClassPathCQLDataSet dataSet = new ClassPathCQLDataSet("dataset.cql", true, true, "testkeyspace")
                loader.load(dataSet)
            } catch (Exception e) {
                throw new RuntimeException("Could not start cassandra server or obtain a valid session.", e)
            }
        }
    }
    // same tests

    // runs after every test, like JUnit 5's @AfterEach
    def cleanup() {
        // truncates the tables
        Collection<TableMetadata> tables = cluster.getMetadata().getKeyspace("testkeyspace").getTables()
        // Groovy closure, slightly different syntax from a Java lambda
        tables.forEach({ session.execute(QueryBuilder.truncate(it)) })
    }
}
Of course, since I only have one integration test, all the code is in that class. If you have more than one integration test involving Cassandra, I'd recommend creating a base class with these methods.
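With several test classes sharing a base class, the "if (session == null)" guard has to hold across all of them, so that the embedded server is started only once per JVM. A minimal sketch of that guard in plain Java follows; StartOnce is a hypothetical name, and the supplier stands in for the start-up code shown in setupSpec() above.

```java
import java.util.function.Supplier;

/** Hypothetical holder that runs an expensive start-up step at most once per JVM. */
final class StartOnce<T> {

    private final Supplier<T> starter;
    private T instance;

    StartOnce(Supplier<T> starter) {
        this.starter = starter;
    }

    /** Returns the shared instance, running the starter only on the first call. */
    synchronized T get() {
        if (instance == null) {
            instance = starter.get();
        }
        return instance;
    }
}
```

A base class could keep one static StartOnce whose supplier starts the embedded Cassandra, loads dataset.cql and returns the Session, and every subclass would call get() from setupSpec(). The supplier must not return null, otherwise the starter would run again on the next call.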