
Python script is running in the IDE, but not in the terminal (Kafka)


This may or may not be Kafka-related, but I ran into it while learning Kafka. I have a Python producer script that looks like this:

from kafka import KafkaProducer
from json import dumps


class Producer:

    def __init__(self):
        self.connection = KafkaProducer(
            bootstrap_servers=['localhost:9092'],
            value_serializer=lambda x: dumps(x).encode('utf-8')
        )

    def push_client(self, data):
        self.connection.send('client-pusher', value=data)


data = {
    "first_name": "Davey",
    "email": "[email protected]",
    "group_id": 3,
    "date": "2021-12-12"
}

producer = Producer()
producer.push_client(data)

I'm running the Kafka broker in Docker, and the messages are consumed on the other end by this script:

import json
from datetime import date
from typing import Optional

from kafka import KafkaConsumer
from pydantic import BaseModel


class Client(BaseModel):
    first_name: str
    email: str
    group_id: Optional[int] = None
    date: date


consumer = KafkaConsumer(
    'client-pusher',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group-id',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

while True:
    msg_pack = consumer.poll(timeout_ms=500)

    for tp, messages in msg_pack.items():
        for message in messages:
            client = Client(**message.value)
            print(client)

The consumer script polls for new messages in an infinite loop. I can run the consumer in the terminal or in VS Code, and it always prints the data dict from the producer, but ONLY if I run the producer script in Visual Studio Code.

If I run the producer script in the terminal with

python producer.py

the messages don't come through to the consumer. There are no runtime errors (print statements in the producer come through fine). I cannot for the life of me see what's different about the environment in my IDE.

Each script has its own virtual environment. I've tried running the producer with the full path to the venv's interpreter, copied straight from VS Code's terminal, for example:

/home/me/whatever/dummy-producer/.venv/bin/python producer.py

I've also printed out sys.path in both places; it's identical in the IDE and the terminal.

What else can I try to find the difference between VS Code's execution and the terminal's? I'm using zsh, if that matters.
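One way to extend the sys.path comparison is to capture a few more values that often differ between an IDE launch and a shell launch: the interpreter actually running, the working directory, and selected environment variables. The sketch below is a hypothetical helper (describe_environment is not part of any library), and the default env_keys are an assumption about which variables are worth checking; run it from both places and diff the output.

```python
import os
import sys


def describe_environment(env_keys=("PATH", "VIRTUAL_ENV", "PYTHONPATH")):
    """Collect values that commonly differ between an IDE run and a shell run.

    env_keys is an assumed, non-exhaustive list of variables to inspect.
    """
    return {
        "executable": sys.executable,  # which interpreter is really running
        "cwd": os.getcwd(),            # relative paths resolve against this
        "sys_path": list(sys.path),    # module search path
        "env": {key: os.environ.get(key) for key in env_keys},  # None if unset
    }


if __name__ == "__main__":
    import json

    # Print as JSON so the IDE and terminal outputs can be diffed directly.
    print(json.dumps(describe_environment(), indent=2))
```

If everything matches, the difference is more likely in how long the process lives than in what environment it runs in.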


Solution

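  • Kafka producer clients don't send messages immediately. send() is asynchronous: it puts the record into an in-memory buffer, and a background thread delivers buffered records to the broker in batches. If the script exits before that happens, you're effectively dropping events.

    If you want the message sent before the script moves on (or exits), call flush() after send(); it blocks until every buffered record has been sent: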

    def push_client(self, data):
        self.connection.send('client-pusher', value=data)
        # Block until the buffered record has actually been sent to the broker.
        self.connection.flush()
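The timing problem can be illustrated with a small, self-contained toy model. This is a hypothetical stand-in built on the standard library (ToyProducer, delay_s and delivered are made-up names), not kafka-python's actual implementation: send() only enqueues the record, a daemon thread "delivers" it after a short simulated delay, and flush() waits for the queue to drain. Because daemon threads are stopped abruptly when the interpreter exits, a script that calls send() and then ends can finish before anything is delivered.

```python
import queue
import threading
import time


class ToyProducer:
    """Hypothetical model of an asynchronous, buffering producer (not kafka-python)."""

    def __init__(self, delay_s=0.05):
        self.delivered = []            # stands in for the broker
        self._queue = queue.Queue()    # stands in for the client's record buffer
        self._delay_s = delay_s        # simulated batching / network latency
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        # Background "I/O thread": takes buffered records and delivers them later.
        while True:
            record = self._queue.get()
            time.sleep(self._delay_s)
            self.delivered.append(record)
            self._queue.task_done()

    def send(self, record):
        # Returns immediately; the record is only buffered at this point.
        self._queue.put(record)

    def flush(self):
        # Block until every buffered record has been handed to the "broker".
        self._queue.join()


producer = ToyProducer()
producer.send({"first_name": "Davey"})
print("right after send():", producer.delivered)  # most likely still empty
producer.flush()
print("after flush():", producer.delivered)       # [{'first_name': 'Davey'}]
```

With the real kafka-python client the same idea applies: call flush() as above, call producer.close() before the script ends, or block on the future returned by send(), for example self.connection.send('client-pusher', value=data).get(timeout=10), which raises if delivery fails.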