Search code examples
pythonapache-kafkaavroconfluent-schema-registryconfluent-kafka-python

ValueError: no value and no default in Kafka producer with Avro schema


I am trying to write a Kafka producer based on Avro schema using confluent Kafka python library

But I am getting below error

Traceback (most recent call last):
  File "fastavro\\_write.pyx", line 363, in fastavro._write.write_record
**TypeError: Expected dict, got Address**

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\deepa\KafkaFastApi\kafka\producer_avro.py", line 102, in <module>
    value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)),
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\deepa\KafkaFastApi\venv\Lib\site-packages\confluent_kafka\schema_registry\avro.py", line 304, in __call__
    schemaless_writer(fo, self._parsed_schema, value)
  File "fastavro\\_write.pyx", line 794, in fastavro._write.schemaless_writer
  File "fastavro\\_write.pyx", line 458, in fastavro._write.write_data
  File "fastavro\\_write.pyx", line 402, in fastavro._write.write_record
  File "fastavro\\_write.pyx", line 458, in fastavro._write.write_data
  File "fastavro\\_write.pyx", line 378, in fastavro._write.write_record
**ValueError: no value and no default for address_line1**

Below are my schema and Kafka producer

{
  "namespace": "example.avro",
  "type": "record",
  "name": "Person",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "first_name",
      "type": "string"
    },
    {
      "name": "middle_name",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "last_name",
      "type": "string"
    },
    {
      "name": "dob",
      "type": "string"
    },
    {
      "name": "address",
      "type": {
        "type": "record",
          "name": "Address",
          "fields": [
            {
              "name": "address_line1",
              "type": "string"
            },
            {
              "name": "address_line2",
              "type": "string"
            },
            {
              "name": "city",
              "type": "string"
            },
            {
              "name": "state",
              "type": "string"
            },
            {
              "name": "zip",
              "type": "int"
            },
            {
              "name": "address_type",
              "type": {
                "type": "enum",
                "name": "event_type_enums",
                "symbols": ["Mailing", "Residential"]
            }
            }
          ]
        }
      }

  ]
}

kakfaProducer.py

import datetime import time import uuid

import os import fastavro import confluent_kafka.serialization from confluent_kafka import Producer from pathlib import Path import json

from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka.schema_registry.avro import AvroSerializer from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField from models.person import Person, Address, AddressType

# kafka-console-consumer.bat --bootstrap-server http://localhost:9094 --topic person --property schema.registry.url=http://localhost:8081 --property print.key=true  --property print.value=true --key-deserializer org.apache.kafka.common.serialization.StringDeserializer
# --value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer schema_path = Path("C:\\Users\\deepa\\KafkaFastApi\\schema\\person.avsc")

conf = {'bootstrap.servers': 'localhost:9092'
        }

producer = Producer(conf)

path = os.path.realpath(os.path.dirname(__file__)) print(path) with open(f"{path}\\schema\\person.avsc") as f:
    schema_str = f.read()

print(schema_str)

schema_registry_conf = {'url': "http://localhost:8081"} schema_registry_client = SchemaRegistryClient(schema_registry_conf)

string_serializer = StringSerializer('utf_8')


def acked(err, msg):
    if err is not None:
        print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
    else:
        print("Message produced: %s" % (str(msg)))

    # Serve on_delivery callbacks from previous calls to produce()


producer.poll(0.0)

id: str = "test123" first_name: str = 'deepak' last_name: str = 'sharma' dob = datetime.date.today().isoformat()

address = Address(
    address_line1="Address Line 1",
    address_line2="address line 2",
    city="Menands",
    state="NY",
    zip=12345,
    address_type=AddressType.mailing

)

user = Person(
    id=id,
    first_name=first_name,
    last_name=last_name,
    dob=dob,
    address=address

)


def user_to_dict(user: Person, ctx):
    """
    Returns a dict representation of a User instance for serialization.

    Args:
        user (User): User instance.

        ctx (SerializationContext): Metadata pertaining to the serialization
            operation.

    Returns:
        dict: Dict populated with user attributes to be serialized.
    """

    # User._address must not be serialized; omit from dict
    return dict(id=user.id, first_name=user.first_name, last_name=user.last_name, dob=user.dob
                , address=user.address
                )


avro_serializer = AvroSerializer(schema_registry_client,
                                 schema_str,
                                 user_to_dict)

topic: str = "person" producer.produce(topic=topic,
                 key=string_serializer(id),
                 value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)),
                 on_delivery=acked)

print("\nFlushing records...") producer.flush()

The person and address are pydantic models

class Address(BaseModel):
    address_line1: str
    address_line2: Optional[str] = None
    city: str
    state: str
    zip: int
    address_type: AddressType




class Person(BaseModel):
    id: str
    first_name: str
    middle_name: Optional[str] = None
    last_name: str
    dob: str
    address: Address

I am not able to figure it out why the error is coming since I am providing the value for address_line1 and what exactly does it means "TypeError: Expected dict, got Address".

I am not able to move forward so could someone please help me in identifying what I am doing wrong. Thank you


Solution

  • You have a user_to_dict() function, but no address_to_dict() function. So you need to implement that and then modify your user_to_dict() so that it looks something like this:

    def user_to_dict():
        return dict(
            id=user.id,
            first_name=user.first_name,
            last_name=user.last_name,
            dob=user.dob,
            address=address_to_dict(user.address),
        )