I am having a difficult time finding gRPC pub-sub subscriber template for python.
What I am trying is this, but it doesn't seem to work out.
class DaprClientServicer(daprclient_services.DaprClientServicer):
def OnTopicEvent(self, request, context):
if request.topic=="TOPIC_A":
print("Do something")
response = "some response"
return response
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
daprclient_services.add_DaprClientServicer_to_server(DaprClientServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
server.stop(0)
My publish statement looks something like this:
client.PublishEvent(dapr_messages.PublishEventEnvelope(topic='TOPIC_A', data=data))
After doing a bit of research I found that I was skipping on the step. So how subscriber work is like this:
Subscribe to a topic. (missing step)
Handel the messaged published to the subscribed topic.
Doing this worked for me:
# Our server methods
class DaprClientServicer(daprclient_services.DaprClientServicer):
def GetTopicSubscriptions(self, request, context):
# Dapr will call this method to get the list of topics the app
# wants to subscribe to. In this example, we are telling Dapr
# To subscribe to a topic named TOPIC_A
return daprclient_messages.GetTopicSubscriptionsEnvelope(topics=['TOPIC_A'])
def OnTopicEvent(self, request, context):
logging.info("Event received!!")
return empty_pb2.Empty()
# Create a gRPC server
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
daprclient_services.add_DaprClientServicer_to_server(
DaprClientServicer(), server)
# Start the gRPC server
print('Starting server. Listening on port 50051.')
server.add_insecure_port('[::]:50051')
server.start()
# Since server.start() doesn't block, we need to do a sleep loop
try:
while True:
time.sleep(86400)
except KeyboardInterrupt:
server.stop(0)