How can I use Peewee with multiprocessing without getting a "Commands out of sync; you can't run this command now" exception?


I am writing a large multi-process application that uses Peewee as its ORM. When multiple processes access the database at the same time, I get a synchronization error. How can I fix this without changing the application's structure?

Short code causing the problem:

from multiprocessing import Process
from time import sleep
from abc import ABC
from playhouse.shortcuts import *


class MySqlAutoReconnectDatabase(ReconnectMixin, MySQLDatabase, ABC):

    def __init__(self):
        super().__init__(
            database='test',
            autoconnect=True,
            **{
                'host': '127.0.0.1',
                'port': 3306,
                'charset': 'utf8',
                'sql_mode': 'PIPES_AS_CONCAT',
                'use_unicode': True,
                'user': 'root',
                'password': ''
            })


database = MySqlAutoReconnectDatabase()


class Entity(Model):
    info = CharField(null=True)

    class Meta:
        database = database
        table_name = 'entity'


def some_proc(id):
    while True:
        entity = list(Entity.select().where(Entity.id == id))
        sleep(0.5)


entities = Entity.select()
for entity in entities:
    print(entity.info)
    process = Process(target=some_proc, args=(entity.id,))
    process.start()

The exception that I want to avoid:

Traceback (most recent call last):
  File "/usr/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/root/PycharmProjects/test/test.py", line 37, in some_proc
    entity = list(Entity.select().where(Entity.id == id))
  File "/usr/local/lib/python3.7/dist-packages/peewee.py", line 1978, in __len__
    self._ensure_execution()
  File "/usr/local/lib/python3.7/dist-packages/peewee.py", line 1957, in _ensure_execution
    if not self._cursor_wrapper:
  File "/usr/local/lib/python3.7/dist-packages/peewee.py", line 4223, in __len__
    self.fill_cache()
  File "/usr/local/lib/python3.7/dist-packages/peewee.py", line 4264, in fill_cache
    iterator.next()
  File "/usr/local/lib/python3.7/dist-packages/peewee.py", line 4320, in next
    self.cursor_wrapper.iterate()
  File "/usr/local/lib/python3.7/dist-packages/peewee.py", line 4233, in iterate
    self.cursor.close()
  File "/usr/lib/python3/dist-packages/MySQLdb/cursors.py", line 84, in close
    while self.nextset():
  File "/usr/lib/python3/dist-packages/MySQLdb/cursors.py", line 172, in nextset
    nr = db.next_result()
_mysql_exceptions.ProgrammingError: (2014, "Commands out of sync; you can't run this command now")

Database:

CREATE DATABASE IF NOT EXISTS `test` DEFAULT CHARACTER SET utf8 ;
USE `test` ;

CREATE TABLE IF NOT EXISTS `test`.`entity` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `info` VARCHAR(2048) NULL,
  PRIMARY KEY (`id`))
ENGINE = InnoDB;

INSERT INTO `entity` (`id`, `info`) VALUES (NULL, 'test1');
INSERT INTO `entity` (`id`, `info`) VALUES (NULL, 'test2');
INSERT INTO `entity` (`id`, `info`) VALUES (NULL, 'test3');
INSERT INTO `entity` (`id`, `info`) VALUES (NULL, 'test4');
INSERT INTO `entity` (`id`, `info`) VALUES (NULL, 'test5');
INSERT INTO `entity` (`id`, `info`) VALUES (NULL, 'test6');

Solution

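You are creating your database connection (with its associated sockets and file descriptors) before you fork the new processes. The open connection is carried over into every child process, so the parent and all children read and write the same socket. Their MySQL protocol traffic gets interleaved on that one connection, which is what produces the "Commands out of sync" error.

This is easy to resolve: close the connection in the parent before forking, and open a new connection inside each child process after the fork.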