I want to run a file that listens to kafka in a parallel stream with a django project. My manage.py file
import asyncio
import os
import sys
import multiprocessing as mt
from kafka.run_kafka import run_kafka
def main():
"""Run administrative tasks."""
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
try:
from django.core.management import execute_from_command_line
except ImportError as exc:
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
) from exc
execute_from_command_line(sys.argv)
if __name__ == '__main__':
kafka_process = mt.Process(target=asyncio.run(run_kafka()))
django_process = mt.Process(target=main())
kafka_process.start()
django_process.start()
kafka_process.join()
django_process.join()
My run_kafka.py file uses Confluent Kafka Python
import os
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'business_logic.settings')
django.setup()
import asyncio
from business_logic.settings import KAFKA_CONF, TOPIC_PROD
from kafka.kafka_consuners import KafkaConsumerBL
async def run_kafka():
"""
Запуск прослушивания Kafka на всех топиках на все ответы
"""
consumer = KafkaConsumerBL(KAFKA_CONF)
consumer.indicate_topic([TOPIC_PROD])
await consumer.run_consumer(consumer)
if __name__ == '__main__':
asyncio.run(run_kafka())
I tried to solve the problem using the threading and multiprocessing libraries. After using any of the libraries, either the django project or kafka is launched.
When using the multiprocessing library, one process is started, but not both manage.py
...
if __name__ == '__main__':
kafka_process = mt.Process(target=asyncio.run(run_kafka()))
django_process = mt.Process(target=main())
kafka_process.start()
django_process.start()
kafka_process.join()
django_process.join()
When using the threading library, only one process is started again manage.py
...
if __name__ == '__main__':
threading.Thread(target=asyncio.run(run_kafka())).start()
threading.Thread(target=main()).start()
Can you tell me where I made a mistake, did I use the library incorrectly, or do I need to use another method at all?
Solved the problem in the following way:
if __name__ == "__main__":
process = subprocess.Popen(['python3', 'kafka_run.py'], stdout=subprocess.PIPE)
uvicorn.run(app=application, host='0.0.0.0', port=8000)