I am trying to write a Kafka producer based on Avro schema using confluent Kafka python library
But I am getting below error
Traceback (most recent call last):
File "fastavro\\_write.pyx", line 363, in fastavro._write.write_record
**TypeError: Expected dict, got Address**
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\deepa\KafkaFastApi\kafka\producer_avro.py", line 102, in <module>
value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)),
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "C:\Users\deepa\KafkaFastApi\venv\Lib\site-packages\confluent_kafka\schema_registry\avro.py", line 304, in __call__
schemaless_writer(fo, self._parsed_schema, value)
File "fastavro\\_write.pyx", line 794, in fastavro._write.schemaless_writer
File "fastavro\\_write.pyx", line 458, in fastavro._write.write_data
File "fastavro\\_write.pyx", line 402, in fastavro._write.write_record
File "fastavro\\_write.pyx", line 458, in fastavro._write.write_data
File "fastavro\\_write.pyx", line 378, in fastavro._write.write_record
**ValueError: no value and no default for address_line1**
Below are my schema and Kafka producer
{
"namespace": "example.avro",
"type": "record",
"name": "Person",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "first_name",
"type": "string"
},
{
"name": "middle_name",
"type": [
"string",
"null"
]
},
{
"name": "last_name",
"type": "string"
},
{
"name": "dob",
"type": "string"
},
{
"name": "address",
"type": {
"type": "record",
"name": "Address",
"fields": [
{
"name": "address_line1",
"type": "string"
},
{
"name": "address_line2",
"type": "string"
},
{
"name": "city",
"type": "string"
},
{
"name": "state",
"type": "string"
},
{
"name": "zip",
"type": "int"
},
{
"name": "address_type",
"type": {
"type": "enum",
"name": "event_type_enums",
"symbols": ["Mailing", "Residential"]
}
}
]
}
}
]
}
kakfaProducer.py
import datetime import time import uuid
import os import fastavro import confluent_kafka.serialization from confluent_kafka import Producer from pathlib import Path import json
from confluent_kafka.schema_registry import SchemaRegistryClient from confluent_kafka.schema_registry.avro import AvroSerializer from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField from models.person import Person, Address, AddressType
# kafka-console-consumer.bat --bootstrap-server http://localhost:9094 --topic person --property schema.registry.url=http://localhost:8081 --property print.key=true --property print.value=true --key-deserializer org.apache.kafka.common.serialization.StringDeserializer
# --value-deserializer io.confluent.kafka.serializers.KafkaAvroDeserializer schema_path = Path("C:\\Users\\deepa\\KafkaFastApi\\schema\\person.avsc")
conf = {'bootstrap.servers': 'localhost:9092'
}
producer = Producer(conf)
path = os.path.realpath(os.path.dirname(__file__)) print(path) with open(f"{path}\\schema\\person.avsc") as f:
schema_str = f.read()
print(schema_str)
schema_registry_conf = {'url': "http://localhost:8081"} schema_registry_client = SchemaRegistryClient(schema_registry_conf)
string_serializer = StringSerializer('utf_8')
def acked(err, msg):
if err is not None:
print("Failed to deliver message: %s: %s" % (str(msg), str(err)))
else:
print("Message produced: %s" % (str(msg)))
# Serve on_delivery callbacks from previous calls to produce()
producer.poll(0.0)
id: str = "test123" first_name: str = 'deepak' last_name: str = 'sharma' dob = datetime.date.today().isoformat()
address = Address(
address_line1="Address Line 1",
address_line2="address line 2",
city="Menands",
state="NY",
zip=12345,
address_type=AddressType.mailing
)
user = Person(
id=id,
first_name=first_name,
last_name=last_name,
dob=dob,
address=address
)
def user_to_dict(user: Person, ctx):
"""
Returns a dict representation of a User instance for serialization.
Args:
user (User): User instance.
ctx (SerializationContext): Metadata pertaining to the serialization
operation.
Returns:
dict: Dict populated with user attributes to be serialized.
"""
# User._address must not be serialized; omit from dict
return dict(id=user.id, first_name=user.first_name, last_name=user.last_name, dob=user.dob
, address=user.address
)
avro_serializer = AvroSerializer(schema_registry_client,
schema_str,
user_to_dict)
topic: str = "person" producer.produce(topic=topic,
key=string_serializer(id),
value=avro_serializer(user, SerializationContext(topic, MessageField.VALUE)),
on_delivery=acked)
print("\nFlushing records...") producer.flush()
The person and address are pydantic models
class Address(BaseModel):
address_line1: str
address_line2: Optional[str] = None
city: str
state: str
zip: int
address_type: AddressType
class Person(BaseModel):
id: str
first_name: str
middle_name: Optional[str] = None
last_name: str
dob: str
address: Address
I am not able to figure it out why the error is coming since I am providing the value for address_line1 and what exactly does it means "TypeError: Expected dict, got Address".
I am not able to move forward so could someone please help me in identifying what I am doing wrong. Thank you
You have a user_to_dict()
function, but no address_to_dict()
function. So you need to implement that and then modify your user_to_dict()
so that it looks something like this:
def user_to_dict():
return dict(
id=user.id,
first_name=user.first_name,
last_name=user.last_name,
dob=user.dob,
address=address_to_dict(user.address),
)