Search code examples
pythontwisted

Using Twisted AMP with Database insertion


I am learning how to use Twisted AMP. I am developing a program that sends data from a client to a server and inserts the data in a SQLite3 DB. The server then sends back a result to the client which indicates success or error (try and except might not be the best way to do this but it is only a temporary solution while I work out the main problem). In order to do this I modified an example I found that originally did a sum and returned the result, so I realize that this might not be the most efficient way to do what I am trying to do. In particular I am trying to do some timings on multiple insertions (i.e. send the data to the server multiple times for multiple insertions) and I have included the code I have written. It works but clearly it is not a good way to send multiple data for insertion since I am performing multiple connections before running the reactor.

I have tried several ways to get around this including passing the ClientCreator to reactor.callWhenRunning() but you cannot do this with a deferred.

Any suggestions, advice or help with how to do this would be much appreciated. Here is the code.

Server:

from twisted.protocols import amp
from twisted.internet import reactor
from twisted.internet.protocol import Factory
import sqlite3, time

class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

class Protocol(amp.AMP):
    def __init__(self):     
       self.conn = sqlite3.connect('biomed1.db')
       self.c =self.conn.cursor()
       self.res=None

    @Insert.responder
    def dbInsert(self, data):
        self.InsertDB(data) #call the DB inserter
        result=self.res     # send back the result of the insertion
        return {'insert_result': result}

    def InsertDB(self,data):
      tm=time.time()
      print "insert time:",tm
      chx=data
      PID=2
      device_ID=5
      try:
        self.c.execute("INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES ('%s','%s','%s')" % (chx, PID, device_ID))    
      except Exception, err:
             print err
             self.res=0
      else:
             self.res=1

      self.conn.commit()


pf = Factory()
pf.protocol = Protocol
reactor.listenTCP(1234, pf) 
reactor.run()

Client:

from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from twisted.protocols import amp
import time

class Insert(amp.Command):
    arguments = [('data', amp.Integer())]
    response = [('insert_result', amp.Integer())]

def connected(protocol):
    return protocol.callRemote(Insert, data=5555).addCallback(gotResult)

def gotResult(result):
    print 'insert_result:', result['insert_result']
    tm=time.time()
    print "stop", tm    

def error(reason):
    print "error", reason

tm=time.time()
print "start",tm
for i in range (10): #send data over ten times
  ClientCreator(reactor, amp.AMP).connectTCP(
     '127.0.0.1', 1234).addCallback(connected).addErrback(error)

reactor.run()

End of Code.

Thank you.


Solution

  • Few things which will improve your Server code.

    First and foremost: The use of direct database access functions is discouraged in twisted, as they normally causes block. Twisted has nice abstraction for database access which provides twisted approach to db connection - twisted.adbapi

    Now on to reuse of db connection: If you want to reuse certain assets (like database connection) across a number of Protocol instances, you should initialize those in constructor of Factory or if you dont fancy initiating such things at a launch time, create an resource access method, which will initiate resource upon first method call then assign it to class variable and return that on subsequent calls.

    When Factory creates a specific Protocol intance, it will add a reference to itself inside the protocol, see line 97 of twisted.internet.protocol

    Then within your Protocol instance, you can access shared database connection instance like:

    self.factory.whatever_name_for_db_connection.doSomething() 
    

    Reworked Server code (I dont have python, twisted or even decent IDE available, so this is pretty much untested, some errors are to be expected)

    from twisted.protocols import amp
    from twisted.internet import reactor
    from twisted.internet.protocol import Factory
    import time
    
    class AMPDBAccessProtocolFactory(Factory):
        def getDBConnection(self):
            if 'dbConnection' in dir(self):
                return self.dbConnection
            else:
                self.dbConnection = SQLLiteTestConnection(self.dbURL)
                return self.dbConnection
    
    class SQLLiteTestConnection(object):
        """
        Provides abstraction for database access and some business functions.
        """
        def __init__(self,dbURL):
            self.dbPool =  adbapi.ConnectionPool("sqlite3" , dbURL,  check_same_thread=False)
    
        def insertBTData4(self,data):
            query = "INSERT INTO btdata4(co2_data, patient_Id, sensor_Id) VALUES (%s,%s,%s)" 
            tm=time.time()
            print "insert time:",tm
            chx=data
            PID=2
            device_ID=5
            dF = self.dbPool.runQuery(query,(chx, PID, device_ID)) 
            dF.addCallback(self.onQuerySuccess,insert_data=data)
            return dF
        def onQuerySuccess(self,insert_data,*r):
            """
            Here you can inspect query results or add any other valuable information to be parsed at client.
            For the test sake we will just return True to a customer if query was a success.
            original data available at kw argument insert_data
            """
            return True
    
    
    class Insert(amp.Command):
        arguments = [('data', amp.Integer())]
        response = [('insert_result', amp.Integer())]
    
    class MyAMPProtocol(amp.AMP):
    
        @Insert.responder
        def dbInsert(self, data):
            db = self.factory.getDBConnection()
            dF = db.insertBTData4(data)
            dF.addErrback(self.onInsertError,data)
            return dF
    
        def onInsertError(self, error, data):
            """
            Here you could do some additional error checking or inspect data 
            which was handed for insert here. For now we will just throw the same exception again
            so that the client gets notified
            """
            raise error
    
    if __name__=='__main__':
        pf = AMPDBAccessProtocolFactory()
        pf.protocol = MyAMPProtocol
        pf.dbURL='biomed1.db'
        reactor.listenTCP(1234, pf) 
        reactor.run()
    

    Now on to the client. IF AMP follows the overall RPC logic (cant test it currently) it should be able to peruse the same connection across a number of calls. So I have created a ServerProxy class which will hold that perusable protocol instance and provide abstraction for calls:

    from twisted.internet import reactor
    from twisted.internet.protocol import ClientCreator
    from twisted.protocols import amp
    import time
    
    class Insert(amp.Command):
        arguments = [('data', amp.Integer())]
        response = [('insert_result', amp.Integer())]
    
    class ServerProxy(object):
        def connected(self,protocol):
            self.serverProxy = protocol # assign protocol as instance variable
            reactor.callLater(5,self.startMultipleInsert) #after five seconds start multiple insert procedure
    
        def remote_insert(self,data):
            return self.serverProxy.callRemote(Insert, data)
    
        def startMultipleInsert(self):
            for i in range (10): #send data over ten times
                dF = self.remote_insert(i)
                dF.addCallback(self.gotInsertResult)
                dF.addErrback(error)
    
        def gotInsertResult(self,result):
            print 'insert_result:', str(result)
            tm=time.time()
            print "stop", tm    
    
    def error(reason):
        print "error", reason
    
    
    def main():
        tm=time.time()
        print "start",tm
        serverProxy = ServerProxy()
        ClientCreator(reactor, amp.AMP).connectTCP('127.0.0.1', 1234).addCallback(serverProxy.connected).addErrback(error)
        reactor.run()    
    
    if __name__=='__main__':
        main()