I am running the tests with `log_cli=true`. The script:
import logging
import sys
from multiprocessing import Process
# Configure the root logger: DEBUG and above, written to stderr.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
# Named logger used by ActorContext; the tests below pass the same name to
# caplog.at_level().
logger = logging.getLogger("leapp.actors.quagga_report")
class ActorContext:
    """Minimal stand-in for an actor that logs through the module logger."""

    def __init__(self) -> None:
        # Keep a reference to the module-level logger so ``run`` can use it.
        self.log = logger

    def run(self) -> None:
        """Emit a single DEBUG record with the text ``Some msg``."""
        message = "Some msg"
        self.log.debug(message)
# Module-level instance shared by the tests below, which call ``run`` either
# directly or as the target of a child Process.
current_actor_context = ActorContext()
def test_caplog_fails(caplog):
    """Run the actor in a child process and expect its message in ``caplog``.

    This is the reproducer for the reported problem: the record is emitted
    inside the child process, and the assertion on ``caplog.text`` fails.
    """
    logger_name = "leapp.actors.quagga_report"
    with caplog.at_level(logging.DEBUG, logger=logger_name):
        worker = Process(target=current_actor_context.run)
        worker.start()
        # Wait for the child to finish before inspecting the captured text.
        worker.join()
    captured = caplog.text
    assert "Some msg" in captured
def test_caplog_passes(caplog):
    """Run the actor in the current process and expect its message in ``caplog``."""
    logger_name = "leapp.actors.quagga_report"
    with caplog.at_level(logging.DEBUG, logger=logger_name):
        # Same call as the failing test, but without a child process.
        current_actor_context.run()
    captured = caplog.text
    assert "Some msg" in captured
With `log_cli` enabled, pytest shows the log message in both tests; however, `caplog` sees the message only in the second test.
The first test fails with the following traceback:
-------------------------------- live log call ---------------------------------
| 13:39:20 | 40212 | leapp.actors.quagga_report | DEBUG | test_logger_caplog_fails.py | Some msg
FAILED
tests/test_logger_caplog_fails.py:20 (test_caplog_fails)
Traceback (most recent call last):
File "/home/azhukov/Dropbox/code/lighting_talks/asyncio_subprocess_shells/tests/test_logger_caplog_fails.py", line 26, in test_caplog_fails
assert "Some msg" in caplog.text
AssertionError: assert 'Some msg' in ''
+ where '' = <_pytest.logging.LogCaptureFixture object at 0x7fb8a87f2370>.text
I looked at a similar question, "Pytest capture not working - caplog and capsys are empty"; however, in my case the logger's `propagate` property is already `True`.
Inspired by @hoefling's answer, and still wanting to use `caplog`, I found a solution. The idea is to create a fixture that attaches a `QueueHandler` to the root logger, takes the records from its queue, and re-emits them in the main process, where `caplog` can capture them.
import logging
import sys
from contextlib import contextmanager
from logging import handlers
from multiprocessing import Process, Queue
import pytest
# Configure the root logger: DEBUG and above, written to stderr.
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
# Logger named after this module; ActorContext logs through it.
logger = logging.getLogger(__name__)
class ActorContext:
    """Minimal stand-in for an actor that logs through the module logger."""

    def __init__(self) -> None:
        # Keep a reference to the module-level logger so ``run`` can use it.
        self.log = logger

    def run(self) -> None:
        """Emit a single DEBUG record with the text ``Some msg``."""
        message = "Some msg"
        self.log.debug(message)
# Module-level instance shared by the tests below, which call ``run`` either
# directly or as the target of a child Process.
current_actor_context = ActorContext()
@pytest.fixture()
def caplog_workaround():
    """Provide a context manager that relays child-process logs to ``caplog``.

    While the returned context manager is active, a ``QueueHandler`` backed by
    a ``multiprocessing.Queue`` is attached to the root logger. When the
    ``with`` body finishes normally, every queued record is handed back to the
    logger it was originally emitted on, in the current process, so handlers
    installed here (such as the one behind ``caplog``) receive it.

    NOTE(review): this relies on the child process inheriting the root
    logger's handlers, which holds for the ``fork`` start method; with
    ``spawn`` the child would not see the queue handler -- confirm for the
    target platform.

    Returns:
        A zero-argument callable producing the context manager.
    """

    @contextmanager
    def ctx():
        record_queue = Queue()
        root_logger = logging.getLogger()
        queue_handler = handlers.QueueHandler(record_queue)
        root_logger.addHandler(queue_handler)
        try:
            yield
        finally:
            # Always detach the handler: leaving it on the root logger would
            # leak it into later tests, and keeping it attached while
            # draining would push every re-emitted record back into the
            # queue that is being drained.
            root_logger.removeHandler(queue_handler)
        # Reached only when the ``with`` body completed without raising.
        while not record_queue.empty():
            record: logging.LogRecord = record_queue.get()
            # Dispatch the record on its original logger so its name and
            # other metadata are preserved, instead of creating a new
            # root-logger record through the private ``Logger._log``.
            logging.getLogger(record.name).handle(record)

    return ctx
def test_caplog_already_not_fails(caplog, caplog_workaround):
    """Run the actor in a child process and find its message in ``caplog``.

    The child process is started inside ``caplog_workaround()``, which is
    itself nested inside ``caplog.at_level`` so that the records are relayed
    while the DEBUG capture level is still in effect.
    """
    # Lower the level on the logger the actor actually writes to (the
    # module-level ``logger``) rather than on an unrelated hard-coded name.
    with caplog.at_level(logging.DEBUG, logger=logger.name):
        with caplog_workaround():
            worker = Process(target=current_actor_context.run)
            worker.start()
            # Wait for the child so its records are queued before relaying.
            worker.join()
    assert "Some msg" in caplog.text
def test_caplog_passes(caplog, capsys):
    """Run the actor in the current process and find its message in ``caplog``.

    ``capsys`` is requested but not used in the body; it is kept so the
    test's signature stays unchanged.
    """
    # Lower the level on the logger the actor actually writes to (the
    # module-level ``logger``) rather than on an unrelated hard-coded name.
    with caplog.at_level(logging.DEBUG, logger=logger.name):
        current_actor_context.run()
    assert "Some msg" in caplog.text