python-3.x ubuntu-16.04 pymysql pika

Python3 pika channel.basic_consume() causing MySQL too many connections


I am using pika to connect to RabbitMQ and consume messages. When I start the script in the Ubuntu production environment it works as expected, but it keeps opening MySQL connections and never closes them, and the MySQL server ends up with a "Too many connections" error.

I would appreciate any recommendations on the code below, as I cannot understand what is going wrong. Thank you in advance.

The flow is as follows:

  1. Start pika on Python 3
  2. Subscribe to a channel and wait for messages
  3. In the callback I do various validations and save or update data in MySQL
  4. The problem shows up in the htop screenshot from Ubuntu at the end of the question: new MySQL connections keep being added at the top of the list

Pika version = 0.13.0

For MySQL I use pymysql.

Pika Script

import pika


def main():
    credentials = pika.PlainCredentials(tunnel['queue']['name'], tunnel['queue']['password'])

    while True:
        try:
            cp = pika.ConnectionParameters(
                host=tunnel['queue']['host'],
                port=tunnel['queue']['port'],
                credentials=credentials,
                ssl=tunnel['queue']['ssl'],
                heartbeat=600,
                blocked_connection_timeout=300
            )

            connection = pika.BlockingConnection(cp)
            channel = connection.channel()

            def callback(ch, method, properties, body):
                if 'messageType' in properties.headers:
                    message_type = properties.headers['messageType']

                    if events.get(message_type):
                        result = Descriptors._reflection.ParseMessage(events[message_type]['decode'], body)
                        if result:
                            result = protobuf_to_dict(result)
                            model.write_response(external_response=result, message_type=message_type)
                    else:
                        app_log.warning('Message type not in allowed list = ' + str(message_type))
                        app_log.warning('continue listening...')

            channel.basic_consume(callback, queue=tunnel['queue']['name'], no_ack=True)
            try:
                channel.start_consuming()
            except KeyboardInterrupt:
                channel.stop_consuming()
                connection.close()
                break
        except pika.exceptions.ConnectionClosed as e:
            app_log.error('ConnectionClosed :: %s' % str(e))
            continue
        except pika.exceptions.AMQPChannelError as e:
            app_log.error('AMQPChannelError :: %s' % str(e))
            continue
        except Exception as e:
            app_log.error('Connection was closed, retrying... %s' % str(e))
            continue


if __name__ == '__main__':
    main()

Inside the script I have a model that does inserts or updates in the database; the code is below.

def write_response(self, external_response, message_type):
    table_name = events[message_type]['table_name']
    original_response = external_response[events[message_type]['response']]
    if isinstance(original_response, list):
        external_response = []
        for o in original_response:
            record = self.map_keys(o, message_type, events[message_type].get('values_fix', {}))
            external_response.append(self.validate_fields(record))
    else:
        external_response = self.map_keys(original_response, message_type, events[message_type].get('values_fix', {}))
        external_response = self.validate_fields(external_response)

    if not self.mysql.open:
        self.mysql.ping(reconnect=True)

    with self.mysql.cursor() as cursor:
        if isinstance(original_response, list):
            for e in external_response:
                id_name = events[message_type]['id_name']
                filters = {id_name: e[id_name]}
                self.event(
                    cursor=cursor,
                    table_name=table_name,
                    filters=filters,
                    external_response=e,
                    message_type=message_type,
                    event_id=e[id_name],
                    original_response=e  # not required here
                )
        else:
            id_name = events[message_type]['id_name']
            filters = {id_name: external_response[id_name]}
            self.event(
                cursor=cursor,
                table_name=table_name,
                filters=filters,
                external_response=external_response,
                message_type=message_type,
                event_id=external_response[id_name],
                original_response=original_response
            )
    # The `with` block has already closed the cursor; close the connection too.
    self.mysql.close()

    return
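
One thing worth ruling out in a function like this is an exception raised between opening the cursor and `self.mysql.close()`, which would skip the close and leave the connection open. Below is a minimal sketch of a pattern that closes the connection even on errors. It is not the original model code: `connect` is a hypothetical zero-argument factory (for example something built around `pymysql.connect`) and `work` is a hypothetical callable standing in for the `self.event(...)` calls.

```python
from contextlib import closing


def run_with_connection(connect, work):
    """Open a connection, run work(cursor), and always close the connection.

    `connect` is a hypothetical zero-argument factory returning a DB-API
    connection; `work` is a hypothetical callable that receives a cursor.
    """
    # closing() calls conn.close() on exit, even if work() raises.
    with closing(connect()) as conn:
        with closing(conn.cursor()) as cursor:
            result = work(cursor)
        # Only reached when work() succeeded; on error nothing is committed.
        conn.commit()
        return result
```

This still opens one connection per message, like the original; it only guarantees that each one is closed again.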

On Ubuntu I use systemd to run the script and restart it if something goes wrong. The systemd unit file is below.

[Unit]
Description=Pika Script
Requires=stunnel4.service
Requires=mysql.service
Requires=mongod.service

[Service]
User=user
Group=group
WorkingDirectory=/home/pika_script
ExecStart=/home/user/venv/bin/python pika_script.py
Restart=always

[Install]
WantedBy=multi-user.target

Screenshot from htop on Ubuntu, showing how MySQL entries keep being added to the list and are never closed.

Error

tornado_mysql.err.OperationalError: (1040, 'Too many connections')
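
Error 1040 is what the server returns once the number of connected clients has reached `max_connections`. Note that htop lists userland threads by default, so many `mysqld` rows there are not necessarily client connections. A rough sketch for checking the real numbers from Python follows; it assumes `conn` is an already open DB-API connection using a default (tuple) cursor, and the function name is made up for illustration.

```python
def connection_headroom(conn):
    """Return (threads_connected, max_connections) as integers.

    Assumes `conn` is an open DB-API connection whose cursors return
    rows as tuples of (Variable_name, Value).
    """
    cursor = conn.cursor()
    try:
        # Number of currently open client connections.
        cursor.execute("SHOW GLOBAL STATUS LIKE 'Threads_connected'")
        connected = int(cursor.fetchone()[1])
        # Configured limit that triggers error 1040 when reached.
        cursor.execute("SHOW GLOBAL VARIABLES LIKE 'max_connections'")
        limit = int(cursor.fetchone()[1])
    finally:
        cursor.close()
    return connected, limit
```

If the server is already at the limit, this check may itself fail to connect, so it is more useful when run periodically before the error appears.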

Solution

  • I have found the issue; posting it in case it helps somebody else.

    The problem was that mysqld had gone into an infinite loop trying to create indexes for one specific database. It never succeeded and kept retrying again and again.

    The solution was to drop that database and recreate it. After that the mysqld process went back to normal, and the infinite index-creation loop disappeared as well.
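
For anyone trying to find which database the server is stuck on, one possible approach is to summarize the output of `SHOW FULL PROCESSLIST`. The sketch below is only an illustration and not the method used above: the function name and the 60-second threshold are assumptions, and `rows` is expected to be the tuples a default cursor returns for that statement (Id, User, Host, db, Command, Time, State, Info).

```python
from collections import Counter


def summarize_processlist(rows, min_seconds=60):
    """Count long-running threads per (db, command, state).

    `rows` are tuples in SHOW FULL PROCESSLIST column order:
    Id, User, Host, db, Command, Time, State, Info.
    `min_seconds` is an arbitrary threshold chosen for this sketch.
    """
    counts = Counter()
    for row in rows:
        db, command, seconds, state = row[3], row[4], row[5], row[6]
        # Keep only threads that have been in their current state a while.
        if seconds is not None and seconds >= min_seconds:
            counts[(db, command, state)] += 1
    # Most frequent (db, command, state) combinations first.
    return counts.most_common()
```

A database that shows up repeatedly with the same long-running state is a reasonable first place to look.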