Tags: java, mysql, multithreading, jdbc, producer-consumer

Getting data from a very large table


I have a very large table in a MySQL database: the Users table contains 200 million records.

I run the following query using JDBC:

public List<Pair<Long, String>> getUsersAll() throws SQLException {
    Connection cnn = null;
    CallableStatement cs = null;
    ResultSet rs = null;
    final List<Pair<Long, String>> res = new ArrayList<>();
    try {
        cnn = dataSource.getConnection();
        cs = cnn.prepareCall("select UserPropertyKindId, login from TEST.users;");
        rs = cs.executeQuery();
        while (rs.next()) {
            res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
        }
        return res;
    } finally {
        DbUtils.closeQuietly(cnn, cs, rs);
    }
}

Next, I process the result:

List<Pair<Long, String>> users = dao.getUsersAll();
if (CollectionUtils.isNotEmpty(users)) {
    for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
        InconsistsUsers.InconsistsUsersCallable callable =
                new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(partition));
        processExecutor.submit(callable);
    }
}

But since the table is very large and the whole result is loaded into memory, my application crashes with an error:

com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet successfully received from the server was 105,619 milliseconds ago.

How can I retrieve the data in chunks and process it as it arrives, so that the entire result is not loaded into memory at once? It may be possible to open a cursor, push the rows into a non-blocking queue, and consume them as the data arrives. How can this be done?

UPDATE:

My DB structure: https://www.db-fiddle.com/f/v377ZHkG1YZcdQsETtPm9L/3

Current algorithm:

  1. Get all users from the Users table: select UserPropertyKindId, login from Users;

  2. This result is split into partitions of 2000 pairs, each of which is submitted to a ThreadPoolTaskExecutor:

    List<Pair<Long, String>> users = dao.getUsersAll();

    if (CollectionUtils.isNotEmpty(users)) {
        for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
            InconsistsUsers.InconsistsUsersCallable callable =
                    new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(partition));
            processExecutor.submit(callable);
        }
    }
    
  3. In the callable, two queries are made for each pair:

    First query:

    select distinct entityId 
    from UserPropertyValue 
    where userPropertyKindId = ? and value = ? -- value is the login from the Users table
    

    Second query:

    select UserIds 
    from UserPropertyIndex 
    where UserPropertyKindId = ? and Value = ?
    

Two cases are possible (a sketch of this check is shown after the list):

  1. The result of the first query is empty: log it, send a notification, and continue to the next pair.
  2. The result of the second query does not match the result of the first query (the varbinary data is decoded first; it stores encoded entityIds): log it, send a notification, and go to the next pair.
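
For reference, the per-pair check inside the callable would look roughly like this. It is only a sketch based on the description above: findEntityIds(), findUserIds(), decodeUserIds() and sendNotification() are hypothetical helpers, since the actual method names and the varbinary encoding are not shown here.

for (Pair<Long, String> pair : partition) {
    // First query: entityIds matching this userPropertyKindId / login.
    Set<Long> entityIds = dao.findEntityIds(pair.getLeft(), pair.getRight());
    if (entityIds.isEmpty()) {
        log.warn("No entity found for login {}", pair.getRight());
        sendNotification(pair);
        continue;
    }
    // Second query: decode the varbinary UserIds and compare with the first result.
    Set<Long> indexedIds = decodeUserIds(dao.findUserIds(pair.getLeft(), pair.getRight()));
    if (!entityIds.equals(indexedIds)) {
        log.warn("Index mismatch for login {}", pair.getRight());
        sendNotification(pair);
    }
}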

I can't change the structure of the database. All the manipulation has to be done on the Java side.


Solution

You should handle this on several levels:

    JDBC driver fetch size

    JDBC has a Statement.setFetchSize() method, which indicates how many rows are going to be pre-fetched by the JDBC driver prior to you getting them from JDBC. Note that MySQL JDBC drivers don't really implement this correctly, but you can set setFetchSize(Integer.MIN_VALUE) to prevent it from fetching all rows in one go. See also this answer here.
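
    For example, with MySQL Connector/J a streaming statement can be set up as below. This is a minimal sketch reusing the question's dataSource; the driver only streams when the statement is forward-only and read-only:

    try (Connection cnn = dataSource.getConnection();
         PreparedStatement ps = cnn.prepareStatement(
                 "select UserPropertyKindId, login from TEST.users",
                 ResultSet.TYPE_FORWARD_ONLY,
                 ResultSet.CONCUR_READ_ONLY)) {
        ps.setFetchSize(Integer.MIN_VALUE); // tell the driver to stream row by row
        try (ResultSet rs = ps.executeQuery()) {
            while (rs.next()) {
                // each call to rs.next() pulls the next row from the server
            }
        }
    }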

    Note that you can also activate cursor-based fetching on your connection using the useCursorFetch property.
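
    For example (a sketch; host, port and credentials are placeholders), a positive fetch size then makes the driver pull rows through a server-side cursor in batches:

    String url = "jdbc:mysql://localhost:3306/TEST?useCursorFetch=true";
    try (Connection cnn = DriverManager.getConnection(url, "user", "password");
         PreparedStatement ps = cnn.prepareStatement(
                 "select UserPropertyKindId, login from TEST.users")) {
        ps.setFetchSize(2000); // rows now arrive 2000 at a time via a server-side cursor
        // execute and iterate the ResultSet as usual
    }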

    Your own logic

    You should not put the entire list of users in memory. What you're doing right now is collecting all the rows from JDBC and then partitioning your list later on using Lists.partition(users, 2000). This is going in the right direction, but you're not doing it right yet. Instead, do:

    try (ResultSet rs = cs.executeQuery()) {
        while (rs.next()) {
            res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));

            // Process a batch of rows:
            if (res.size() >= 2000) {
                process(res);
                res.clear();
            }
        }
    }
    
    // Process the remaining rows
    process(res);
    

    The important message here is to not load all rows in memory and then process them in batches, but to process them directly while streaming rows from JDBC.
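
    Putting both pieces together, the whole pipeline could look roughly like the sketch below. It reuses the question's names (dataSource, processExecutor, InconsistsUsers.InconsistsUsersCallable) and assumes MySQL Connector/J for the streaming setup:

    try (Connection cnn = dataSource.getConnection();
         PreparedStatement ps = cnn.prepareStatement(
                 "select UserPropertyKindId, login from TEST.users",
                 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
        ps.setFetchSize(Integer.MIN_VALUE); // stream rows instead of buffering all of them
        try (ResultSet rs = ps.executeQuery()) {
            List<Pair<Long, String>> batch = new ArrayList<>(2000);
            while (rs.next()) {
                batch.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
                if (batch.size() >= 2000) {
                    processExecutor.submit(
                            new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(batch)));
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) { // submit the final, partial batch
                processExecutor.submit(
                        new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(batch)));
            }
        }
    }

    One caveat: while a result set is being streamed, Connector/J does not allow other statements to run on the same connection, so the queries inside the callables must use their own connections from the pool.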