Tags: java, mysql, apache-spark, classnotfoundexception, apache-spark-sql

Apache Spark MySQL JavaRDD.foreachPartition - why am I getting ClassNotFoundException?


I would like to save the data from each partition to a MySQL database. To do that, I created a class that implements VoidFunction<Iterator<String>>:

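import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.spark.api.java.function.VoidFunction;

public class DatabaseSaveFunction implements VoidFunction<Iterator<String>> {

    private static final long serialVersionUID = -7039277486852158360L;

    @Override
    public void call(Iterator<String> it) {
        Connection connect = null;
        PreparedStatement preparedStatement = null;

        try {
            // Load the MySQL JDBC driver (its jar must also be visible to the executors)
            Class.forName("com.mysql.jdbc.Driver");
            connect = DriverManager.getConnection("jdbc:mysql://"
                    + "xxx.us-west-2.rds.amazonaws.com" + "/"
                    + "xxx", "xxx", "xxx");

            preparedStatement = connect
                    .prepareStatement("insert into testdatabase.test values (default, ?)");

            // One insert per element of this partition
            while (it.hasNext()) {
                String outputElement = it.next();
                preparedStatement.setString(1, "" + outputElement.length());
                preparedStatement.executeUpdate();
            }

        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            // Close in reverse order of creation, skipping anything that was
            // never opened because getConnection or prepareStatement failed
            try {
                if (preparedStatement != null) {
                    preparedStatement.close();
                }
                if (connect != null) {
                    connect.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}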

In the main method of my driver class I call:

output.foreachPartition(new DatabaseSaveFunction());

I'm getting the following error:

15/05/06 15:34:00 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 4, ip-172-31-36-44.us-west-2.compute.internal): java.lang.ClassNotFoundException: DatabaseSaveFunction
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)

Worker log:

15/05/06 15:34:00 ERROR executor.Executor: Exception in task 1.0 in stage 1.0 (TID 5)
java.lang.ClassNotFoundException: DatabaseSaveFunction
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)

Can anybody tell me what I'm doing wrong? I would be very grateful.


Solution

  • Export the external class to a jar and register it with sc.addJar("/path/x.jar"), where sc is the JavaSparkContext in your main method. The error occurs because the executors running your tasks cannot find the DatabaseSaveFunction class; once the jar is shipped to them, you won't get this error.
  • Moreover, in Spark 1.3 and later you can simply put the JDBC settings in a map of options and call load("jdbc", options) to read data from an RDBMS into a DataFrame. It's really handy, although I have not checked whether it works with every RDBMS. Please tell me if you have any other questions.
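The option map from the second bullet can be sketched in Java as follows. This is a minimal sketch assuming the Spark 1.3 "jdbc" data source: the keys url, dbtable and driver are the ones that data source documents, while the class name JdbcOptions and the method forTable are hypothetical, and the connection values are the placeholders from the question. Building the map needs no Spark dependency, so the sketch runs on plain Java.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper that builds the option map for the Spark 1.3
 * "jdbc" data source. Only the keys "url", "dbtable" and "driver" are set.
 */
final class JdbcOptions {

    private JdbcOptions() {
    }

    /** Returns an option map intended for sqlContext.load("jdbc", options). */
    static Map<String, String> forTable(String url, String table, String driverClass) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("url", url);            // JDBC URL of the database
        options.put("dbtable", table);      // table to read
        options.put("driver", driverClass); // driver class Spark loads before connecting
        return options;
    }

    public static void main(String[] args) {
        // Placeholder values taken from the question; MySQL Connector/J
        // accepts user and password as URL parameters.
        Map<String, String> options = forTable(
                "jdbc:mysql://xxx.us-west-2.rds.amazonaws.com/xxx?user=xxx&password=xxx",
                "testdatabase.test",
                "com.mysql.jdbc.Driver");

        // Print each option as key=value, in insertion order
        for (Map.Entry<String, String> e : options.entrySet()) {
            System.out.println(e.getKey() + "=" + e.getValue());
        }
    }
}
```

On the driver, the resulting map would be passed to load("jdbc", options) on a Spark 1.3 SQLContext. Because Spark loads the class named by the driver option on the master and the workers, the MySQL connector jar has to reach the executors just like the jar containing DatabaseSaveFunction, for example through the same sc.addJar("/path/x.jar") call made before output.foreachPartition(...).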