
Java DB Queries and C3P0 for Concurrent calls


I'm trying to wrap my head around C3P0 and database calls. I originally had a program that ran on SQLite, and now I'm trying to allow concurrency so I can test queries on MariaDB. There are a few things I'm not grasping. The original design for SQLite had a producer thread that put queries onto a queue and a consumer thread that took them from the queue and issued them to the database.

I'm wondering whether this single consumer thread will be able to issue concurrent requests (since it's only one thread).

Secondly, it apparently isn't returning connections to the pool, or so it seems, because it stops after about 18 queries. There are still items in the queue, but the program just stops and waits at the try block that requests a new connection.

My main Database calling thread class:

public class DBRunnable extends DBExtend implements Runnable
{
/**
 * Call the query builder instance
 */
protected QueryBuilder qb = QueryBuilder.getInstance();

/**
 * Call the point type converter instance
 */
protected PointTypeConv pv = PointTypeConv.getInstance();

/**
 * Singleton object
 */
private static DBRunnable db = null;

/**
 * Constructor
 */
public DBRunnable()
{
}

/**
 * Main thread functionality
 */
@Override
public void run()
{
    try
    {

        while (true)
        {

            long startTime = 0;

            QueryRequest msg = null;

            try
            {
                // Pull any existing query requests off the queue, if not, wait for one.
                msg = (QueryRequest) DBMigrationTool.dbProcQueue.take();
            } catch (Exception e)
            {
                errorLog.error("Unable to fetch message from message processing queue.");
            }

            // Good practice to create a new result set instead of reusing
            ResultSet rs = null;
            Statement stmt = null;

                    // Fetch the query and the request out of the QueryRequest object
                    String query = msg.getQuery();
                    // Make sure the query request isn't empty, if it is, there is no point in sending it to the DB

                    try (Connection conn = cpds.getConnection())
                    {
                        // Execute the given query and fetch the result from it
                        stmt = conn.createStatement();
                        startTime = System.currentTimeMillis();
                        stmt.setQueryTimeout(1800);
                        System.out.println(query);
                        stmt.execute(query);
                        rs = stmt.getResultSet();

                        if (rs != null)
                        {
                            try
                            {
                                int count = 0;
                                while (rs.next())
                                {
                                    count++;
                                }

                                System.out.println("Query Complete: " + (System.currentTimeMillis() - startTime) + "ms. Result count: " + count);

                                if (msg.getFlag() == 1)
                                {
                                    DBMigrationTool.flag = 0;
                                }

                            } catch (Exception e)
                            {
                                errorLog.error("Failed to process database result set.");
                            }

                        }
                        conn.close();
                    } catch (SQLException e)
                    {
                        errorLog.error("Query Error: " + msg.getQuery());
                        errorLog.error("Failed to issue database command: " + e);
                    } finally
                    {
                        if (rs != null)
                        {
                            try
                            {
                                rs.close();
                            } catch (SQLException e)
                            {
                                errorLog.error("Failed to close JDBC result set.");
                            }
                        }
                        if (stmt != null)
                        {
                            try
                            {
                                stmt.close();
                            } catch (SQLException e)
                            {
                                errorLog.error("Failed to close JDBC statement.");
                            }
                        }
                    }
                }                

    } finally
    {
        closeDB();
        DBMigrationTool.dbProcHandle.cancel(true);
    }

}
}

My base DB class, which contains the connection information:

public class DBExtend
{
/**
 * Standard timeout
 */
public static final int DB_TIMEOUT = 30;

/**
 * Standard error logger for log4j2
 */
protected static Logger errorLog = LogManager.getLogger(DBExtend.class.getName());

/**
 * Call to the query builder instance
 */
private static QueryBuilder qb = QueryBuilder.getInstance();

/**
 * DB connection
 */
protected static ComboPooledDataSource cpds;

/**
 * Constructor
 */
public DBExtend()
{
}

/**
 * startDB is an initialization function used to open a database connection
 * 
 * @param dbPath - System path to the database file
 */
public void startDB(String dbPath)
{

    cpds = new ComboPooledDataSource();
    cpds.setJdbcUrl("jdbc:sqlite:" + dbPath);
    cpds.setMinPoolSize(1);
    cpds.setTestConnectionOnCheckout(true);
    cpds.setAcquireIncrement(5);
    cpds.setMaxPoolSize(20);

    errorLog.info("Connection to SQLite has been established.");

}

public void startMariaDB(String tableName)
{
    cpds = new ComboPooledDataSource();
    cpds.setJdbcUrl("jdbc:mariadb://localhost:3306/" + tableName);
    cpds.setUser("root");
    cpds.setPassword("joy");
    cpds.setMinPoolSize(1);
    cpds.setTestConnectionOnCheckout(true);
    cpds.setAcquireIncrement(5);
    cpds.setMaxPoolSize(20);

    errorLog.info("Connection to MariaDB has been established.");
}

/**
 * Close DB is to close a database instance
 */
public void closeDB()
{
    try
    {
        cpds.close();

        errorLog.info("Connection to SQLite has been closed.");

    } catch (SQLException e)
    {
        errorLog.error(e.getMessage());
    } finally
    {
        try
        {
            if (cpds.getConnection() != null)
            {
                cpds.getConnection().close();
            }
            if (cpds != null)
            {
                cpds.close();
            }
        } catch (SQLException ex)
        {
            errorLog.error(ex.getMessage());
        }
    }
}

}

Solution

  • A JDBC driver is required to be thread-safe, and it hides the implementation details from you. Note that although drivers are thread-safe, it is still not a good idea to use the same connection object concurrently from multiple threads.
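
    On the question of whether one consumer thread can run queries concurrently: a thread that calls a blocking JDBC method waits for it to return, so a single consumer issues one query at a time. Concurrency would need several consumers, each borrowing its own connection. The following is only a sketch of that idea. ConcurrentConsumers, maxInFlight, and the latch are hypothetical names, and the "query" is a stand-in that waits on a latch instead of touching a database.

    ```java
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ConcurrentConsumers {

        /** Drains the queries with `workers` threads; returns the peak number in flight at once. */
        public static int maxInFlight(int workers, List<String> queries) {
            BlockingQueue<String> queue = new LinkedBlockingQueue<>(queries);
            AtomicInteger inFlight = new AtomicInteger();
            AtomicInteger peak = new AtomicInteger();
            // Stand-in for query latency: a query "finishes" once every worker is busy.
            CountDownLatch allBusy = new CountDownLatch(workers);

            Runnable consumer = () -> {
                String query;
                // poll() because the queue is pre-filled here; a long-running consumer would use take()
                while ((query = queue.poll()) != null) {
                    // Real code would borrow one connection per query, for example:
                    // try (Connection c = pool.getConnection(); Statement s = c.createStatement()) { s.execute(query); }
                    peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                    allBusy.countDown();
                    try {
                        allBusy.await(2, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    } finally {
                        inFlight.decrementAndGet();
                    }
                }
            };

            ExecutorService executor = Executors.newFixedThreadPool(workers);
            for (int i = 0; i < workers; i++) {
                executor.submit(consumer);
            }
            executor.shutdown();
            try {
                executor.awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return peak.get();
        }
    }
    ```

    With one worker the peak is 1, however many queries are queued. With four workers and at least four queries, four are in flight together. The number of consumers would normally be kept at or below the pool's maximum size so that no consumer waits for a connection.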

    As to your actual problem, you are using the C3P0 data source incorrectly. A data source backed by a connection pool hands out a connection from the pool each time getConnection() is called. That connection is returned to the pool when you close it.

    This means that you get a connection from the pool, do your work, and then close it so that it is returned to the pool for use by other parts of your application.
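
    A minimal sketch of that borrow, use, and close cycle, written against the standard javax.sql.DataSource interface rather than C3P0 itself. BorrowAndReturn, countRows, and demo are hypothetical names, and the stub data source built with java.lang.reflect.Proxy exists only so the example runs without a database.

    ```java
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Proxy;
    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.concurrent.atomic.AtomicInteger;
    import javax.sql.DataSource;

    public class BorrowAndReturn {

        /** Counts the rows of one query; every JDBC resource is closed when the try block exits. */
        public static int countRows(DataSource pool, String sql) throws SQLException {
            try (Connection conn = pool.getConnection();
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                int rows = 0;
                while (rs.next()) {
                    rows++;
                }
                return rows;
            } // rs, stmt and conn are closed here, in that order; closing conn returns it to the pool
        }

        // ---- Everything below is a hypothetical stub so the sketch runs without a database ----

        private static <T> T stub(Class<T> type, InvocationHandler handler) {
            return type.cast(Proxy.newProxyInstance(
                    BorrowAndReturn.class.getClassLoader(), new Class<?>[] {type}, handler));
        }

        /** Returns {rows of the last query, connections borrowed, connections returned}. */
        public static int[] demo() {
            AtomicInteger borrowed = new AtomicInteger();
            AtomicInteger returned = new AtomicInteger();

            Statement stmt = stub(Statement.class, (proxy, method, args) -> {
                if (!method.getName().equals("executeQuery")) {
                    return null; // close() and anything else: no-op
                }
                AtomicInteger rowsLeft = new AtomicInteger(3); // each stub query yields 3 rows
                return stub(ResultSet.class, (p, m, a) ->
                        m.getName().equals("next") ? (Object) (rowsLeft.getAndDecrement() > 0) : null);
            });
            Connection conn = stub(Connection.class, (proxy, method, args) -> {
                if (method.getName().equals("close")) {
                    returned.incrementAndGet();
                }
                return method.getName().equals("createStatement") ? stmt : null;
            });
            DataSource pool = stub(DataSource.class, (proxy, method, args) -> {
                borrowed.incrementAndGet();
                return conn; // only getConnection() is called in this sketch
            });

            try {
                int rows = 0;
                for (int i = 0; i < 5; i++) {
                    rows = countRows(pool, "SELECT 1");
                }
                return new int[] {rows, borrowed.get(), returned.get()};
            } catch (SQLException e) {
                throw new IllegalStateException(e);
            }
        }
    }
    ```

    After five calls the stub has seen five borrows and five returns, so nothing is left checked out. With a real pool, countRows would be used the same way, passing the pooled data source as the first argument.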

    The following code in DBRunnable is therefore wrong:

    if (cpds.getConnection().isValid(DB_TIMEOUT))
    

    You get a connection from the pool and then immediately leak it: because you keep no reference to it, you can never close it, so it is never returned to the pool. Note that most connection pools validate a connection (sometimes only when configured to) before handing it out, so testing it yourself shouldn't be necessary. The setTestConnectionOnCheckout(true) call in your configuration already asks C3P0 to do this.
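
    To see why a leak makes the program stall rather than fail outright, here is a toy model. It is not C3P0: LeakDemo, ToyPool, successfulBorrows, and the 100 ms timeout are all made up for illustration, and a semaphore permit stands in for a pooled connection.

    ```java
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;

    public class LeakDemo {

        /** Toy pool: each permit stands in for one pooled connection. */
        static final class ToyPool {
            private final Semaphore permits;

            ToyPool(int size) {
                permits = new Semaphore(size);
            }

            /** Borrows a "connection"; gives up after 100 ms when none is free. */
            AutoCloseable getConnection() throws InterruptedException {
                if (!permits.tryAcquire(100, TimeUnit.MILLISECONDS)) {
                    throw new IllegalStateException("pool exhausted");
                }
                return permits::release; // close() hands the permit back to the pool
            }
        }

        /** Returns how many of `attempts` borrows succeed against a pool of `size`. */
        static int successfulBorrows(int size, int attempts, boolean closeAfterUse) {
            ToyPool pool = new ToyPool(size);
            int succeeded = 0;
            for (int i = 0; i < attempts; i++) {
                try {
                    AutoCloseable conn = pool.getConnection();
                    succeeded++;
                    if (closeAfterUse) {
                        conn.close();
                    }
                    // Otherwise the reference is simply dropped, as in
                    // cpds.getConnection().isValid(DB_TIMEOUT)
                } catch (Exception e) {
                    break; // nothing left to borrow
                }
            }
            return succeeded;
        }
    }
    ```

    With a pool of 3, ten borrows succeed when each connection is closed after use, but only the first 3 succeed when the references are dropped. The toy gives up after 100 ms. A pool configured to wait indefinitely for a free connection would block at that point instead, which would look like the hang described in the question.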

    Similarly for your DBExtend class, this is wrong:

    In selectMariaDB:

    cpds.getConnection().setCatalog(DBName);
    

    Here you get a connection from the pool and never close it, meaning you have 'leaked' this connection. Setting the catalog has no effect either, because this particular connection will never be handed out again. The catalog should instead be set as part of your connection pool configuration.

    In closeDB:

    cpds.getConnection().close();
    

    This obtains a connection from the pool and immediately closes it (returning it to the pool). That has no practical purpose.