I'm trying to send long json strings from one process to another via a UDS
socket and the UDP
protocol, like so:
# server.py
def main():
if os.path.exists(UDSFILE):
os.remove(UDSFILE)
sock = socket(AF_UNIX, SOCK_DGRAM)
sock.bind(UDSFILE)
file = open(CSVFILE, 'ab')
msg = b''
while True:
while True:
try:
package, *_ = sock.recvfrom(4096)
msg += package
except ConnectionError:
log.exception()
raise
if b'\t' in msg:
data, msg = msg.split(b'\t', 1)
break
data = json.loads(data.decode())
exchange, ts, data = data
file.write(("%s\t%s\t%s\n" % (exchange, ts, data)).encode())
# Client.py
def main():
wss = GeminiWSS(args.pair)
wss.start()
sock = socket(AF_UNIX, SOCK_DGRAM)
while True:
try:
sock.sendto((json.dumps(wss.get()) + '\t').encode(), socket_addr.encode())
except Exception as e:
log.exception(e)
log.info("Attempting to restart..")
wss.restart()
except OSError as e:
log.exception(e)
print("OSError on connecting, trying again..")
The server crashes on decoding with the following trace:
Traceback (most recent call last):
File "scribe.py", line 62, in <module>
main()
File "scribe.py", line 49, in main
data = json.loads(data.decode())
File "/opt/anaconda3/lib/python3.5/json/__init__.py", line 319, in loads
return _default_decoder.decode(s)
File "/opt/anaconda3/lib/python3.5/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/opt/anaconda3/lib/python3.5/json/decoder.py", line 355, in raw_decode
obj, end = self.scan_once(s, idx)
json.decoder.JSONDecodeError: Expecting ':' delimiter: line 1 column 4099 (char 4098)
As you can see, I've attempted to use a custom seperator to read the received data (\t
), but this doesn't work (hence the JSONDecodeError
).
How, then, can I make sure that my data is assembled correctly ? Unfortunately, simply ramping up the buffer isn't effective, since I don't have a maximum message length that I know of.
Edit
The code generating data returned from wss.get()
:
def _subscription_thread(self):
"""
Thread Method, running the connection for each endpoint.
:param endpoint:
:return:
"""
try:
conn = create_connection(self.addr + self.endpoint, timeout=5)
except WebSocketTimeoutException:
self._controller_q.put('restart')
return
while self.endpoint_thread_running:
try:
msg = conn.recv()
except WebSocketTimeoutException:
log.exception()
raise
log.debug("_subscription_thread(): Putting data on q..")
try:
self.data_q.put(('Gemini', json.loads(msg), time.time()), timeout=1)
except TimeoutError:
continue
finally:
log.debug("_subscription_thread(): Data Processed, looping back..")
conn.close()
log.debug("_subscription_thread(): Thread Loop Ended.")
Data Samples
The bytes
object causing the raising of 'JSONDecodeError
:
b'["Gemini", {"eventId": 609771498, "events": [{"remaining": "21914.3", "reason": "initial", "price": "0.01", "side": "bid", "type": "change", "delta": "21914.3"}, {"remaining": "0.000047", "reason": "initial", "price": "0.06", "side": "bid", "type": "change", "delta": "0.000047"}, {"remaining": "110", "reason": "initial", "price": "0.10", "side": "bid", "type": "change", "delta": "110"}, {"remaining": "1053.33333333", "reason": "initial", "price": "0.15", "side": "bid", "type": "change", "delta": "1053.33333333"}, {"remaining": "1", "reason": "initial", "price": "0.90", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "744", "reason": "initial", "price": "1.00", "side": "bid", "type": "change", "delta": "744"}, {"remaining": "2400", "reason": "initial", "price": "1.10", "side": "bid", "type": "change", "delta": "2400"}, {"remaining": "1", "reason": "initial", "price": "5.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "1", "reason": "initial", "price": "8.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "0.022", "reason": "initial", "price": "9.00", "side": "bid", "type": "change", "delta": "0.022"}, {"remaining": "1", "reason": "initial", "price": "11.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "30", "reason": "initial", "price": "16.00", "side": "bid", "type": "change", "delta": "30"}, {"remaining": "2", "reason": "initial", "price": "24.00", "side": "bid", "type": "change", "delta": "2"}, {"remaining": "400", "reason": "initial", "price": "26.00", "side": "bid", "type": "change", "delta": "400"}, {"remaining": "0.03015", "reason": "initial", "price": "30.00", "side": "bid", "type": "change", "delta": "0.03015"}, {"remaining": "0.97", "reason": "initial", "price": "31.85", "side": "bid", "type": "change", "delta": "0.97"}, {"remaining": "0.029", "reason": "initial", "price": "33.85", "side": "bid", "type": "change", "delta": "0.029"}, {"remaining": "1", "reason": "initial", "price": "36.97", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "2.9975", "reason": "initial", "price": "40.00", "side": "bid", "type": "change", "delta": "2.9975"}, {"remaining": "0.04460443", "reason": "initial", "price": "67.00", "side": "bid", "type": "change", "delta": "0.04460443"}, {"remaining": "16.20200501", "reason": "initial", "price": "99.75", "side": "bid", "type": "change", "delta": "16.20200501"}, {"remaining": "62.08649948", "reason": "initial", "price": "100.00", "side": "bid", "type": "change", "delta": "62.08649948"}, {"remaining": "1", "reason": "initial", "price": "151.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "1", "reason": "initial", "price": "159.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "2.60664747", "reason": "initial", "price": "191.05", "side": "bid", "type": "change", "delta": "2.60664747"}, {"remaining": "1", "reason": "initial", "price": "200.00", "side": "bid", "type": "change", "delta": "1"}, {"remaining": "0.03976369", "reason": "initial", "price": "220.05", "side": "bid", "type": "change", "delta": "0.03976369"}, {"remaining": "2.6635", "reason": "initial", "price": "300.00", "side": "bid", "type": "change", "delta": "2.6635"}, {"remaining": "0.855", "reason": "initial", "price": "350.00", "side": "bid", "type": "change", "delta": "0.855"}, {"remaining": "0.04172229", "reason": "initial", "price": "359.52", "side": "bid", "type": "change", "delta": "0.04172229"}, {"remaining": "0.02773771", "reason": "initial", "price": "360.52", "side": "bid", "type": "change", "delta": "0.02773771"}, {"remaining": "2.59220779", "reason": "initial", "price": "385.00", "side": "bid", "type": "change", "delta": "2.59220779"}, {"remaining": "3.740625", "reason": "initial", "price": "400.00", "side": "bid", "type": "change", "delta": "3.740625"}, {"remaining": "1.21646341", "reason": "initial", "price": "410.00", "side": "bid", "type": "change", "delta": "1.21646341"}, {"remaining": "1.20471014", "reason": "initial", "price": "414.00", "side": "bid", "type": "change", "delta": "1.20471014"}, {"remai["Gemini", {"eventId": 609771500, "events": [{"remaining": "0", "reason": "cancel", "price": "2053.62", "side": "ask", "type": "change", "delta": "-2"}], "type": "update"}, 1495312354.6720355]'
Generated from this source:
pastebin (moving to pastebin, due to text limit)
Ultimately, I've stopped trying to handle the UDS connecting and sending myself, and simply let python do its thing, using the multiprocessing
library's Client
and Listener
classes, and supplying a file path. This now works seemlessly.
> listener.py
from multiprocessing.connection import Listener
serv = Listener('/path/to/uds')
while True:
conn = serv.accept()
try:
while True:
msg = conn.recv()
print(msg)
conn.send(msg.upper())
except EOFError:
print('Conn closed!')
finally:
conn.close()
serv.close()
> client.py
from multiprocessing.connection import Client
large_data = ''.join(['a' for i in range(1000000)])
sock = Client('/path/to/uds')
sock.send(large_data)
answer = sock.recv()
print(answer)
sock.close()
Not to mention that it turned out that I was splitting up the message for no reason, since UDS connections are file descriptors.