
Reconnect to PostgreSQL database with R's pool package


I have an API built with R plumber that connects to a PostgreSQL database using RPostgreSQL and pool (although this would also apply if I were using a Shiny app):

library(pool)
library(RPostgreSQL)
library(plumber)

# create the connection pool
pool <- dbPool(
  drv = PostgreSQL(),
  host = Sys.getenv("DB_HOST"),
  port = 5432,
  dbname = "db",
  user = Sys.getenv("DB_USER"),
  password = Sys.getenv("DB_PASSWORD")
)

# build the API router from plumber.R
pr <- plumb("plumber.R")

# on stop, close the pool
pr$registerHooks(
  list("exit" = function() { poolClose(pool) })
)

# start serving requests
pr$run()

I want to import new data every day. The easiest way is to create a new database and promote it to production:

CREATE DATABASE db_new;
-- create the tables
-- bulk-insert the data
SELECT pg_terminate_backend (pid) FROM pg_stat_activity WHERE datname = 'db';
DROP DATABASE db;
ALTER DATABASE db_new RENAME TO db;

This is fast and minimizes downtime. The problem is that pool then loses its connection to the database and doesn't automatically attempt to reconnect:

> tbl(pool, "users")
Error in postgresqlExecStatement(conn, statement, ...) : 
  RS-DBI driver: (could not Retrieve the result : FATAL:  terminating connection due to administrator command
server closed the connection unexpectedly
    This probably means the server terminated abnormally
    before or while processing the request.
)
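The error text above is what any reconnect logic would have to recognize. Below is a minimal base-R sketch of such a check. The helper name is_connection_lost() and the pattern list are hypothetical; the patterns are simply copied from the error shown here, so treat them as an assumption to extend for other drivers or server versions.

```r
# Hypothetical helper: does an error look like a dropped database connection?
# Patterns are copied from the PostgreSQL error text in the question; other
# drivers or server versions may word this differently (assumption).
connection_lost_patterns <- c(
  "terminating connection due to administrator command",
  "server closed the connection unexpectedly"
)

is_connection_lost <- function(e, patterns = connection_lost_patterns) {
  # Accept either a condition object or a plain message string
  msg <- if (inherits(e, "condition")) conditionMessage(e) else as.character(e)
  # TRUE if any pattern occurs literally in the message
  any(vapply(patterns, grepl, logical(1), x = msg, fixed = TRUE))
}

# Example: classify an error caught with tryCatch()
err <- tryCatch(
  stop("FATAL:  terminating connection due to administrator command"),
  error = function(e) e
)
is_connection_lost(err)
#> [1] TRUE
```

Matching on message text is fragile, but neither the question nor the answer below points to a structured error code from these drivers, so string matching is the pragmatic option here.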

Even if I weren't replacing the database every day, DB servers occasionally restart, and that would also break my app. Reconnection doesn't seem to be a feature of pool, RPostgreSQL, or DBI. Does anyone know a way to deal with this problem?


Solution

  • I recently encountered a similar issue with MySQL connections being closed when the instance's wait_timeout was exceeded. I came across your post on RStudio Community and was inspired by your solution. In case you are still using it and want a solution that avoids the extra query while wrapping the actual functions you call, here is a reprex of something I came up with, along with an example showing that it works:

    library(dplyr, warn.conflicts = FALSE)
    library(pool)
    library(RMariaDB)
    
    generate_safe_query <- function(pool) {
      function(db_function, ...) {
        tryCatch({
          db_function(pool, ...)
        }, error = function(e) {
          if (grepl("Lost connection to MySQL server during query", e$message)) {
            # Preserve `validationInterval` so that it can be restored
            validation_interval <- pool$validationInterval
            # Trigger destruction of dead connection
            pool$validationInterval <- 0
            refreshed_connection <- poolCheckout(pool)
            poolReturn(refreshed_connection)
            # Restore original `validationInterval`
            pool$validationInterval <- validation_interval
            # Execute the query with the new connection
            db_function(pool, ...)
          } else {
            # Unexpected error
            stop(e)
          }
        })
      }
    }
    
    mysql_pool <- dbPool(MariaDB(),
                         host = "127.0.0.1",
                         username = "root",
                         password = "",
                         dbname = "test")
    
    safe_query <- generate_safe_query(mysql_pool)
    
    # Works
    safe_query(tbl, "notes")
    #> # Source:   table<notes> [?? x 2]
    #> # Database: mysql 8.0.15 [root@127.0.0.1:/test]
    #>      id note 
    #>   <int> <chr>
    #> 1     1 NOTE1
    
    # Set the `wait_timeout` to 5 seconds for this session
    invisible(safe_query(dbExecute, "SET SESSION wait_timeout = 5"))
    
    # Wait longer than `wait_timeout` to trigger a disconnect
    Sys.sleep(6)
    
    # Still works; warning will appear notifying that connection was
    # destroyed and replaced with a new one
    safe_query(tbl, "notes")
    #> Warning: It wasn't possible to activate and/or validate the object. Trying
    #> again with a new object.
    #> # Source:   table<notes> [?? x 2]
    #> # Database: mysql 8.0.15 [root@127.0.0.1:/test]
    #>      id note 
    #>   <int> <chr>
    #> 1     1 NOTE1
    
    safe_query(poolClose)
    # Or, equivalently: 
    # poolClose(mysql_pool)
    

    Created on 2019-05-30 by the reprex package (v0.3.0)

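    The function returned by generate_safe_query works with any database function that takes the pool as its first argument (e.g. dbExecute, dbGetQuery, tbl). You will need to change the error message it matches to whatever your driver reports when the connection drops; for the PostgreSQL case in the question, that is the "terminating connection due to administrator command" / "server closed the connection unexpectedly" text.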

    I have also opened my own Community topic on an option I think should be included in dbPool that would alleviate the need for such workarounds.
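The generate_safe_query wrapper above retries exactly once. Since the question notes that database servers occasionally restart, and a restarting server may not accept connections immediately, one possible variation is to retry a bounded number of times with a pause, passing the error check and the refresh step in as functions. This is a sketch, not the answerer's code: make_safe_query(), is_lost, refresh, max_attempts and delay are hypothetical names, not part of pool or DBI. It uses only base R so the control flow can be exercised without a database; in real use, refresh would be the validationInterval toggle with poolCheckout()/poolReturn() from the answer, and is_lost would match your driver's message.

```r
# Hypothetical variation on generate_safe_query(): when is_lost(e) says the
# connection dropped, call refresh(pool), sleep `delay` seconds and retry,
# up to `max_attempts` attempts in total. Other errors are re-raised at once.
make_safe_query <- function(pool, is_lost, refresh, max_attempts = 2, delay = 0) {
  function(db_function, ...) {
    attempt <- 1
    repeat {
      # Wrap the outcome so a legitimate NULL/list result is not confused with failure
      result <- tryCatch(
        list(ok = TRUE, value = db_function(pool, ...)),
        error = function(e) list(ok = FALSE, error = e)
      )
      if (result$ok) return(result$value)
      # Give up on unrelated errors, or once the attempt budget is used up
      if (!is_lost(result$error) || attempt >= max_attempts) stop(result$error)
      refresh(pool)
      Sys.sleep(delay)
      attempt <- attempt + 1
    }
  }
}

# Demonstration with a fake "pool" and a query that fails twice before succeeding
calls <- 0
flaky_query <- function(pool, table) {
  calls <<- calls + 1
  if (calls < 3) stop("server closed the connection unexpectedly")
  paste("rows from", table)
}
refreshes <- 0
safe_query <- make_safe_query(
  pool = "fake-pool",
  is_lost = function(e) grepl("closed the connection", conditionMessage(e), fixed = TRUE),
  refresh = function(pool) refreshes <<- refreshes + 1,
  max_attempts = 3
)
safe_query(flaky_query, "users")
#> [1] "rows from users"
```

With max_attempts = 2 and delay = 0 this behaves like the answer's single retry. A nonzero delay gives a restarting server time to come back, at the cost of blocking the R process while it waits.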