Search code examples
pythondjangopytestpython-multithreadingpytest-django

Run a django command for a set duration inside of a test


Update 4/21/23

Below is the working test thanks to Paul Gilmartin's answer (creator of django-pgpubsub). I renamed my Notification model in the import to EmailNotification to reduce confusion as it's the same name as the Notification model in django-pgpubsub:

@pytest.mark.django_db(transaction=True)  # needed to work
def test_shipment_status_notificiations(testuser, pg_connection):
    user = testuser
    # should already be made by our signal
    notification_settings = NotificationSettings.objects.get(user=user)
    notification_settings.failed_delivery_attempt = True
    notification_settings.available_for_pickup = True
    notification_settings.delivery_issue = True
    notification_settings.save()
    cust_email = "[email protected]"
    customer = Customer.objects.create(email=cust_email)
    order = Order.objects.create(customer=customer)
    shipment = Shipment.objects.create(user=user, order=order)
    pg_connection.poll()
    shipment.status = Shipment.FAILED_ATTEMPT
    shipment.save()
    shipment.status = Shipment.HELD
    shipment.save()
    shipment.status = Shipment.ISSUE
    shipment.save()
    assert 3 == len(pg_connection.notifies)

    # process_notifications() is needed for the listener to run and EmailNotifications to be created
    process_notifications(pg_connection)
    print(f"all emailnotifications: {EmailNotification.objects.all()}")
    existing_notifications = EmailNotification.objects.filter(
        shipment=shipment,
        email=cust_email,
    )
    assert existing_notifications.count() == 3

Original Post Sections Below (now outdated)

Problem

I have a test that runs a django-admin command in a thread with a timeout. The test passes, but it hangs and I must manually stop its execution. I'm not familiar with threading, but I think the thread keeps running, which causes the test to "hang" after it passes. I'm looking for a solution that runs the command since this is more of an integration test and not a unit test with mocks. Any solution that runs the command for a set duration should work as long as there's proper test teardown and it doesn't cause issues for other tests during CI.

Background

Django-pgpubsub listens for PostgresSQL triggers by running python manage.py listen. I'm using this as a lightweight alternative to Django signals and celery for sending emails when a Shipment model instance's status field changes to specific values.

The Test

# transaction=True is needed listen command to work for django-pgpubsub
@pytest.mark.django_db(transaction=True)
def test_shipment_status_notifications_with_listen_command(testuser):
    user = testuser
    notification_settings = NotificationSettings.objects.get(user=user)
    notification_settings.failed_attempt = True
    notification_settings.save()
    cust_email = "[email protected]"
    customer = Customer.objects.create(email=cust_email)
    order = Order.objects.create(customer=customer)

    def run_listen_command():
        call_command("listen")

    listen_thread = threading.Thread(target=run_listen_command, daemon=True)
    listen_thread.start()
    # Change Shipment status to trigger the notification
    shipment = Shipment.objects.create(user=user, order=order)
    shipment.status = Shipment.FAILED_ATTEMPT
    shipment.save()
    sleep(1)  # required to allow listen command to process the notification
    # Ensure the function that sends the notification is called and the notification is created
    existing_notifications = Notification.objects.filter(
        type=Notification.Type.FAILED_DELIVERY_ATTEMPT,
        shipment=shipment,
        email=cust_email,
    )
    assert existing_notifications.count() == 1
    # Wait for the listen_thread to complete, with a timeout
    # todo: this doesn't end the test and it hangs
    listen_thread.join(timeout=3)

Other Attempts

I'm unable to customize the Django command because it's from an external library, django-pgpubsub. I'm unsure if threading is the best solution since I found this suggestion using GPT-4 and was unable to find existing examples. Another attempt involved a ThreadPoolExecutor with a try/except/finally with a TimeoutError. This also passed but hung. Another involved using the subprocess module. This version fails since no notifications are sent, or it hangs if subprocess.PIPE is passed to the stdout and stderr parameters of subprocess.Popen() for troubleshooting. Here's the code for this version of the test:

@pytest.mark.django_db(transaction=True)
def test_shipment_status_notifications_with_listen_command_subprocess(testuser):
    print("starting test")
    user = testuser
    notification_settings = NotificationSettings.objects.get(user=user)
    notification_settings.failed_attempt = True
    notification_settings.save()
    cust_email = "[email protected]"
    customer = Customer.objects.create(email=cust_email)
    order = Order.objects.create(customer=customer)

    print(f"settings.ROOT_DIR: {settings.ROOT_DIR}")
    listen_command = [
        "python",
        str(settings.ROOT_DIR / "manage.py"),
        "listen",
    ]
    print("about to call listen command: ", listen_command)

    # todo: this version doesn't hang the test, but no notifications are sent
    # listen_process = subprocess.Popen(listen_command)
    # todo: using this version with stdout and stderr hangs the test
    listen_process = subprocess.Popen(listen_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, universal_newlines=True)
    # Change Shipment status to trigger the notification
    shipment = Shipment.objects.create(user=user, order=order)
    shipment.status = Shipment.FAILED_ATTEMPT
    shipment.save()

    time.sleep(5)

    # Stop the listen command
    listen_process.send_signal(signal.SIGTERM)
    listen_process.wait()

    stdout, stderr = listen_process.communicate()

    print("Output from listen command:")
    print(stdout)
    print("Error output from listen command:")
    print(stderr)

    existing_notifications = Notification.objects.filter(
        type=Notification.Type.FAILED_DELIVERY_ATTEMPT,
        shipment=shipment,
        email=cust_email,
    )
    # todo: No existing notifications found when using listen_process = subprocess.Popen(listen_command)
    print(f"existing_notifications: {existing_notifications}")
    assert existing_notifications.count() == 1

Solution

  • I'm the author of django-pgpubsub. Great to see your interest in the library!

    This isn't strictly an answer, but I don't have enough karma to leave a comment. I don't 100% understand what you want to do with the test, but it seems like you want to simulate what happens when the listen process is running in order to consume Notification objects?

    You don't necessarily need to use the listen management command here, nor run it in a separate process to do this. Instead, you can use the pgpubsub.listen.listen function directly in the same thread as the test and then use the poll function on the db connection. https://github.com/Opus10/django-pgpubsub/blob/master/pgpubsub/tests/test_core.py has a few examples of tests doing this, in particular

    @pytest.mark.django_db(transaction=True)
    def test_recover_notifications(pg_connection):
        Author.objects.create(name='Billy')
        Author.objects.create(name='Billy2')
        pg_connection.poll()
        assert 2 == len(pg_connection.notifies)
        assert 2 == Notification.objects.count()
        assert 0 == Post.objects.count()
        # Simulate when the listener fails to
        # receive notifications
        pg_connection.notifies = []
        pg_connection.poll()
        assert 0 == len(pg_connection.notifies)
        listen(recover=True, poll_count=1)
        pg_connection.poll()
        assert 0 == Notification.objects.count()
        assert 2 == Post.objects.count()
    

    You probably don't need to pass in recover=False for your case. There are also tests in that module using process_notifications, which is a function used internally in listen.

    Maybe I should add some docs on unit testing!