Am Trying to insert data from storm into cosmos db - Mongo db
MongoClient mongoClient = null;
mongoClient = new MongoClient(new MongoClientURI("mongodb uri from azure portal"));
// Get database
MongoDatabase database = mongoClient.getDatabase("toystore");
// Get collection
MongoCollection<Document> collection = database.getCollection("order");
this.productid = tuple.getIntegerByField("productid");
this.quantity = tuple.getIntegerByField("quantity");
this.sales = tuple.getIntegerByField("sales");
this.refund = tuple.getIntegerByField("refund");
this.orderdate = tuple.getStringByField("orderdate");
// Insert documents
Document document = new Document();
document.append("productid", this.productid);
document.append("quantity", this.quantity);
document.append("sales", this.sales);
document.append("refund", this.refund);
document.append("orderdate", this.orderdate);
collection.insertOne(document);
The data should get inserted into Cosmos db. I am able to use the same code to insert into cosmos db from a separate JAVA program other than storm.
2017-12-05 03:45:03.345 o.a.s.d.executor [INFO] Opened spout eventhub-spout:(4)
2017-12-05 03:45:03.346 o.a.s.d.executor [INFO] Activating spout eventhub-spout:(4)
2017-12-05 03:45:09.618 o.m.d.cluster [INFO] Cluster created with settings {hosts=[toystore.documents.azure.com:10255], mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500, requiredReplicaSetName='globaldb'}
2017-12-05 03:45:09.619 o.m.d.cluster [INFO] Adding discovered server toystore.documents.azure.com:10255 to client view of cluster
2017-12-05 03:45:09.629 o.a.s.util [ERROR] Async loop died!
java.lang.ExceptionInInitializerError: null
at com.mongodb.connection.InternalStreamConnectionFactory.<init>(InternalStreamConnectionFactory.java:41) ~[stormjar.jar:?]
at com.mongodb.connection.DefaultClusterableServerFactory.create(DefaultClusterableServerFactory.java:68) ~[stormjar.jar:?]
at com.mongodb.connection.BaseCluster.createServer(BaseCluster.java:360) ~[stormjar.jar:?]
at com.mongodb.connection.MultiServerCluster.addServer(MultiServerCluster.java:305) ~[stormjar.jar:?]
at com.mongodb.connection.MultiServerCluster.<init>(MultiServerCluster.java:83) ~[stormjar.jar:?]
at com.mongodb.connection.DefaultClusterFactory.create(DefaultClusterFactory.java:116) ~[stormjar.jar:?]
at com.mongodb.Mongo.createCluster(Mongo.java:744) ~[stormjar.jar:?]
at com.mongodb.Mongo.createCluster(Mongo.java:728) ~[stormjar.jar:?]
at com.mongodb.Mongo.createCluster(Mongo.java:702) ~[stormjar.jar:?]
at com.mongodb.Mongo.<init>(Mongo.java:310) ~[stormjar.jar:?]
at com.mongodb.Mongo.<init>(Mongo.java:306) ~[stormjar.jar:?]
at com.mongodb.MongoClient.<init>(MongoClient.java:284) ~[stormjar.jar:?]
at com.microsoft.example.CosmosDBBolt.execute(CosmosDBBolt.java:108) ~[stormjar.jar:?]
at org.apache.storm.daemon.executor$fn__9841$tuple_action_fn__9843.invoke(executor.clj:730) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.daemon.executor$mk_task_receiver$fn__9762.invoke(executor.clj:462) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.disruptor$clojure_handler$reify__874.onEvent(disruptor.clj:40) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:73) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.daemon.executor$fn__9841$fn__9854$fn__9907.invoke(executor.clj:849) ~[storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at org.apache.storm.util$async_loop$fn__558.invoke(util.clj:484) [storm-core-1.1.0.2.6.2.3-1.jar:1.1.0.2.6.2.3-1]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_151]
Caused by: java.lang.NullPointerException
at com.mongodb.connection.ClientMetadataHelper.getDriverVersion(ClientMetadataHelper.java:118) ~[stormjar.jar:?]
at com.mongodb.connection.ClientMetadataHelper.getDriverInformation(ClientMetadataHelper.java:201) ~[stormjar.jar:?]
at com.mongodb.connection.ClientMetadataHelper.addDriverInformation(ClientMetadataHelper.java:182) ~[stormjar.jar:?]
at com.mongodb.connection.ClientMetadataHelper.<clinit>(ClientMetadataHelper.java:64) ~[stormjar.jar:?]
... 23 more
It connects to cosmos db, however the connection then dies.
Thanks, Ahmad
This was answered by Larry from Microsoft on the MSDN Forum Here's the link to the answer https://social.msdn.microsoft.com/Forums/azure/en-US/1eb4f5af-a4b7-4bab-8e3d-9dfaa736e7bd/insert-data-from-hdinsight-storm-to-azure-cosmos-db?forum=hdinsight
Here's the code: https://github.com/Blackmist/hdinsight-java-storm-mongodb