Tags: scala, apache-spark, cassandra, spark-cassandra-connector, spark-jobserver

How to handle Cassandra connections in a Spark job?


I am stress-testing my Spark application, which uses the Spark Cassandra connector as well as the Cassandra driver. In my application, I use the Cassandra driver to select the most recent value from the C* table. This works fine as long as jobs are submitted one at a time via spark-jobserver. But if multiple jobs are submitted simultaneously (number of requests = 80), I get the exception below.

     org.jboss.netty.channel.ChannelException: Failed to create a selector.
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:343) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:100) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:52) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:143) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:81) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:33) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:151) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:116) ~[netty-3.8.0.Final.jar:na]
        at com.datastax.driver.core.Connection$Factory.<init>(Connection.java:532) ~[cassandra-driver-core-2.1.5.jar:na]
        at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:1201) ~[cassandra-driver-core-2.1.5.jar:na]
        at com.datastax.driver.core.Cluster$Manager.<init>(Cluster.java:1144) ~[cassandra-driver-core-2.1.5.jar:na]
        at com.datastax.driver.core.Cluster.<init>(Cluster.java:121) ~[cassandra-driver-core-2.1.5.jar:na]
        at com.datastax.driver.core.Cluster.<init>(Cluster.java:108) ~[cassandra-driver-core-2.1.5.jar:na]
        at com.datastax.driver.core.Cluster.buildFrom(Cluster.java:177) ~[cassandra-driver-core-2.1.5.jar:na]
        at com.datastax.driver.core.Cluster$Builder.build(Cluster.java:1109) ~[cassandra-driver-core-2.1.5.jar:na]
        ...
         Caused by: java.io.IOException: Too many open files
        at sun.nio.ch.IOUtil.makePipe(Native Method) ~[na:1.7.0_55]
        at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65) ~[na:1.7.0_55]
        at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36) ~[na:1.7.0_55]
        at java.nio.channels.Selector.open(Selector.java:227) ~[na:1.7.0_55]
        at org.jboss.netty.channel.socket.nio.SelectorUtil.open(SelectorUtil.java:63) ~[netty-3.8.0.Final.jar:na]
        at org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:341) ~[netty-3.8.0.Final.jar:na]

I am running the jobs using a single context created in spark-jobserver.

My code

 val dateQuery = "SELECT st_date FROM %s limit 1"
 val queryString = dateQuery.format(tableName)
 val cluster = Cluster.builder().addContactPoints(cassandraHosts: _*)
  .withCredentials(username, password).build()
 val session = cluster.connect(keyspace)
 val queryResult = Try(session.execute(queryString).map(x => x.getDate("st_date")).head)
 cluster.close()

Questions

Is there anything I am doing wrong in the code?

How do I address this problem?

Should I create a singleton Cluster object for the entire application and share it?

Should I use the sc.cassandraTable method instead of using the Java driver directly?


Solution

  • I'm not familiar with spark-jobserver, but the code snippet does not look correct.

    First, you are not closing the session, which should be done before closing the cluster.

    Second, you should be reusing the session for each query rather than opening and closing it for each individual query.

    So yes, the cluster and session should be created as a singleton and reused, and you would usually only close them when your application exits, as sketched below.
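For illustration, here is a minimal sketch of that pattern in Scala. The `CassandraClient` object and its configuration fields are hypothetical names, used only to show keeping a single Cluster/Session for the whole application and closing the session before the cluster; adapt the hosts, credentials, and keyspace to your own setup.

    import com.datastax.driver.core.{Cluster, Session}

    // Hypothetical application-wide holder: the Cluster and Session are
    // built lazily once and shared by every query the application runs.
    object CassandraClient {
      // Assumed configuration values; replace with your own.
      private val hosts    = Seq("127.0.0.1")
      private val username = "user"
      private val password = "pass"
      private val keyspace = "my_keyspace"

      lazy val cluster: Cluster = Cluster.builder()
        .addContactPoints(hosts: _*)
        .withCredentials(username, password)
        .build()

      lazy val session: Session = cluster.connect(keyspace)

      // Call once at application shutdown: close the Session first,
      // then the Cluster.
      def shutdown(): Unit = {
        session.close()
        cluster.close()
      }
    }

Each job would then run its queries through the shared session, e.g. `CassandraClient.session.execute(queryString)`, instead of building a new Cluster per request, so the process no longer exhausts its file descriptors under concurrent submissions.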