python json rabbitmq pika

RabbitMQ Pika jumbled message body


I am trying to create a simple file download and upload service with RabbitMQ in Python. I wrote server and client scripts and defined request objects that serialize to and deserialize from JSON, so that I can send a file uid or other parameters alongside the binary data (encoded with base64). Everything works with small files, but with bigger ones (tested with a 1.4 MB text file) I start getting JSON decode errors. I am dumping the sent and received messages to files, and the sent message looks like correctly formatted JSON. The received message, however, seems to be jumbled, for example:

Sending: {"uid":"test",Rhc2Rhc2Rhc2Rhc2Rhc2Rhc2=="}

Received: Rhc2Rhc2{"uid":"test",Rhc2Rhc2Rhc2Rhc2=="}

Obviously the jumbled blocks are much bigger than that. Has anyone run into this problem before?

Here are some code segments. I can upload the full code, but it is not very tidy:

Upload Request:

import base64
import json


class FileSystemUploadRequest:

    def __init__(self, uid, data):
        self.uid = uid
        self.data = data  # raw file contents as bytes

    def encode(self):
        # base64 makes the binary payload safe to embed in JSON
        dicc = {"uid": self.uid, "data": base64.b64encode(self.data).decode()}
        return json.dumps(dicc)

    @staticmethod
    def decode(jsonstr):
        dicc = json.loads(jsonstr)
        uid = dicc["uid"]
        data = base64.b64decode(dicc["data"])
        return FileSystemUploadRequest(uid, data)

Client (sender):

import uuid
import pika

# Method of the client class
def put_file_blocking(self, uid, data):

    print(" [x] put_file_blocking was called")

    corr_id = str(uuid.uuid4())
    request = FileSystemUploadRequest(uid, data)
    body = request.encode()  # serialize once so the dump matches what is sent

    # Dump the outgoing message for debugging
    with open("dump", "w") as f:
        f.write(body)

    # Send upload request
    self.channel.basic_publish(exchange='',
                               routing_key=self.queue_upload,
                               properties=pika.BasicProperties(
                                   reply_to=self.callback_queue_name,
                                   correlation_id=corr_id
                               ),
                               body=body)

Server (receiver):

# fs_dir, Status and FileSystemUploadResponse are defined elsewhere in the server script
def upload_request(ch, method, props, body):

    # Dump the received message for debugging
    with open("_dump", "w") as f:
        f.write(body.decode())

    # Get Upload Request
    request = FileSystemUploadRequest.decode(body)
    print(" [x] Received upload request for: " + request.uid)
    filename = fs_dir + "/" + request.uid

    # Do upload
    with open(filename, "wb") as f:
        f.write(request.data)

    # Create Upload Response
    status = Status.OK
    response = FileSystemUploadResponse(status)

    # Send response
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=response.encode())
    ch.basic_ack(delivery_tag=method.delivery_tag)

    print(" [x] Finished upload request for: " + request.uid)
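
To pin down where the received message starts to diverge from the sent one, the two dump files written by the client and server code (dump and _dump) can be compared. This is only a minimal sketch. The helper name first_difference is hypothetical and not part of the original scripts:

```python
def first_difference(sent, received):
    """Return the index of the first differing character, or None if equal."""
    for i, (a, b) in enumerate(zip(sent, received)):
        if a != b:
            return i
    # One string is a prefix of the other: they differ where the shorter ends
    if len(sent) != len(received):
        return min(len(sent), len(received))
    return None


def compare_dumps(sent_path="dump", received_path="_dump"):
    """Read both dump files and report where they first differ."""
    with open(sent_path) as f:
        sent = f.read()
    with open(received_path) as f:
        received = f.read()
    return first_difference(sent, received)
```

Calling compare_dumps() after a failed upload returns the offset at which the two files stop matching, or None if they are identical.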

Solution

  • RabbitMQ is a message broker, not a file storage system. Because messages are assumed to be fairly small, several optimizations are in place that work against your use case.

    To share files in a distributed system, you should rely on some form of object storage, such as OpenStack Swift or AWS S3.

    You can still use RabbitMQ to announce that a new file is available, but rather than embedding the file in the message, provide its location in your object storage and let the consumer retrieve the file from there.

    A couple of reference links:

    RabbitMQ best practices

    Can RabbitMQ handle big messages?
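
The approach described in this answer (store the file elsewhere and send only its location) can be sketched roughly as follows. This is a sketch under assumptions, not a definitive implementation: a local directory stands in for the object storage, and the function names, message fields, and checksum are illustrative and not part of the original code. publish_notification expects an already open pika channel.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def store_file(store_dir, uid, data):
    """Write the payload to the shared store and return its location.

    Assumption: a directory stands in for an object store such as S3 or Swift.
    """
    path = Path(store_dir) / uid
    path.write_bytes(data)
    return str(path)


def build_notification(uid, location, data):
    """Build a small JSON message that points at the stored file."""
    return json.dumps({
        "uid": uid,
        "location": location,
        "size": len(data),
        "sha256": hashlib.sha256(data).hexdigest(),
    })


def publish_notification(channel, queue_name, body):
    """Publish the notification on an open pika channel (default exchange)."""
    channel.basic_publish(exchange="", routing_key=queue_name, body=body)


def fetch_from_notification(body):
    """Consumer side: read the file the message points at and verify it."""
    note = json.loads(body)
    data = Path(note["location"]).read_bytes()
    if hashlib.sha256(data).hexdigest() != note["sha256"]:
        raise ValueError("checksum mismatch for " + note["uid"])
    return note["uid"], data


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as store_dir:
        payload = b"x" * 1_400_000  # roughly the size that failed in the question
        location = store_file(store_dir, "test", payload)
        message = build_notification("test", location, payload)
        uid, data = fetch_from_notification(message)
        print(uid, len(message), data == payload)
```

The message that goes through RabbitMQ stays a few hundred bytes long regardless of the file size, and the checksum lets the consumer detect a corrupted or incomplete file.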