
Can't read back tiny avro file created with python fastavro package


I have a tiny file I want to serialize. It is a text file containing only this row:

INSERT INTO `pagelinks` VALUES (11442565,0,'Présent_de_narration',2600),(10265670,0,'Président',2600);

I use this small program to do it, with the fastavro 0.17.9 package (Python 3.5.2 on Ubuntu 16.04 LTS):

import sys, re
from fastavro import writer

schema = {
    "namespace": "com.projet4.pagelinks",
    "type": "record",
    "name": "pagelink",
    "fields": [
        {"name": "page_id", "type": "int"},
        {"name": "page_title", "type": "string"}
    ]
}

insert_regex = re.compile('''INSERT INTO `pagelinks` VALUES (.*)\;''')
row_regex = re.compile("""(.*),(.*),'(.*)',(.*)""")
for line in sys.stdin:
    avro_file = open("pagelinks.avro", 'wb')
    match = insert_regex.match(line.strip())
    if match is not None:
        data = match.groups(0)[0]
        rows = data[1:-1].split("),(")
        for row in rows:
            row_match = row_regex.match(row)
            if row_match is not None:
                # >>> row_match.groups()
                # (12,0,'Anti-statism',0)
                # # page_id, pl_namespace, pl_title, pl_from_namespace
                if row_match.groups()[1] == '0':
                      page_id, pl_title = row_match.groups()[0], row_match.groups()[2]
                      print(int(page_id), pl_title)
                      writer(avro_file, schema, [{"page_id":int(page_id), "page_title":pl_title}])

I launch the program with this command line:

cat pagelinks_nano.sql | ./parse_links_fastavro_test.py

It seems to work: the Avro file is created. Then I try to read it back:

import fastavro
with open("pagelinks.avro", 'rb') as avro_file:
    reader = fastavro.reader(avro_file)
    print("Embedded Schema :\n\n",reader.schema,"\n\nLines :")
    for pagelink in reader:
        print(pagelink)

Here is the problem:

The file is opened, the schema is printed, and so is the first record, but then the program crashes with this message:

Embedded Schema :

 {'name': 'pagelink', 'type': 'record', 'namespace': 'com.projet4.pagelinks', 'fields': [{'name': 'page_id', 'type': 'int'}, {'name': 'page_title', 'type': 'string'}]} 

Lines :
{'page_id': 11442565, 'page_title': 'Présent_de_narration'}
Traceback (most recent call last):
  File "./reading.py", line 5, in <module>
    for pagelink in reader:
  File "fastavro/_read.pyx", line 645, in _iter_avro
  File "fastavro/_read.pyx", line 548, in fastavro._read.skip_sync
ValueError: expected sync marker not found

Is this a fastavro issue or a bug in my code?

Any help would be appreciated :o

Thanks anyway


Solution

  • I found the solution while browsing the official fastavro GitHub repository:

    https://github.com/tebeka/fastavro/issues/12

    "Currently fastavro supports only "one shot" writes. However records can be any iterable, including a generator the creates records one by one. I'll look into appending."

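    Buffering the records and writing them once, after the loop, fixed it. In the original program, writer was called once per record on the same file handle; presumably each call emitted its own complete Avro header and data block, so the reader hit a second header where it expected the next sync marker.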

    import re, sys
    from fastavro import writer

    # schema is the same dict as in the question
    insert_regex = re.compile(r"INSERT INTO `pagelinks` VALUES (.*);")
    row_regex = re.compile(r"(.*),(.*),'(.*)',(.*)")
    avro_content = []  # buffer every record here and write once at the end
    for line in sys.stdin:
        match = insert_regex.match(line.strip())
        if match is not None:
            data = match.group(1)
            rows = data[1:-1].split("),(")
            for row in rows:
                row_match = row_regex.match(row)
                if row_match is not None:
                    # >>> row_match.groups()
                    # ('12', '0', 'Anti-statism', '0')
                    # # page_id, pl_namespace, pl_title, pl_from_namespace
                    if row_match.groups()[1] == '0':
                        page_id, pl_title = row_match.groups()[0], row_match.groups()[2]
                        avro_content.append({"page_id": int(page_id), "page_title": pl_title})
    # Open the output file once and write all buffered records in a single call
    with open("pagelinks.avro", 'wb') as avro_file:
        writer(avro_file, schema, avro_content)
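
The quoted issue also mentions that the records can come from a generator, which avoids holding the whole list in memory for a large dump. Here is a minimal sketch of that variant, reusing the regexes from the question. The names iter_pagelinks and write_pagelinks are hypothetical helpers introduced for illustration, not part of fastavro, and the only fastavro call assumed is writer(fo, schema, records) as used above.

```python
import re

# Same patterns as in the question, written as raw strings.
INSERT_REGEX = re.compile(r"INSERT INTO `pagelinks` VALUES (.*);")
ROW_REGEX = re.compile(r"(.*),(.*),'(.*)',(.*)")


def iter_pagelinks(lines):
    """Yield one record dict per namespace-0 row found in the given lines."""
    for line in lines:
        match = INSERT_REGEX.match(line.strip())
        if match is None:
            continue
        # Drop the outer parentheses, then split the tuple list into rows.
        for row in match.group(1)[1:-1].split("),("):
            row_match = ROW_REGEX.match(row)
            if row_match is not None and row_match.group(2) == "0":
                yield {"page_id": int(row_match.group(1)),
                       "page_title": row_match.group(3)}


def write_pagelinks(path, schema, lines):
    """Hypothetical helper: pass the generator to a single writer() call."""
    # Imported here so the parsing part can be used without fastavro installed.
    from fastavro import writer
    with open(path, "wb") as avro_file:
        writer(avro_file, schema, iter_pagelinks(lines))
```

With the schema dict from the question in scope, the script body would then reduce to write_pagelinks("pagelinks.avro", schema, sys.stdin). The important point is unchanged: writer is still called exactly once per output file.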