Search code examples
pythonpikaconnexion

How to initialize RabbitMQ consumer using pika and connexion


I'm trying to set up a Python microservice that receives messages through RabbitMQ, while having a /health REST endpoint for the Kubernetes healthcheck. I'm using pika for the RabbitMQ consumer, and connexion for the REST endpoint.

However, when I start the RabbitMQ consumer in main() the connexion app wont start.

python-app.py

#!/usr/bin/env python
import pika, sys, os, connexion
from flask import Flask, request, jsonify

app = connexion.FlaskApp(__name__, specification_dir='./')

def main():
    # Connection
    ...
    # Exchange and queues
    ...

    def callback(ch, method, properties, body):
        ...

    channel.basic_consume(queue='pg-python', on_message_callback=callback, auto_ack=True)

    print(' [*] Waiting for messages.')
    channel.start_consuming()
    app.run(port=8080, use_reloader=False)

@app.route('/api/v1/health', methods=['GET'])
def return_health():
    message = {'status':'Healthy! <3'}
    return jsonify(message)

if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print('Interrupted')
        try:
            sys.exit(0)
        except SystemExit:
            os._exit(0)

Output:

[*] Waiting for messages.

How can I properly initialize both components? Do I need to use threads?


Solution

  • I've managed to solve the issue by initializing the RabbitMQ consumer in a separate thread:

    #!/usr/bin/env python
    import pika, sys, os, threading
    from flask import Flask, request, jsonify
    
    app = Flask(__name__)
    
    def start_rmq_connection():
        # Connection
        ...
        # Exchange and queues
        ...
    
        def callback(ch, method, properties, body):
            ...
    
        channel.basic_consume(queue='pg-python', on_message_callback=callback, auto_ack=True)
        print(' [*] Waiting for messages.')
        channel.start_consuming()
    
    @app.route('/api/v1/health', methods=['GET'])
    def return_health():
        message = {'status':'Healthy! <3'}
        return jsonify(message)
    
    if __name__ == '__main__':
        try:
            thread_1 = threading.Thread(target=start_rmq_connection)
            thread_1.start()
            thread_1.join(0)
            app.run()
        except KeyboardInterrupt:
            print('Interrupted')
            try:
                sys.exit(0)
            except SystemExit:
                os._exit(0)