Search code examples
javamavencassandraapache-sparkconnector

Querying Data in Cassandra via Spark in a Java Maven Project


I'm trying to do a simple code where I create a schema, insert some tables, and then pull some information and print it out. However, I'm getting an error. I'm using the Datastax cassandra spark connector. I have been using these two examples to help me try to accomplish this:

https://gist.github.com/jacek-lewandowski/278bfc936ca990bee35a

http://www.datastax.com/documentation/developer/java-driver/1.0/java-driver/quick_start/qsSimpleClientAddSession_t.html

However, the second example doesn't use a cassandra spark connector, or spark in general.

Here is my code:

package com.angel.testspark.test;


import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App 
{
    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }
    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Prepare the schema
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS tester");
            session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
            session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
            session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");       

            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0001," +
                          "'Angel'," +
                          "'Pay'," +
                          "'IT Engineer'" +
                          ");");
            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0002," +
                          "'John'," +
                          "'Doe'," +
                          "'IT Engineer'" +
                          ");");
            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0003," +
                          "'Jane'," +
                          "'Doe'," +
                          "'IT Analyst'" +
                          ");");
                session.execute(
                      "INSERT INTO tester.dept (id, dname) " +
                      "VALUES (" +
                          "1553," +
                          "'Commerce'" +
                          ");");

                ResultSet results = session.execute("SELECT * FROM tester.emp " +
                        "WHERE role = 'IT Engineer';");
            for (Row row : results) {
                System.out.print(row.getString("fname"));
                System.out.print(" ");
                System.out.print(row.getString("lname"));
                System.out.println(); 
            }
                System.out.println();
            }

        }

    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

here is my error:

14/09/18 11:22:18 WARN util.Utils: Your hostname, APAY-M-R03K resolves to a loopback address: 127.0.0.1; using 10.150.79.164 instead (on interface en0)
14/09/18 11:22:18 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/09/18 11:22:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/18 11:22:18 INFO Remoting: Starting remoting
14/09/18 11:22:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@10.150.79.164:50506]
14/09/18 11:22:18 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@10.150.79.164:50506]
14/09/18 11:22:18 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/18 11:22:18 INFO storage.DiskBlockManager: Created local directory at /var/folders/57/8s5fx3ks06bd2rzkq7yg1xs40000gn/T/spark-local-20140918112218-2c8d
14/09/18 11:22:18 INFO storage.MemoryStore: MemoryStore started with capacity 2.1 GB.
14/09/18 11:22:18 INFO network.ConnectionManager: Bound socket to port 50507 with id = ConnectionManagerId(10.150.79.164,50507)
14/09/18 11:22:18 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/09/18 11:22:18 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager 10.150.79.164:50507 with 2.1 GB RAM
14/09/18 11:22:18 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/18 11:22:18 INFO spark.HttpServer: Starting HTTP Server
14/09/18 11:22:18 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:18 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:50508
14/09/18 11:22:18 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.150.79.164:50508
14/09/18 11:22:19 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/18 11:22:19 INFO spark.HttpFileServer: HTTP File server directory is /var/folders/57/8s5fx3ks06bd2rzkq7yg1xs40000gn/T/spark-a0dc4491-1901-4a7a-86f4-4adc181fe45c
14/09/18 11:22:19 INFO spark.HttpServer: Starting HTTP Server
14/09/18 11:22:19 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:19 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:50509
14/09/18 11:22:19 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/09/18 11:22:19 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
14/09/18 11:22:19 INFO ui.SparkUI: Started Spark Web UI at http://10.150.79.164:4040
14/09/18 11:22:19 WARN core.FrameCompressor: Cannot find LZ4 class, you should make sure the LZ4 library is in the classpath if you intend to use it. LZ4 compression will not be available for the protocol.
14/09/18 11:22:19 INFO core.Cluster: New Cassandra host /127.0.0.1:9042 added
14/09/18 11:22:19 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" com.datastax.driver.core.exceptions.InvalidQueryException: No indexed columns present in by-columns clause with Equal operator
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:172)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
    at com.sun.proxy.$Proxy6.execute(Unknown Source)
    at com.angel.testspark.test.App.createSchema(App.java:85)
    at com.angel.testspark.test.App.run(App.java:38)
    at com.angel.testspark.test.App.main(App.java:109)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: No indexed columns present in by-columns clause with Equal operator
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:108)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:367)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:584)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/09/18 11:22:20 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

I believe it may just be a syntax error, i'm just unsure of where and what it is.

Any help would be great, thanks. I've scoured the internet and haven't found a simple example of just inserting data and pulling data in java with cassandra and spark.

******edit: @BryceAtNetwork23 and @mikea were correct about my syntax error, so i've edited the question and fixed it. I am getting a new error though so I've pasted in the new error and updated the code


Solution

  • Try running your CQL via cqlsh and you should get the same/similar error:

    aploetz@cqlsh:stackoverflow> CREATE TABLE dept (id INT PRIMARY KEY, dname TEXT);
    aploetz@cqlsh:stackoverflow> INSERT INTO dept (id, dname) VALUES (1553,Commerce);
    <ErrorMessage code=2000 [Syntax error in CQL query] message="line 1:50 no viable alternative at
    input ')' (... dname) VALUES (1553,Commerce[)]...)">
    

    Put single quotes around "Commerce" and it should work:

    session.execute(
                      "INSERT INTO tester.dept (id, dname) " +
                      "VALUES (" +
                          "1553," +
                          "'Commerce'" +
                          ");");
    

    I'm getting a new error though now...

    Also try running that from cqlsh.

    aploetz@cqlsh:stackoverflow> SELECT * FROM emp WHERE role = 'IT Engineer';
    code=2200 [Invalid query] message="No indexed columns present in by-columns clause with Equal operator"
    

    This is happening because role is not defined as your primary key. Cassandra doesn't allow you to query by arbitrary column values. The best way to solve this one, is to create an additional query table called empByRole, with role as the partition key. Like this:

    CREATE TABLE empByRole 
        (id INT, fname TEXT, lname TEXT, role TEXT,
        PRIMARY KEY (role,id)
    );
    
    aploetz@cqlsh:stackoverflow> INSERT INTO empByRole (id, fname, lname, role) VALUES (0001,'Angel','Pay','IT Engineer');
    aploetz@cqlsh:stackoverflow> SELECT * FROM empByRole WHERE role = 'IT Engineer';
    
     role        | id | fname | lname
    -------------+----+-------+-------
     IT Engineer |  1 | Angel |   Pay
    
    (1 rows)