python · apache-kafka · confluent-schema-registry · ksqldb

Kafka: Message does not start with magic byte


I tried to fetch data from my processed topic using KSQL, but it is not working.

I set up a table called api_table using KSQL. Here are the details of my table.

ksql> show topics;

 Kafka Topic   | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-------------------------------------------------------------------------------------------
 _schemas      | false      | 1          | 1                  | 0         | 0              
 api_log       | true       | 1          | 1                  | 1         | 1              
 API_STREAM    | false      | 1          | 1                  | 0         | 0              
 API_STREAM_KEY| true       | 1          | 1                  | 1         | 1              
 API_TABLE     | true       | 1          | 1                  | 0         | 0              
 mysql-config  | false      | 1          | 1                  | 0         | 0              
 mysql-offsets | false      | 25         | 1                  | 0         | 0              
 mysql-status  | false      | 5          | 1                  | 0         | 0              
-------------------------------------------------------------------------------------------

And this is my table's format.

ksql> describe extended api_table;

Name                 : API_TABLE
Type                 : TABLE
Key field            : 
Key format           : STRING
Timestamp field      : Not set - using <ROWTIME>
Value format         : AVRO
Kafka topic          : API_TABLE (partitions: 1, replication: 1)

 Field      | Type                      
----------------------------------------
 ROWTIME    | BIGINT           (system) 
 ROWKEY     | VARCHAR(STRING)  (system) 
 KSQL_COL_0 | BIGINT                    
 COUNT      | BIGINT                    
----------------------------------------

Queries that write into this TABLE
-----------------------------------
CTAS_API_TABLE_2 : CREATE TABLE API_TABLE WITH (REPLICAS = 1, PARTITIONS = 1, KAFKA_TOPIC = 'API_TABLE') AS SELECT
  WINDOWSTART() "KSQL_COL_0"
, COUNT(*) "COUNT"
FROM API_STREAM_KEY API_STREAM_KEY
WINDOW TUMBLING ( SIZE 5 MINUTES ) 
GROUP BY API_STREAM_KEY.METRIC;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:      0.10   total-messages:       249     last-message: 2019-08-13T07:07:39.325Z

(Statistics of the local KSQL server interaction with the Kafka topic API_TABLE)

Everything is working fine, and I can even print out the messages.

However, when I try to consume the messages with Python:

from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError


consumer = AvroConsumer({
    'bootstrap.servers': 'localhost:9021',
    'schema.registry.url': 'http://localhost:8081',
    'group.id': 'abcd'
})

consumer.subscribe(['API_TABLE'])

while True:
    try:
        msg = consumer.poll(10)
    except SerializerError as e:
        print("Message deserialization failed: {}".format(e))
        break

    if msg is None:
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    print(msg.value())

consumer.close()

It throws this error. Why?

Traceback (most recent call last):
  File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 149, in poll
    decoded_key = self._serializer.decode_message(message.key(), is_key=True)
  File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/serializer/message_serializer.py", line 225, in decode_message
    raise SerializerError("message does not start with magic byte")
confluent_kafka.avro.serializer.SerializerError: message does not start with magic byte

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/ylee/PycharmProjects/Bokeh/consumer.py", line 18, in <module>
    msg = consumer.poll(10)
  File "/Users/ylee/PycharmProjects/Bokeh/env/lib/python3.7/site-packages/confluent_kafka/avro/__init__.py", line 156, in poll
    e))
confluent_kafka.avro.serializer.SerializerError: Message deserialization failed for message at API_TABLE [0] offset 110: message does not start with magic byte
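
As the traceback shows, the failure happens in `decode_message(message.key(), is_key=True)`: it is the *key* that cannot be decoded. `AvroConsumer` expects both key and value to use the Confluent wire format (a zero "magic" byte, a 4-byte big-endian schema ID, then the Avro body), but KSQL writes the table's key as a plain string, so its first byte is not 0. A minimal sketch of the check that fails (the schema ID and payload bytes below are made up):

```python
# Confluent wire format: magic byte 0x00 + 4-byte schema ID + Avro-encoded body
avro_value = b'\x00' + (1).to_bytes(4, 'big') + b'\x02hi'  # hypothetical schema ID 1
string_key = b'METRIC_NAME'                                # what KSQL writes as the key

print(avro_value[0])  # 0  -> passes the magic-byte check
print(string_key[0])  # 77 -> "message does not start with magic byte"
```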

Solution

  • I solved it myself in the end, and adapted the decoding to my needs. Here is the workaround:

    import io
    import struct
    
    from avro.io import BinaryDecoder, DatumReader
    from confluent_kafka import Consumer
    from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
    from confluent_kafka.avro.serializer import SerializerError
    
    # Please adjust your server and url
    
    # KAFKA BROKER URL
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9021',
        'group.id': 'abcd'
    })
    
    # SCHEMA URL 
    register_client = CachedSchemaRegistryClient(url="http://localhost:8081")
    consumer.subscribe(['YOUR TOPIC'])
    
    MAGIC_BYTES = 0
    
    
    def unpack(payload):
        magic, schema_id = struct.unpack('>bi', payload[:5])

        # Avro payload: the Confluent wire format starts with the magic byte,
        # followed by a 4-byte big-endian schema ID, then the Avro-encoded body
        if magic == MAGIC_BYTES:
            schema = register_client.get_by_id(schema_id)
            reader = DatumReader(schema)
            decoder = BinaryDecoder(io.BytesIO(payload[5:]))
            return reader.read(decoder)
        # String key
        else:
            # For a windowed KSQL key, strip the 8-byte window-start
            # timestamp appended to the key: payload[:-8].decode()
            return payload.decode()
    
    
    def get_data():
        while True:
            try:
                msg = consumer.poll(10)
            except SerializerError as e:
                print("Message deserialization failed: {}".format(e))
                raise

            # poll() returns None when no message arrives within the timeout
            if msg is None:
                continue

            if msg.error():
                print("AvroConsumer error: {}".format(msg.error()))
                return

            key, value = unpack(msg.key()), unpack(msg.value())
            print(key, value)
    
    if __name__ == '__main__':
        get_data()
    

    For more detail on why this happens, you can read about it on my blog.
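
    As a standalone illustration of what the workaround's header handling does: it splits the 5-byte Confluent header off an Avro-framed payload, falls back to plain-string decoding otherwise, and for a windowed KSQL key strips the trailing 8-byte window-start timestamp (the schema ID, payload bytes, and key name below are made up):

```python
import struct

MAGIC_BYTES = 0

def parse_header(payload):
    """Return (schema_id, body) for a Confluent-framed payload, else None."""
    if len(payload) >= 5:
        magic, schema_id = struct.unpack('>bi', payload[:5])
        if magic == MAGIC_BYTES:
            return schema_id, payload[5:]
    return None  # not Avro-framed, e.g. a plain-string KSQL key

framed = struct.pack('>bi', 0, 42) + b'\x06abc'  # hypothetical schema ID 42
print(parse_header(framed))     # (42, b'\x06abc')
print(parse_header(b'METRIC'))  # None

# A windowed KSQL key appends an 8-byte big-endian window-start timestamp:
windowed_key = b'METRIC' + struct.pack('>q', 1565679900000)
print(windowed_key[:-8].decode())  # METRIC
```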