Search code examples
pythongrpcpublish-subscribe

Not able to subscribe to a topic in dapr using grpc with python


I am having a difficult time finding gRPC pub-sub subscriber template for python.

What I am trying is this, but it doesn't seem to work out.

class DaprClientServicer(daprclient_services.DaprClientServicer):
    def OnTopicEvent(self, request, context):
        if request.topic=="TOPIC_A":
            print("Do something")
            response = "some response"
        return response

server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
daprclient_services.add_DaprClientServicer_to_server(DaprClientServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()

try:
    while True:
        time.sleep(86400)
except KeyboardInterrupt:
    server.stop(0)

My publish statement looks something like this:

client.PublishEvent(dapr_messages.PublishEventEnvelope(topic='TOPIC_A', data=data))

Solution

  • Old Answer (Dapr's Python-SDK APIs changed significantly after this)

    After doing a bit of research I found that I was skipping on the step. So how subscriber work is like this:

    1. Subscribe to a topic. (missing step)

    2. Handel the messaged published to the subscribed topic.

    Doing this worked for me:

    # Our server methods
    class DaprClientServicer(daprclient_services.DaprClientServicer):
        def GetTopicSubscriptions(self, request, context):
            # Dapr will call this method to get the list of topics the app
            # wants to subscribe to. In this example, we are telling Dapr
            # To subscribe to a topic named TOPIC_A
            return daprclient_messages.GetTopicSubscriptionsEnvelope(topics=['TOPIC_A'])
    
        def OnTopicEvent(self, request, context):
            logging.info("Event received!!")
            return empty_pb2.Empty()
    
    
    # Create a gRPC server
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    daprclient_services.add_DaprClientServicer_to_server(
        DaprClientServicer(), server)
    
    # Start the gRPC server
    print('Starting server. Listening on port 50051.')
    server.add_insecure_port('[::]:50051')
    server.start()
    
    # Since server.start() doesn't block, we need to do a sleep loop
    try:
        while True:
            time.sleep(86400)
    except KeyboardInterrupt:
        server.stop(0)