Search code examples
pythonwebsocketcouchdbtwistedautobahn

Use sendMessage for CouchDB Changes using Autobahn WebSocket Library


So far I have basic code to connect my websocket server to a websocket client. I am using Autobahn for the server code, and Advanced REST Client as the client. In a separate method in the DBAlertProtocol class I am long polling a database in CouchDB for any changes that occur, i.e. add, delete, update, etc. This method gets call 5 seconds after the websocket connection is open.

There's is an issue with using sendMessage where the data is not showing up on the client side, or sometimes it takes a very long time to arrive.

Is there a way to change the communication options? Could the data be too large in size to send? I am trying to figure out why my other examples can send data successfully, but the couchdb changes notifications cannot.

Below is the code that I have so far.

Thanks in advance!

server.py

import sys
import logging
import couchdb
from twisted.python import log
from twisted.internet import reactor

from autobahn.twisted.websocket import WebSocketServerFactory, \
                        WebSocketServerProtocol, listenWS
from autobahn.twisted.resource import WebSocketResource

couch = couchdb.Server("http://localhost:5984/")
db = couch['event_db']

class DBAlertProtocol(WebSocketServerProtocol):

  def onConnect(self, request):
    print("Connection made on server side")

  def onOpen(self):
    print("WebSocket connection open.")
    reactor.callLater(5, self.check_db_changes)

  def check_db_changes(self):
    since = 1
    print("\nstart loop\n")
    while True:
      changes = db.changes(since=since, include_docs=True)
      since = changes['last_seq'] 
      no_docs_changed = len(changes)
      counter = 0
      for changeset in changes['results']:
        print("\nChange detected!\n")
        try:
          doc = db[changeset['id']]
        except couchdb.http.ResourceNotFound:
          print("Resource not found, or was deleted.")
        else:
          counter += 1
          print("Number of docs effected: {}".format(str(counter)))
          # Send change data to MW
          self.sendMessage(str(changeset))

  def onClose(self, wasClean, code, reason):
    print("WebSocket closed on server side: {}".format(reason))

  def onMessage(self, payload, isBinary):
    print("Data received from database: {}".format(payload))
    self.sendMessage("Message received.")


class DBAlertFactory(WebSocketServerFactory):
  protocol =  DBAlertProtocol


def main():
  log.startLogging(sys.stdout)

  port = 8000

  factory = DBAlertFactory(u"ws://127.0.0.1:8000")

  listenWS(factory)
  print("Listening on port: {}".format(str(port)))
  print("Starting reactor...")
  reactor.run()


if __name__ == "__main__":
  main()

Solution

  • check_db_changes never gives up control so no other part of your program can ever run:

    while True:
    

    Instead, try something like twisted.internet.task.LoopingCall.