I'm attempting to build a toy in-memory Redis server in Python using asyncio and Unix domain sockets.
My minimal example just returns the value baz for every request:
import asyncio


class RedisServer:
    def __init__(self):
        self.server_address = "/tmp/redis.sock"

    async def handle_req(self, reader, writer):
        await reader.readline()
        writer.write(b"$3\r\nbaz\r\n")
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    async def main(self):
        server = await asyncio.start_unix_server(self.handle_req, self.server_address)
        async with server:
            await server.serve_forever()

    def run(self):
        asyncio.run(self.main())


RedisServer().run()
When I test two sequential client requests with the redis client library using the following script, it works:
import time
import redis
r = redis.Redis(unix_socket_path="/tmp/redis.sock")
r.get("foo")
time.sleep(1)
r.get("bar")
However, if I remove the time.sleep(1), sometimes it works, and sometimes the second request fails with either:
Traceback (most recent call last):
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 706, in send_packed_command
sendall(self._sock, item)
File "/tmp/env/lib/python3.8/site-packages/redis/_compat.py", line 9, in sendall
return sock.sendall(*args, **kwargs)
BrokenPipeError: [Errno 32] Broken pipe
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "test.py", line 9, in <module>
r.get("bar")
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 1606, in get
return self.execute_command('GET', name)
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 900, in execute_command
conn.send_command(*args)
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 725, in send_command
self.send_packed_command(self.pack_command(*args),
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 717, in send_packed_command
raise ConnectionError("Error %s while writing to socket. %s." %
redis.exceptions.ConnectionError: Error 32 while writing to socket. Broken pipe.
Or:
Traceback (most recent call last):
File "test.py", line 9, in <module>
r.get("bar")
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 1606, in get
return self.execute_command('GET', name)
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 901, in execute_command
return self.parse_response(conn, command_name, **options)
File "/tmp/env/lib/python3.8/site-packages/redis/client.py", line 915, in parse_response
response = connection.read_response()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 739, in read_response
response = self._parser.read_response()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 324, in read_response
raw = self._buffer.readline()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 256, in readline
self._read_from_socket()
File "/tmp/env/lib/python3.8/site-packages/redis/connection.py", line 201, in _read_from_socket
raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
redis.exceptions.ConnectionError: Connection closed by server.
It seems like my implementation is missing some critical behavior that the client library expects (probably due to it being async). What am I missing?
If you want to close the socket after each request, you need to call write_eof(), which, according to the documentation, will "close the write end of the stream after the buffered write data is flushed" (see docs.python.org -> write_eof).
Your code slightly modified would look like this:
    async def handle_req(self, reader, writer):
        await reader.readline()
        writer.write(b"$3\r\nbaz\r\n")
        await writer.drain()
        writer.write_eof()
        writer.close()
        await writer.wait_closed()
Typically you would not close the socket after every request.
The following example is for illustration purposes only and is intended to show that the socket does not need to be closed. In a real server you would read the input line by line and interpret it according to the Redis protocol. Here we simply rely on knowing that two GET commands are sent, each consisting of 5 lines: the indicator for an array with 2 elements, a string indicator, the string value 'GET', another string indicator, and the corresponding value, namely the key.
    async def handle_req(self, reader, writer):
        print("start")
        for i in range(0, 2):
            # Each GET command arrives as 5 lines.
            for x in range(0, 5):
                print(await reader.readline())
            writer.write(b"$3\r\nbaz\r\n")
            await writer.drain()
        writer.write_eof()
        writer.close()
        await writer.wait_closed()
On the client side, the requests are sent like this:
print(r.get("foo"))
print(r.get("bar"))
time.sleep(1)
The last time.sleep is to ensure the client does not quit immediately.
The output on the console is:
start
b'*2\r\n'
b'$3\r\n'
b'GET\r\n'
b'$3\r\n'
b'foo\r\n'
b'*2\r\n'
b'$3\r\n'
b'GET\r\n'
b'$3\r\n'
b'bar\r\n'
Note that start is output only once, which shows that we can handle multiple requests without having to close the socket immediately.
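The hard-coded line counts above only work because we know exactly what the client sends. As a rough sketch of what "interpreting the data according to the Redis protocol" could look like, the handler below reads one command at a time and keeps serving the same connection until the client disconnects. The helper name read_command is hypothetical (it is not part of asyncio or the redis library), and it assumes the client only sends arrays of bulk strings, which is what the output above shows for GET:

```python
import asyncio


async def read_command(reader):
    """Read one command sent as an array of bulk strings.

    Hypothetical helper for illustration. Returns a list of bytes such as
    [b"GET", b"foo"], or None once the client has closed the connection.
    """
    header = await reader.readline()  # e.g. b"*2\r\n"
    if not header:
        return None  # readline() returns b"" at EOF
    if not header.startswith(b"*"):
        raise ValueError(f"expected array header, got {header!r}")
    count = int(header[1:].strip())

    parts = []
    for _ in range(count):
        length_line = await reader.readline()  # e.g. b"$3\r\n"
        if not length_line.startswith(b"$"):
            raise ValueError(f"expected bulk string header, got {length_line!r}")
        length = int(length_line[1:].strip())
        # Read the payload plus its trailing \r\n, then drop the terminator.
        data = await reader.readexactly(length + 2)
        parts.append(data[:-2])
    return parts


async def handle_req(reader, writer):
    # Serve commands on the same connection until the client disconnects.
    while True:
        command = await read_command(reader)
        if command is None:
            break
        # Same fixed reply as in the question, regardless of the command.
        writer.write(b"$3\r\nbaz\r\n")
        await writer.drain()
    writer.close()
    await writer.wait_closed()
```

Written as a plain function here, handle_req can be passed to asyncio.start_unix_server in place of the method from the question. Because the loop only ends when readline() reports EOF, the server no longer needs to know in advance how many requests the client will send.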