Tags: docker, apache-kafka, fastapi, consumer, kafka-python

Kafka + FastAPI + Docker template


Introduction

I am currently experimenting with Kafka and FastAPI, trying to build a template that lets me quickly write programs that follow a microservice pattern.

Goal - Vision

I want to build a repository of design patterns that implement very simple microservice infrastructures. The examples should only demonstrate how messages are sent between different services, and should let users plug in their own code without spending a lot of time on setup.

Motivation

I searched a lot, but I was not able to find simple examples. Most of the examples out there are highly customized and do not really generalize.

Tech Stack

  • Kafka
  • FastAPI
  • Docker

Open to other implementations

Please let me know if you have any other recommendations. I am quite new to microservice architectures and would be very happy to explore further designs.

Current Problem

My current template consists of a Zookeeper, a Kafka, a consumer, and a producer service. However, I am encountering an issue where my consumer is unable to consume the messages generated by my producer. The producer works fine and successfully publishes messages, which I have confirmed by running:

docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my-topic --from-beginning

My consumer, however, does not appear to do anything at all.
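
A quick way to check whether the consumer ever joins its group and gets partitions assigned is an on_assign callback. This is only a debugging sketch, using the same settings as the consumer further below:

from confluent_kafka import Consumer

# Same settings as the consumer service below.
conf = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
}

def on_assign(consumer, partitions):
    # If this never fires, the consumer never joined the group --
    # usually a sign that it cannot reach the broker at all.
    print(f"Assigned partitions: {partitions}")

consumer = Consumer(conf)
consumer.subscribe(['my-topic'], on_assign=on_assign)
for _ in range(30):
    consumer.poll(1.0)
consumer.close()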

Thank you in advance for all your suggestions on this issue.

my folder structure:

.
├── docker-compose.yml
├── producer
│   ├── Dockerfile
│   ├── main.py
│   └── requirements.txt
└── consumer
    ├── Dockerfile
    ├── main.py
    └── requirements.txt

my docker-compose file:

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  kafka:
    image: confluentinc/cp-kafka:latest
    restart: "no"
    links:
      - zookeeper
    ports:
      - 9092:9092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL


  producer:
    build: ./producer
    ports:
      - '8000:8000'
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
    depends_on:
      - kafka

  consumer:
    build: ./consumer
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
      - KAFKA_GROUP_ID=my-group
    depends_on:
      - kafka

  kafdrop:
    image: obsidiandynamics/kafdrop
    restart: "no"
    environment:
      - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
    ports:
      - 9000:9000
    depends_on:
      - kafka
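
A side note for anyone debugging a setup like this: a Kafka client first contacts the bootstrap address you give it, then reconnects to whatever address the broker advertises for the listener it reached. A minimal sketch to print what each broker advertises, assuming confluent-kafka is installed and the script runs in a container on the same Compose network:

from confluent_kafka.admin import AdminClient

# Bootstrap via the INTERNAL listener, which other containers can reach.
admin = AdminClient({'bootstrap.servers': 'kafka:29092'})

# list_topics() returns cluster metadata, including the host:port each
# broker advertises back to clients for the listener it was contacted on.
metadata = admin.list_topics(timeout=10)
for broker in metadata.brokers.values():
    print(f"broker {broker.id} advertises {broker.host}:{broker.port}")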

my producer docker file:

FROM python:3.8-slim-buster

WORKDIR /app

# Install dependencies first so this layer is cached across code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

my producer req file:

fastapi
uvicorn
confluent-kafka

my producer main.py:

import json
from confluent_kafka import Producer
from fastapi import FastAPI


app = FastAPI()

producer_conf = {
    'bootstrap.servers': 'kafka:9092',
    'client.id': 'my-app'
}

producer = Producer(producer_conf)

@app.post("/produce")
def produce(data: dict):
    try:
        data = json.dumps(data).encode('utf-8')
        producer.produce('my-topic', value=data)
        producer.flush()
        return {"status": "success", "message": data}
    except Exception as e:
        return {"status": "error", "message": str(e)}

my consumer docker file:

FROM python:3.8-slim-buster

WORKDIR /app

# Install dependencies first so this layer is cached across code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "main.py"]

my consumer req file:

confluent-kafka

my consumer main.py:

from confluent_kafka import Consumer, KafkaError

conf = {
    'bootstrap.servers': 'kafka:9092',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'group.id': 'my-group',
    'api.version.request': True,
    'api.version.fallback.ms': 0
}

def consume_messages():
    consumer = Consumer(conf)

    consumer.subscribe(['my-topic'])

    try:
        while True:
            msg = consumer.poll(1.0)

            if msg is None:
                continue

            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    print(f'Reached end of partition: {msg.topic()}[{msg.partition()}]')
                else:
                    print(f'Error while consuming messages: {msg.error()}')
            else:
                print(f"Received message: {msg.value().decode('utf-8')}")

    except Exception as e:
        print(f"Exception occurred while consuming messages: {e}")
    finally:
        consumer.close()

def startup():
    consume_messages()

if __name__ == "__main__":
    try:
        print("Starting consumer...")
        startup()
    except Exception as e:
        print(f"Exception occurred: {e}")

Build and start the system via:

docker-compose up

You can trigger the producer with this curl command:

 curl -X POST http://localhost:8000/produce -H "Content-Type: application/json" -d '{"key": "nice nice nice"}'

I tried rewriting the consumer multiple times and changed ports and Docker Compose configurations, but unfortunately I am unable to pinpoint the issue.


Solution

  • Special thanks to @OneCricketeer for helping me get this up and running!

    Repository for microservice templates (contains this solution). Please feel free to contribute here:

    https://github.com/maxmekiska/micro-templates
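
    In short, what changed compared to the question: every client inside the Compose network now bootstraps via the INTERNAL listener kafka:29092 instead of kafka:9092 (whose EXTERNAL listener advertises localhost:9092, an address other containers cannot reach), Kafdrop is configured through its expected KAFKA_BROKERCONNECT variable, and the consumer waits 30 seconds at startup so the broker is up before it subscribes. Note that the KAFKA_BOOTSTRAP_SERVERS and KAFKA_GROUP_ID variables set in docker-compose are not read by the Python code, which hardcodes its settings.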


    my docker-compose file:

    version: '3'
    
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:latest
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        ports:
          - 2181:2181
          - 2888:2888
          - 3888:3888
    
      kafka:
        image: confluentinc/cp-kafka:latest
        restart: "no"
        links:
          - zookeeper
        ports:
          - 9092:9092
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_LISTENERS: INTERNAL://:29092,EXTERNAL://:9092
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:29092,EXTERNAL://localhost:9092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
    
      kafdrop:
        image: obsidiandynamics/kafdrop
        restart: "no"
        environment:
          KAFKA_BROKERCONNECT: "kafka:29092"
        ports:
          - 9000:9000
        depends_on:
          - kafka
    
      producer:
        build: ./producer
        ports:
          - '8000:8000'
        environment:
          - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
        depends_on:
          - kafka
    
      consumer:
        build: ./consumer
        environment:
          - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
          - KAFKA_GROUP_ID=my-group
        depends_on:
          - kafka
    

    my producer docker file:

    FROM python:3.8-slim-buster

    WORKDIR /app

    # Install dependencies first so this layer is cached across code changes.
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt

    COPY . .

    CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
    

    my producer req file:

    fastapi
    uvicorn
    confluent-kafka
    

    my producer main.py:

    import json
    from confluent_kafka import Producer
    from fastapi import FastAPI
    
    
    app = FastAPI()
    
    producer_conf = {
        'bootstrap.servers': 'kafka:29092',
        'client.id': 'my-app'
    }
    
    producer = Producer(producer_conf)
    
    @app.post("/produce")
    def produce(data: dict):
        producer.produce('my-topic', value=json.dumps(data).encode('utf-8'))
        producer.flush()
        return {"status": "success"}
    

    my consumer docker file:

    FROM python:3.8-slim-buster

    WORKDIR /app

    # Install dependencies first so this layer is cached across code changes.
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt

    COPY . .

    CMD ["python", "main.py"]
    

    my consumer req file:

    confluent-kafka
    

    my consumer main.py:

    from confluent_kafka import Consumer, KafkaError
    import time 
    
    import logging
    logging.basicConfig(level=logging.DEBUG)
    
    
    conf = {
        'bootstrap.servers': 'kafka:29092',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': True,
        'group.id': 'my-group',
        'api.version.request': True,
        'api.version.fallback.ms': 0
    }
    
    def consume_messages():
        consumer = Consumer(conf)
    
        consumer.subscribe(['my-topic'])
    
        try:
            while True:
                msg = consumer.poll(1.0)
                logging.info("Polling")
                logging.info(msg)
    
                if msg is None:
                    logging.info("No message")
                    continue
    
                if msg.error():
                    logging.info("Error")
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        print(f'Reached end of partition: {msg.topic()}[{msg.partition()}]')
                    else:
                        print(f'Error while consuming messages: {msg.error()}')
                        logging.info(msg.error())
                else:
                    print(f"Received message: {msg.value().decode('utf-8')}")
                    logging.info(msg.value().decode('utf-8'))
    
        except Exception as e:
            print(f"Exception occurred while consuming messages: {e}")
            logging.info(e)
        finally:
            consumer.close()
            logging.info("Consumer closed")
    
    
    def startup():
        logging.info("Starting consumer...")
        time.sleep(30)
        consume_messages()
    
    if __name__ == "__main__":
        try:
            startup()
        except Exception as e:
            print(f"Exception occurred: {e}")
    

    Build and start the system via:

    docker-compose up
    

    You can trigger the producer with this curl command:

     curl -X POST http://localhost:8000/produce -H "Content-Type: application/json" -d '{"key": "nice nice nice"}'
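
    If everything is wired correctly, the consumer container's logs should print the received message shortly after the curl, and the my-topic topic should also be browsable in Kafdrop at http://localhost:9000.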