I have an OSGi framework in which one bundle accepts REST calls and sends the data received in each call to a Kafka broker. Another bundle consumes the messages from the broker.
If I initialize the Kafka consumer bundle before the REST bundle, the REST bundle's BundleActivator is never called, because execution stays inside the while loop of the Kafka consumer code. If I initialize the REST bundle before the consumer bundle, the consumer bundle never starts.
Following is the code for the activator of the Kafka bundle:
    public class KafkaConsumerActivator implements BundleActivator {
        private static final String ZOOKEEPER_CONNECT = "zookeeper.connect";
        private static final String GROUP_ID = "group.id";
        private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
        private static final String KEY_DESERIALIZER = "key.deserializer";
        private ConsumerConnector consumerConnector;
        private KafkaConsumer<String, String> consumer;
        private static final String VALUE_DESERIALIZER = "value.deserializer";

        public void start(BundleContext context) throws Exception {
            Properties properties = new Properties();
            properties.put(ZOOKEEPER_CONNECT,
                    MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.ZOOKEEPER_PORT);
            properties.put(GROUP_ID, MosaicThingsConstant.KAFKA_GROUP_ID);
            properties.put(BOOTSTRAP_SERVERS,
                    MosaicThingsConstant.KAFKA_BROCKER_IP + ":" + MosaicThingsConstant.KAFKA_BROCKER_PORT);
            properties.put(KEY_DESERIALIZER, StringDeserializer.class.getName());
            properties.put(VALUE_DESERIALIZER, StringDeserializer.class.getName());
            consumer = new KafkaConsumer<>(properties);
            try {
                consumer.subscribe(Arrays.asList(MosaicThingsConstant.KAFKA_TOPIC_NAME));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                    for (ConsumerRecord<String, String> record : records) {
                        Map<String, Object> data = new HashMap<>();
                        data.put("partition", record.partition());
                        data.put("offset", record.offset());
                        data.put("value", record.value());
                        System.out.println(": " + data);
                    }
                }
            } catch (WakeupException e) {
                // ignore for shutdown
            } finally {
                consumer.close();
            }
        }
    }
You should never do anything long-running in the start method of an Activator. It will block the whole OSGi framework, so no other bundle can be started until start returns.
It is best to run the whole connect-and-poll loop in a separate thread. In the stop method you can then tell this thread to exit.
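As a rough illustration of that pattern, here is a minimal sketch that uses only the JDK. The class name NonBlockingActivatorSketch and its members are hypothetical; a BlockingQueue stands in for the Kafka topic, and the start/stop methods only mirror the shape of BundleActivator.start and BundleActivator.stop without depending on OSGi. It is meant to show the threading structure, not a finished activator.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the activator: no OSGi or Kafka dependency.
class NonBlockingActivatorSketch {
    // Assumption: this queue plays the role of the Kafka topic.
    final BlockingQueue<String> source = new LinkedBlockingQueue<>();
    // Values seen by the worker; thread-safe so callers can inspect it.
    final List<String> received = new CopyOnWriteArrayList<>();
    private Thread worker;

    // Mirrors BundleActivator.start(): only spawns the thread, then returns.
    public void start() {
        worker = new Thread(this::consumeLoop, "consumer-sketch");
        worker.setDaemon(true);
        worker.start();
    }

    // Mirrors BundleActivator.stop(): asks the loop to exit and waits for it.
    public void stop() throws InterruptedException {
        worker.interrupt(); // with a KafkaConsumer this would be consumer.wakeup()
        worker.join(5000);
    }

    // The blocking loop now runs on the worker thread, not the caller's.
    private void consumeLoop() {
        try {
            while (true) {
                String value = source.take(); // blocks, like consumer.poll(...)
                received.add(value);
            }
        } catch (InterruptedException e) {
            // Expected on shutdown; plays the role of WakeupException.
        }
    }

    boolean isRunning() {
        return worker != null && worker.isAlive();
    }
}
```

In the real activator, the consumer would be created, subscribed, and polled inside the worker thread, and stop would call consumer.wakeup(), which is the one KafkaConsumer method that is safe to call from another thread. The pending poll then throws WakeupException, which the existing catch and finally blocks already handle by closing the consumer.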