python postgresql multiprocessing threadpool peewee

python peewee multiprocessing pool error


Stack: Python 3.4, PostgreSQL 9.4.7, peewee 2.8.0, psycopg2 2.6.1 (dt dec pq3 ext lo64)

I need to talk to the PostgreSQL database (selects, inserts, updates) from each worker. I am using Python's multiprocessing pool to create 10 workers; each one makes a curl call and then talks to the database based on what it finds.

After reading a few threads online, I thought a connection pool was the way to go, so I placed the code below at the top of my models.py file. I have my doubts about connection pools, though, because my understanding is that reusing database connections across threads is a no-no.

from playhouse.pool import PooledPostgresqlExtDatabase

# cfg is the application's config mapping, loaded elsewhere.
db = PooledPostgresqlExtDatabase(
    'uc',
    max_connections=32,
    stale_timeout=300,  # 5 minutes.
    user=cfg['psql']['user'],
    password=cfg['psql']['pass'],
    host=cfg['psql']['host'],
    port=cfg['psql']['port'],
    register_hstore=False)

On to the question now. I am getting random SQL errors when talking to the database from some workers. Before I introduced peewee into the mix, I was using the psycopg2 library without a wrapper and creating a new database connection per worker. There were no errors.

A sample error that I get is:
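
The per-worker setup mentioned above (one connection opened inside each worker process, never carried across a fork) can be sketched roughly as follows. This is a minimal illustration and not the original code: the standard-library sqlite3 module stands in for psycopg2 so the sketch runs without a server, and init_worker, record_url, create_schema and the url table are hypothetical names. With psycopg2, the sqlite3.connect call would be a psycopg2.connect call, and the "?" placeholders would be "%s".

```python
import multiprocessing
import os
import sqlite3
import tempfile

# Module-level slot; each worker process fills it with its own connection.
_conn = None


def init_worker(db_path):
    """Pool initializer: runs once inside every worker process after it starts."""
    global _conn
    _conn = sqlite3.connect(db_path, timeout=30)


def record_url(url):
    """Insert one row using the connection owned by the current process."""
    with _conn:  # commits on success, rolls back if the block raises
        _conn.execute(
            "INSERT INTO url (address, pid) VALUES (?, ?)",
            (url, os.getpid()),
        )
    return url


def create_schema(db_path):
    """Create the demo table from the parent, then close that connection."""
    conn = sqlite3.connect(db_path)
    with conn:
        conn.execute("CREATE TABLE IF NOT EXISTS url (address TEXT, pid INTEGER)")
    conn.close()


if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "demo.db")
    create_schema(path)
    urls = ["http://example.com/%d" % i for i in range(20)]
    # No connection is open in the parent when the workers are created.
    with multiprocessing.Pool(4, initializer=init_worker, initargs=(path,)) as pool:
        pool.map(record_url, urls)
    check = sqlite3.connect(path)
    rows = check.execute("SELECT COUNT(*) FROM url").fetchone()[0]
    check.close()
    print(rows, "rows written")
```

The point of the initializer is that the connection is created after the worker process exists, so no two processes ever talk over the same socket.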

multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/playhouse/postgres_ext.py", line 377, in execute_sql
    self.commit()
  File "/usr/local/lib/python3.4/dist-packages/peewee.py", line 3468, in commit
    self.get_conn().commit()
psycopg2.DatabaseError: error with no message from the libpq

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.4/multiprocessing/pool.py", line 44, in mapstar
    return list(map(*args))
  File "/home/dan/dev/link-checker/crawler/manager.py", line 17, in startWorker
    wrk.perform()
  File "/home/dan/dev/link-checker/crawler/worker.py", line 49, in perform
    self.pullUrls()
  File "/home/dan/dev/link-checker/crawler/worker.py", line 63, in pullUrls
    newUrlDict = UrlManager.createUrlWithInProgress(self._url['crawl'], source_url, self._url['base'])
  File "/home/dan/dev/link-checker/crawler/models.py", line 152, in createUrlWithInProgress
    newUrl = Url.create(**newUrlDict)
  File "/usr/local/lib/python3.4/dist-packages/peewee.py", line 4494, in create
    inst.save(force_insert=True)
  File "/usr/local/lib/python3.4/dist-packages/peewee.py", line 4680, in save
    pk_from_cursor = self.insert(**field_dict).execute()
  File "/usr/local/lib/python3.4/dist-packages/peewee.py", line 3213, in execute
    cursor = self._execute()
  File "/usr/local/lib/python3.4/dist-packages/peewee.py", line 2628, in _execute
    return self.database.execute_sql(sql, params, self.require_commit)
  File "/usr/local/lib/python3.4/dist-packages/playhouse/postgres_ext.py", line 377, in execute_sql
    self.commit()
  File "/usr/local/lib/python3.4/dist-packages/peewee.py", line 3285, in __exit__
    reraise(new_type, new_type(*exc_args), traceback)
  File "/usr/local/lib/python3.4/dist-packages/peewee.py", line 127, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.4/dist-packages/playhouse/postgres_ext.py", line 377, in execute_sql
    self.commit()
  File "/usr/local/lib/python3.4/dist-packages/peewee.py", line 3468, in commit
    self.get_conn().commit()
peewee.DatabaseError: error with no message from the libpq

I also tailed the PostgreSQL log file, and this is what I saw:

2016-04-19 20:34:23 EDT [26824-3] uc_user@uc WARNING:  there is already a transaction in progress
2016-04-19 20:34:23 EDT [26824-4] uc_user@uc WARNING:  there is already a transaction in progress
2016-04-19 20:34:23 EDT [26824-5] uc_user@uc WARNING:  there is no transaction in progress
2016-04-19 20:34:23 EDT [26824-6] uc_user@uc WARNING:  there is already a transaction in progress
2016-04-19 20:34:23 EDT [26824-7] uc_user@uc WARNING:  there is no transaction in progress
2016-04-19 20:34:23 EDT [26824-8] uc_user@uc WARNING:  there is already a transaction in progress
2016-04-19 20:34:23 EDT [26824-9] uc_user@uc WARNING:  there is already a transaction in progress
2016-04-19 20:35:14 EDT [26976-1] uc_user@uc WARNING:  there is already a transaction in progress
2016-04-19 20:35:14 EDT [26976-2] uc_user@uc WARNING:  there is no transaction in progress
2016-04-19 20:35:14 EDT [26976-3] uc_user@uc WARNING:  there is already a transaction in progress
2016-04-19 20:35:14 EDT [26976-4] uc_user@uc WARNING:  there is already a transaction in progress
2016-04-19 20:35:14 EDT [26976-5] uc_user@uc WARNING:  there is no transaction in progress
2016-04-19 20:35:14 EDT [26976-6] uc_user@uc WARNING:  there is already a transaction in progress
2016-04-19 20:35:14 EDT [26976-7] uc_user@uc WARNING:  there is no transaction in progress
2016-04-19 20:35:14 EDT [26976-8] uc_user@uc WARNING:  there is already a transaction in progress
2016-04-19 20:35:14 EDT [26976-9] uc_user@uc WARNING:  there is no transaction in progress

My hunch is that the connection pool and multiprocessing don't go well together. Has anyone done this successfully without errors? If so, can you point me to an example or give me a piece of advice that works?

Do I need to explicitly create a new connection with peewee inside my worker, or is there an easier way to use peewee with the multiprocessing pool library?

Thanks for your answers and for reading.


Solution

  • I got it working. I wrapped all the code in the models.py file that the workers use in "with db.execution_context() as ctx:", as described on this page:

    http://docs.peewee-orm.com/en/latest/peewee/database.html#advanced-connection-management
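
    As far as I understand the peewee 2.x docs linked above, an execution context opens a dedicated connection for the block, wraps the block in a transaction, and closes the connection (or hands it back to the pool) on exit. The sketch below shows that shape with the standard-library sqlite3 and contextlib modules instead of peewee, so it is an analogy and not peewee's implementation. The free function execution_context, the create_url helper and the url table are hypothetical names.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def execution_context(db_path):
    """Open a dedicated connection, run the block in one transaction, then close."""
    conn = sqlite3.connect(db_path)
    try:
        with conn:  # commit on success, roll back if the block raises
            yield conn
    finally:
        conn.close()


def create_url(db_path, address):
    """Hypothetical model helper: every call gets its own short-lived connection."""
    with execution_context(db_path) as conn:
        conn.execute("CREATE TABLE IF NOT EXISTS url (address TEXT UNIQUE)")
        conn.execute("INSERT INTO url (address) VALUES (?)", (address,))
```

    Because each call acquires its connection inside the worker and releases it at the end of the block, nothing opened before the fork is reused by the workers.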