I am trying to create a simple file download and upload service with RabbitMQ in Python. I wrote server and client scripts and defined some request objects that serialize to and deserialize from JSON, so that I can send a file uid or other parameters alongside the binary data (base64-encoded). Everything works when sending small files, but with bigger ones (tested with a 1.4 MB text file) I start getting JSON decode errors. I am dumping the sent and received messages to files, and the sent message looks like correctly formatted JSON. However, the received message seems to be jumbled, for example:
Sending: {"uid":"test",Rhc2Rhc2Rhc2Rhc2Rhc2Rhc2=="}
Received: Rhc2Rhc2{"uid":"test",Rhc2Rhc2Rhc2Rhc2=="}
Obviously the jumbled blocks are much bigger than that. Has anyone run into this problem before?
Here are some code segments. I can upload the full code, but it is not very tidy:
Upload Request:
import base64
import json


class FileSystemUploadRequest:
    def __init__(self, uid, data):
        self.uid = uid
        self.data = data

    def encode(self):
        # Binary data is base64-encoded so it can travel inside a JSON string
        dicc = {"uid": self.uid, "data": base64.b64encode(self.data).decode()}
        return json.dumps(dicc)

    @staticmethod
    def decode(jsonstr):
        dicc = json.loads(jsonstr)
        uid = dicc["uid"]
        data = base64.b64decode(dicc["data"])
        return FileSystemUploadRequest(uid, data)
Client (sender):
def put_file_blocking(self, uid, data):
    print(" [x] put_file_blocking was called")
    corr_id = str(uuid.uuid4())
    request = FileSystemUploadRequest(uid, data)
    body = request.encode()
    # Dump the outgoing message for debugging
    with open("dump", "w") as f:
        f.write(body)
    # Send upload request
    self.channel.basic_publish(exchange='',
                               routing_key=self.queue_upload,
                               properties=pika.BasicProperties(
                                   reply_to=self.callback_queue_name,
                                   correlation_id=corr_id
                               ),
                               body=body)
Server (receiver):
def upload_request(ch, method, props, body):
    # Dump the incoming message for debugging
    with open("_dump", "w") as f:
        f.write(body.decode())
    # Get Upload Request
    request = FileSystemUploadRequest.decode(body)
    print(" [x] Received upload request for: " + request.uid)
    filename = fs_dir + "/" + request.uid
    # Do upload
    with open(filename, "wb") as f:
        f.write(request.data)
    # Create Upload Response
    status = Status.OK
    response = FileSystemUploadResponse(status)
    # Send response
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=response.encode())
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(" [x] Finished upload request for: " + request.uid)
RabbitMQ is a message broker, not a file storage system. Because it assumes that messages are fairly limited in size, several optimizations are in place that will work against your use case.
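To get a feel for how large the message in the question actually is, here is a rough sketch that builds a body the same way `FileSystemUploadRequest.encode` does. The 1,400,000-byte payload is only a stand-in for the 1.4 MB test file. Base64 turns every 3 bytes into 4 characters, so the body ends up roughly a third larger than the file itself.

```python
import base64
import json

# Stand-in for the 1.4 MB test file from the question (assumed size)
data = b"a" * 1_400_000

# Same encoding steps as FileSystemUploadRequest.encode
encoded = base64.b64encode(data).decode()
body = json.dumps({"uid": "test", "data": encoded})

# Ratio of message size to original file size (a bit over 4/3)
ratio = len(body) / len(data)
print(len(data), len(encoded), len(body), round(ratio, 2))
```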
To share files in a distributed system, you should rely on an object store of some sort, such as OpenStack Swift or AWS S3.
You can still use RabbitMQ to announce the presence of a new file, but rather than embedding the file in the message, you provide its location in the object store and the consumer retrieves the file from there.
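A minimal sketch of that idea, under some loud assumptions: a temporary local directory stands in for the object store (in practice this would be a Swift container or an S3 bucket), and the helper names `store_file`, `build_notification` and `handle_notification` are made up for illustration. The string returned by `build_notification` is what you would pass as `body` to `basic_publish`; the broker calls themselves are left out so the example runs on its own.

```python
import json
import shutil
import tempfile
from pathlib import Path


def store_file(store_dir: Path, uid: str, data: bytes) -> str:
    """Producer side: write the payload to the shared store, return its location."""
    path = store_dir / uid
    path.write_bytes(data)
    return str(path)


def build_notification(uid: str, location: str, size: int) -> str:
    """Build the small JSON message that would be published to RabbitMQ."""
    return json.dumps({"uid": uid, "location": location, "size": size})


def handle_notification(body: str) -> bytes:
    """Consumer side: read the location from the message and fetch the file."""
    message = json.loads(body)
    return Path(message["location"]).read_bytes()


# Hypothetical stand-in for a bucket or container
store = Path(tempfile.mkdtemp())
payload = b"x" * 1_400_000

location = store_file(store, "test", payload)
body = build_notification("test", location, len(payload))
received = handle_notification(body)

shutil.rmtree(store)  # clean up the temporary store
```

The message stays a few hundred bytes at most regardless of file size, and no base64 encoding is needed because the binary data never passes through the broker.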