Search code examples
jdbcparallel-processingmonetdb

Monet DB JDBC parallel executeBatch failing


In my application, I want to insert multiple rows into different tables in parallel. For this, I create a prepared statement in each thread and use executeBatch with a batch size of 10K. I have set auto-commit to false, and after each executeBatch I commit the transaction using connection.commit. In a single thread this code works fine, but with multiple threads, when it starts inserting into different tables (a distinct table in each thread), a commit-failed exception is thrown. Please advise how to perform parallel insertions (please note that all the threads work on different tables, which have no link to each other).

Thanks, Vikas


Solution

  • Problem

    Here is the code I wrote to reproduce the concurrency-conflict error from the MonetDB JDBC driver.

    /**
     * Runnable that creates a table, inserts batches of generated rows into it
     * through a JDBC prepared statement, and finally drops the table and closes
     * its connection. Every instance opens its own connection in the constructor.
     */
    public static class TestRunner implements Runnable {
        private final static String problemHere = "jdbc:monetdb://localhost/test-db";

        static {
            // Load the MonetDB JDBC driver class; a missing driver is only reported.
            try {
                Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
            } catch (ClassNotFoundException missingDriver) {
                missingDriver.printStackTrace();
            }
        }

        /**
         * Opens a new connection to the test database. Called once per runner,
         * from the constructor.
         */
        private static Connection createConnection() throws SQLException {
            Connection opened = DriverManager.getConnection(problemHere, "myID", "myPass");
            return opened;
        }

        private static int fieldCount = 10;
        private static int[] colType = new int[fieldCount];
        private static final int[] types = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

        static {
            // Pick a random SQL type for every column; the layout is shared by all runners.
            Random rng = new Random();
            int col = 0;
            while (col < fieldCount) {
                colType[col] = types[rng.nextInt(types.length)];
                col++;
            }
        }

        private final String name;
        private Connection con;
        private int batchCount;
        private int batchSize;
        private String create;
        private String drop;
        private String insert;

        /**
         * Opens the connection and prepares the create/insert/drop SQL texts.
         *
         * @param string table name used by this runner
         * @param bs     number of rows per batch
         * @param bc     number of batches to submit
         * @throws SQLException if the connection cannot be opened
         */
        public TestRunner(String string, int bs, int bc) throws SQLException {
            this.name = string;
            this.con = createConnection();
            this.batchCount = bc;
            this.batchSize = bs;

            // Build the column definitions and the matching "?" placeholders side by side.
            StringBuilder columns = new StringBuilder();
            StringBuilder placeholders = new StringBuilder();
            for (int col = 0; col < fieldCount; col++) {
                if (col > 0) {
                    columns.append(",");
                    placeholders.append(",");
                }
                columns.append("col").append(col).append(" ").append(getType(colType[col]));
                placeholders.append("?");
            }
            this.create = "create table " + name + " (" + columns + ")";
            this.insert = "insert into " + name + " values (" + placeholders + ")";
            this.drop = "drop table " + name;
        }

        /** Maps a java.sql.Types constant to the SQL column type text, or null if unmapped. */
        private static String getType(int i) {
            if (i == Types.DECIMAL) {
                return "decimal(18,9)";
            }
            if (i == Types.VARCHAR) {
                return "varchar(30000)";
            }
            if (i == Types.TIMESTAMP) {
                return "timestamp";
            }
            return null;
        }

        /** Closes the connection if it is still open when the object is finalized. */
        protected void finalize() throws Throwable {
            if (con == null) {
                return;
            }
            con.close();
        }

        /** Commits the current transaction unless the connection is in auto-commit mode. */
        private void commitUnlessAutoCommit() throws SQLException {
            if (con.getAutoCommit()) {
                return;
            }
            con.commit();
        }

        /** Drops the runner's table (reporting any failure) and then closes the connection. */
        private void dropTableAndClose() {
            if (con == null) {
                return;
            }
            try (Statement ddl = con.createStatement()) {
                int dropped = ddl.executeUpdate(drop);
                System.out.format("%d deleted - %s%n", dropped, name);
                commitUnlessAutoCommit();
            } catch (SQLException dropFailure) {
                dropFailure.printStackTrace();
            }
            try {
                con.close();
                con = null;
            } catch (SQLException closeFailure) {
                closeFailure.printStackTrace();
            }
        }

        /**
         * Creates the table, submits {@code batchCount} batches of {@code batchSize}
         * rows each, then drops the table and closes the connection.
         */
        public void run() {
            System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n", name, create, insert, drop);
            try (Statement ddl = con.createStatement()) {
                // This executeUpdate is where the concurrency-conflict exception surfaces
                // when several runners execute concurrently.
                int created = ddl.executeUpdate(create);
                System.out.format("%d created %s%n", created, name);
                commitUnlessAutoCommit();
            } catch (SQLException createFailure) {
                createFailure.printStackTrace();
                return;
            }
            try (PreparedStatement batch = con.prepareStatement(insert)) {
                while (batchCount-- > 0) {
                    for (int row = 0; row < batchSize; row++) {
                        setBatchInput(batch);
                        batch.addBatch();
                    }
                    System.out.format("%s - submitting batch ...%n", name);
                    batch.executeBatch();
                    commitUnlessAutoCommit();
                }
            } catch (Exception insertFailure) {
                insertFailure.printStackTrace();
            } finally {
                dropTableAndClose();
            }
            System.out.format("%s finished.", name);
        }

        /** Binds one generated value per column to the given prepared statement. */
        private static void setBatchInput(PreparedStatement stmt) throws SQLException {
            for (int col = 0; col < fieldCount; col++) {
                Object value = getRandomFieldValue(colType[col]);
                stmt.setObject(col + 1, value);
            }
        }

        /** Returns the sample value used for a column of the given java.sql.Types constant. */
        private static Object getRandomFieldValue(int type) {
            if (type == Types.DECIMAL) {
                return 0;
            }
            if (type == Types.VARCHAR) {
                return "null";
            }
            if (type == Types.TIMESTAMP) {
                return new Timestamp(System.currentTimeMillis());
            }
            return null;
        }
    }
    
    /**
     * Starts two runner threads ("runner_1" and "runner_0"), each submitting one
     * batch of 10000 rows, and waits for both to finish.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int num = 2;
        List<Thread> ts = new ArrayList<>();
        while (num-- > 0) {
            try {
                Thread t = new Thread(new TestRunner("runner_" + num, 10000, 1));
                t.start();
                // Track only threads that were actually started: if the runner's
                // constructor throws, there is no thread to join and adding null
                // would cause a NullPointerException in the join loop below.
                ts.add(t);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt flag and stop waiting; further join() calls
                // would fail immediately while the flag is set.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    

    And this is the exception I get:

    java.sql.SQLException: COMMIT: transaction is aborted because of concurrency conflicts, will ROLLBACK instead
    at nl.cwi.monetdb.jdbc.MonetConnection$ResponseList.executeQuery(MonetConnection.java:2597)
    at nl.cwi.monetdb.jdbc.MonetConnection$ResponseList.processQuery(MonetConnection.java:2345)
    at nl.cwi.monetdb.jdbc.MonetStatement.internalExecute(MonetStatement.java:507)
    at nl.cwi.monetdb.jdbc.MonetStatement.execute(MonetStatement.java:345)
    at nl.cwi.monetdb.jdbc.MonetStatement.executeUpdate(MonetStatement.java:545)
    at monet.test.BatchTest$TestRunner.run(BatchTest.java:92)
    at java.lang.Thread.run(Thread.java:745)
    

    If I synchronize the table creation, the batch then fails in one thread:

    java.sql.SQLException: EXEC: no prepared statement with id: 2no prepared statement with id: 2

    According to this thread:

    If you perform a schema update/change, MonetDB releases all prepared handles, because they possibly are no longer correct. You need to re-execute your prepare command.

    Solution

    Use MonetDB's bulk data load functionality. You can write the data directly to the server instead of inserting it through JDBC (even with connections open in parallel). The following code lets you test it directly:

    /**
     * Runnable that creates its table over JDBC and then bulk-loads generated rows
     * into it with {@code COPY INTO ... FROM STDIN}, sent over a separate
     * {@code MapiSocket} session instead of a JDBC prepared statement.
     * Table creation (and its commit) is serialised across all runners.
     */
    public static class TestRunner implements Runnable {
        private final static String problemHere = "jdbc:monetdb://localhost:50000/test-db";

        /**
         * Monitor that serialises DDL and its commit across runners. A dedicated
         * object is used rather than the URL string, because a String literal is
         * interned and could be locked by unrelated code.
         */
        private static final Object DDL_LOCK = new Object();

        static {
            // Load the MonetDB JDBC driver class; a missing driver is only reported.
            try {
                Class.forName("nl.cwi.monetdb.jdbc.MonetDriver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        /**
         * Opens a new JDBC connection to the test database. Called once per
         * runner, from the constructor.
         */
        private static Connection createConnection() throws SQLException {
            return DriverManager.getConnection(problemHere, "monetdb", "monetdb");
        }

        private static final int fieldCount = 10;
        private static final int[] colType = new int[fieldCount];
        private static final int[] types = { Types.VARCHAR, Types.TIMESTAMP, Types.DECIMAL };

        static {
            // Pick a random SQL type for every column; the layout is shared by all runners.
            Random random = new Random();
            for (int i = 0; i < fieldCount; i++) {
                colType[i] = types[random.nextInt(types.length)];
            }
        }

        private final String name;
        private Connection con;
        private int batchCount;
        private int batchSize;
        private String create;
        private String drop;
        private String insert;

        /**
         * Opens the JDBC connection and prepares the create/insert/drop SQL texts.
         * The insert and drop texts are only printed by {@link #run()}; rows are
         * loaded with COPY INTO and the table is not dropped.
         *
         * @param string table name used by this runner
         * @param bs     number of rows per COPY INTO batch
         * @param bc     number of batches to submit
         * @throws SQLException if the connection cannot be opened
         */
        public TestRunner(String string, int bs, int bc) throws SQLException {
            this.name = string;
            this.con = createConnection();
            this.batchCount = bc;
            this.batchSize = bs;

            StringBuilder columns = new StringBuilder();
            StringBuilder placeholders = new StringBuilder();
            for (int i = 0; i < fieldCount; i++) {
                if (i > 0) {
                    columns.append(",");
                    placeholders.append(",");
                }
                columns.append("col").append(i).append(" ").append(getType(colType[i]));
                placeholders.append("?");
            }
            this.create = "create table " + name + " (" + columns + ")";
            this.insert = "insert into " + name + " values (" + placeholders + ")";
            this.drop = "drop table " + name;
        }

        /** Maps a java.sql.Types constant to the SQL column type text, or null if unmapped. */
        private static String getType(int i) {
            switch (i) {
            case Types.DECIMAL:
                return "decimal(18,9)";
            case Types.VARCHAR:
                return "varchar(30000)";
            case Types.TIMESTAMP:
                return "timestamp";
            default:
                return null;
            }
        }

        /** Closes the JDBC connection if it is still open when the object is finalized. */
        @Override
        protected void finalize() throws Throwable {
            if (con != null) {
                con.close();
            }
        }

        /**
         * Creates the table over JDBC, then streams {@code batchCount} COPY INTO
         * batches of {@code batchSize} rows each over a MapiSocket session, and
         * finally closes both connections. Failures are printed, not rethrown.
         */
        @Override
        public void run() {
            System.out.format("%s started.%ncreate: %s%ninsert: %s%ndrop: %s%n", name, create, insert, drop);
            try (Statement stmt = con.createStatement()) {
                // Creating tables concurrently is what triggered the concurrency
                // conflict, so creation and its commit run under a shared lock.
                synchronized (DDL_LOCK) {
                    System.out.format("%d created %s%n", stmt.executeUpdate(create), name);
                    if (!con.getAutoCommit()) {
                        con.commit();
                    }
                }
            } catch (SQLException e1) {
                e1.printStackTrace();
                // Do not leak the connection when the table could not be created.
                closeConnection();
                return;
            }
            MapiSocket server = null;
            try {
                server = new MapiSocket();
                server.setDatabase("test-db");
                server.setLanguage("sql");

                List<String> warning = server.connect("localhost", 50000, "monetdb", "monetdb");
                if (warning != null) {
                    for (String message : warning) {
                        System.out.println(message);
                    }
                }

                // The reader and writer must exist before the first prompt is awaited,
                // and they are reused for every batch; server.close() releases them.
                BufferedMCLReader in = server.getReader();
                BufferedMCLWriter out = server.getWriter();

                String error = in.waitForPrompt();
                if (error != null) {
                    throw new IOException(error);
                }

                String query = "COPY INTO " + name + " FROM STDIN USING DELIMITERS ',','\\n';";
                while (batchCount-- > 0) {
                    // the leading 's' is essential, since it is a protocol
                    // marker that should not be omitted, likewise the
                    // trailing semicolon
                    out.write('s');
                    out.write(query);
                    out.newLine();
                    for (int i = batchSize; i > 0; i--) {
                        insertBatchElement(out);
                        out.newLine();
                    }
                    System.out.format("%s - submitting batch ...%n", name);
                    out.writeLine(""); // need this one for synchronisation over flush()
                    out.flush();
                    error = in.waitForPrompt();
                    if (error != null) {
                        throw new IOException(error);
                    }
                    if (!con.getAutoCommit()) {
                        con.commit();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (con != null) {
                    try {
                        // The drop statement is not executed here (it was disabled in
                        // the original listing), so the loaded table stays in the database.
                        synchronized (DDL_LOCK) {
                            if (!con.getAutoCommit()) {
                                con.commit();
                            }
                        }
                    } catch (SQLException e1) {
                        e1.printStackTrace();
                    }
                    closeConnection();
                }
                if (server != null) {
                    server.close();
                }
            }
            System.out.format("%s finished.", name);
        }

        /** Closes the JDBC connection, reporting (not rethrowing) any failure. */
        private void closeConnection() {
            if (con == null) {
                return;
            }
            try {
                con.close();
                con = null;
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        /**
         * Writes one comma-separated row of generated values (without the line
         * terminator) to the COPY INTO stream.
         *
         * @param out writer of the MapiSocket session
         * @throws IOException if writing to the server fails
         */
        private void insertBatchElement(BufferedMCLWriter out) throws IOException {
            StringBuilder data = new StringBuilder();
            for (int i = 1; i <= fieldCount; i++) {
                if (i > 1) {
                    data.append(",");
                }
                data.append(String.valueOf(getRandomFieldValue(colType[i - 1])));
            }
            out.write(data.toString());
        }

        /** Returns the sample value used for a column of the given java.sql.Types constant. */
        private static Object getRandomFieldValue(int type) {
            switch (type) {
            case Types.DECIMAL:
                return 0;
            case Types.VARCHAR:
                return "null";
            case Types.TIMESTAMP:
                return new Timestamp(System.currentTimeMillis());
            default:
                return null;
            }
        }
    }
    
    /**
     * Starts two runner threads ("runner_1" and "runner_0"), each submitting one
     * batch of 10000 rows, and waits for both to finish.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        int num = 2;
        List<Thread> ts = new ArrayList<>();
        while (num-- > 0) {
            try {
                Thread t = new Thread(new TestRunner("runner_" + num, 10000, 1));
                t.start();
                // Track only threads that were actually started: if the runner's
                // constructor throws, there is no thread to join and adding null
                // would cause a NullPointerException in the join loop below.
                ts.add(t);
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // Restore the interrupt flag and stop waiting; further join() calls
                // would fail immediately while the flag is set.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }