Search code examples
rubypostgresqlasynchronouseventmachinegoliath

using the postgresql gem async


I'm using Goliath (which is powered by eventmachine) and the postgres gem pg, currently I'm using the pg gem in a blocking way: conn.exec('SELECT * FROM products') (for example) and I'm wondering whether there is a better way to connect to a postgres database?


Solution

  • The pg library provides full support for PostgreSQL's asynchronous API. I've added an example of how to use it to the samples/ directory:

    #!/usr/bin/env ruby
    
    require 'pg'
    
    # This is a example of how to use the asynchronous API to query the
    # server without blocking other threads. It's intentionally low-level;
    # if you hooked up the PGconn#socket to some kind of reactor, you
    # could make this much nicer.
    
    TIMEOUT = 5.0 # seconds to wait for an async operation to complete
    CONN_OPTS = {
        :host     => 'localhost',
        :dbname   => 'test',
        :user     => 'jrandom',
        :password => 'banks!stealUR$',
    }
    
    # Print 'x' continuously to demonstrate that other threads aren't
    # blocked while waiting for the connection, for the query to be sent,
    # for results, etc. You might want to sleep inside the loop or 
    # comment this out entirely for cleaner output.
    progress_thread = Thread.new { loop { print 'x' } }
    
    # Output progress messages
    def output_progress( msg )
        puts "\n>>> #{msg}\n"
    end
    
    # Start the connection
    output_progress "Starting connection..."
    conn = PGconn.connect_start( CONN_OPTS ) or 
        abort "Unable to create a new connection!"
    abort "Connection failed: %s" % [ conn.error_message ] if
        conn.status == PGconn::CONNECTION_BAD
    
    # Now grab a reference to the underlying socket so we know when the
    # connection is established
    socket = IO.for_fd( conn.socket )
    
    # Track the progress of the connection, waiting for the socket to 
    # become readable/writable before polling it
    poll_status = PGconn::PGRES_POLLING_WRITING
    until poll_status == PGconn::PGRES_POLLING_OK ||
          poll_status == PGconn::PGRES_POLLING_FAILED
    
        # If the socket needs to read, wait 'til it becomes readable to
        # poll again
        case poll_status
        when PGconn::PGRES_POLLING_READING
            output_progress "  waiting for socket to become readable"
            select( [socket], nil, nil, TIMEOUT ) or
                raise "Asynchronous connection timed out!"
    
        # ...and the same for when the socket needs to write
        when PGconn::PGRES_POLLING_WRITING
            output_progress "  waiting for socket to become writable"
            select( nil, [socket], nil, TIMEOUT ) or
                raise "Asynchronous connection timed out!"
        end
    
        # Output a status message about the progress
        case conn.status
        when PGconn::CONNECTION_STARTED
            output_progress "  waiting for connection to be made."
        when PGconn::CONNECTION_MADE
            output_progress "  connection OK; waiting to send."
        when PGconn::CONNECTION_AWAITING_RESPONSE
            output_progress "  waiting for a response from the server."
        when PGconn::CONNECTION_AUTH_OK
            output_progress "  received authentication; waiting for " +
                            "backend start-up to finish."
        when PGconn::CONNECTION_SSL_STARTUP
            output_progress "  negotiating SSL encryption."
        when PGconn::CONNECTION_SETENV
            output_progress "  negotiating environment-driven " +
                            "parameter settings."
        end
    
        # Check to see if it's finished or failed yet
        poll_status = conn.connect_poll
    end
    
    abort "Connect failed: %s" % [ conn.error_message ] unless 
        conn.status == PGconn::CONNECTION_OK
    
    output_progress "Sending query"
    conn.send_query( "SELECT * FROM pg_stat_activity" )
    
    # Fetch results until there aren't any more
    loop do
        output_progress "  waiting for a response"
    
        # Buffer any incoming data on the socket until a full result 
        # is ready. 
        conn.consume_input
        while conn.is_busy
            select( [socket], nil, nil, TIMEOUT ) or
                raise "Timeout waiting for query response."
            conn.consume_input
        end
    
        # Fetch the next result. If there isn't one, the query is 
        # finished
        result = conn.get_result or break
    
        puts "\n\nQuery result:\n%p\n" % [ result.values ]
    end
    
    output_progress "Done."
    conn.finish
    
    if defined?( progress_thread )
        progress_thread.kill
        progress_thread.join
    end
    

    I'd recommend that you read the documentation on the PQconnectStart function and the Asynchronous Command Processing section of the PostgreSQL manual, and then compare that with the sample above.

    I haven't used EventMachine before, but if it lets you register a socket and callbacks for when it becomes readable/writable, I'd think it'd be fairly easy to integrate database calls into it.

    I've been meaning to use the ideas in Ilya Grigorik's article on using Fibers to clean up evented code to make the async API easier to use, but that's a ways off. I do have a ticket open to track it if you're interested/motivated to do it yourself.