
How to unit test Kafka 0.9 operator with Apache Apex?


Repost from users@apex.incubator.apache.org

I would like to run a unit test that uses the new Kafka operator, which supports the 0.9 protocol version.

To do this, I included the Malhar-Kafka library (version 3.3.1-incubating) and am using the Apex-engine (version 3.3.0) with test/provided scope.

Compilation works fine, but my unit tests fail to run with the exception "java.lang.ClassNotFoundException: com.datatorrent.lib.util.KryoCloneUtils".

What is the recommended way to run a unit test that uses the Kafka 0.9 operator integrated with the Apex engine? I am assuming that the Kafka operator in the Malhar-contrib library is not 0.9 compliant.

The unit test code is shown below.

The class CassandraEventDetailsStreamingApp in the snippet extends AbstractKafkaInputOperator.

The exception is thrown by the call to lma.getController().

@Test
public void testApplication() throws Exception {
    try {
        // Run the application in Apex local (embedded) mode.
        LocalMode lma = LocalMode.newInstance();
        // Load only the test properties, without the Hadoop defaults.
        Configuration conf = new Configuration(false);
        conf.addResource(this.getClass().getResourceAsStream("/dag-test-props.xml"));
        lma.prepareDAG(new CassandraEventDetailsStreamingApp(), conf);
        // The ClassNotFoundException is thrown here.
        LocalMode.Controller lc = lma.getController();
        lc.run();
    } catch (ConstraintViolationException e) {
        Assert.fail("constraint violations: " + e.getConstraintViolations());
    }
}

Solution

  • I was able to resolve the problem by excluding Malhar-library and Malhar-contrib from the dependency sections of Apex-engine and apex-api.

    This brought the 3.3.1-incubating version of Malhar onto the classpath, and with it the Malhar-Kafka library at version 3.3.1-incubating.
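
For illustration, the exclusions described above might look roughly like this in a Maven pom.xml. This is only a sketch: the groupId org.apache.apex, the artifact names (apex-engine, apex-api, malhar-library, malhar-contrib, malhar-kafka) and the scopes are assumptions based on the usual Apex coordinates, not taken from the original build, and the 3.3.0 version is written as quoted in the question (the published artifact may carry a suffix such as -incubating).

```xml
<!-- Sketch only: coordinates, versions and scopes are assumptions; adjust to your build. -->
<dependencies>
  <dependency>
    <groupId>org.apache.apex</groupId>
    <artifactId>apex-engine</artifactId>
    <version>3.3.0</version>
    <scope>test</scope>
    <exclusions>
      <!-- Keep the older Malhar artifacts from being pulled in transitively. -->
      <exclusion>
        <groupId>org.apache.apex</groupId>
        <artifactId>malhar-library</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.apache.apex</groupId>
        <artifactId>malhar-contrib</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.apex</groupId>
    <artifactId>apex-api</artifactId>
    <version>3.3.0</version>
    <scope>provided</scope>
    <exclusions>
      <exclusion>
        <groupId>org.apache.apex</groupId>
        <artifactId>malhar-library</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.apache.apex</groupId>
        <artifactId>malhar-contrib</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <!-- The Kafka 0.9 operator; its own Malhar dependencies then resolve at 3.3.1-incubating. -->
  <dependency>
    <groupId>org.apache.apex</groupId>
    <artifactId>malhar-kafka</artifactId>
    <version>3.3.1-incubating</version>
  </dependency>
</dependencies>
```

Running mvn dependency:tree before and after the change is one way to check which Malhar version actually ends up on the test classpath.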