I have the following system: [Client] - [Web Server] - [Connecotr].
A connector is a kind of intermediate code between a web server and a data source.
I need to monitor the server connection with the connector. If the connection is lost, then I must notify the client.
Communication between the web server and the connector is organized using socketio.
The problem is that if the connector stops working, then the web server will know about it only after a minute (this is in the best case).
I decided that the server should check the status of the connector every second.
When the connector is connected on the server, a background task starts. The essence of the task: every second: 1) fix the time; 2) save the fixed time to the stack; 3) send the echo message to the connector. (see server.background_thread)
The connector accepts the echo message and time stamp as a parameter and sends an echo message to the web server, as the parameter it passes the time stamp that it received. (see client.echo)
The web server receives an echo message, if the timestamp is equal to the last value in the stack, then this value is removed from the stack. (see server.on_echo_connector)
On the web server, at each iteration, the stack size is checked (see server.background_thread). If it is greater than 5, then this means that the connector did not respond to the echo message 5 times, we believe that the connector is unavailable, and disconnect from it.
When the server realizes that the connector is unavailable, it is necessary to terminate the thread that sent echo messages to the connector.
Once the stack size is greater than 5, I exit the infinite loop and call flask_socketio.disconnect (connector_sid, '/ connector')
. After this call nothing works (e.g. print
)
In the on_disconnect_connector
(server) method, thread.join()
is called and never terminates.
And I need to complete the thread, so that when the connector is started again, it will connect successfully and everything starts all over again.
How to solve this problem?
server
# -*- coding: utf-8 -*-
import os
import threading
import time
import collections
from datetime import datetime
import flask
import flask_socketio
def get_unix_time():
return int(time.mktime(datetime.now().timetuple()))
class Stack(collections.deque):
def __init__(self, iterable=(), maxlen=None):
collections.deque.__init__(self, iterable, maxlen)
@property
def size(self):
return len(self)
@property
def empty(self):
return self.size == 0
@property
def head(self):
return self[-1]
@property
def tail(self):
return self[0]
def push(self, x):
self.append(x)
# SERVER
app = flask.Flask(__name__)
sio = flask_socketio.SocketIO(app, async_mode='gevent')
connector_sid = None
echo_stack = Stack()
thread = None
thread_lock = threading.Lock()
def background_thread(app):
time.sleep(2) # delay for normal connection
while True:
if echo_stack.size >= 5:
break
time_ = get_unix_time()
echo_stack.push(time_)
sio.emit('echo', time_, namespace='/connector')
sio.sleep(1)
with app.app_context():
flask_socketio.disconnect(connector_sid, '/connector')
@sio.on('connect', namespace='/connector')
def on_connect_connector():
"""Connector connection event handler."""
global connector_sid, thread
print 'Attempt to connect a connector {}...'.format(request.sid)
# if the connector is already connected, reject the connection
if connector_sid is not None:
print 'Connection for connector {} rejected'.format(request.sid)
return False
# raise flask_socketio.ConnectionRefusedError('Connector already connected')
connector_sid = request.sid
print('Connector {} connected'.format(request.sid))
with thread_lock:
if thread is None:
thread = sio.start_background_task(
background_thread, current_app._get_current_object())
# notify clients about connecting a connector
sio.emit('set_connector_status', True, namespace='/client')
@sio.on('disconnect', namespace='/connector')
def on_disconnect_connector():
"""Connector disconnect event handler."""
global connector_sid, thread
print 'start join'
thread.join()
print 'end join'
thread = None
print 'after disconet:', thread
connector_sid = None
echo_stack.clear()
print('Connector {} disconnect'.format(request.sid))
# notify clients of disconnected connector
sio.emit('set_connector_status', False, namespace='/client')
@sio.on('echo', namespace='/connector')
def on_echo_connector(time_):
if not echo_stack.empty:
if echo_stack.head == time_:
echo_stack.pop()
@sio.on('message', namespace='/connector')
def on_message_connector(cnt):
# print 'Msg: {}'.format(cnt)
pass
if __name__ == '__main__':
sio.run(app)
client
# -*- coding: utf-8 -*-
import sys
import threading
import time
import socketio
import socketio.exceptions
sio = socketio.Client()
thread = None
thread_lock = threading.Lock()
work = False
def background_thread():
# example task
cnt = 0
while work:
cnt += 1
if cnt % 10 == 0:
sio.emit('message', cnt // 10, namespace='/connector')
sio.sleep(0.1)
@sio.on('connect', namespace='/connector')
def on_connect():
"""Server connection event handler."""
global thread, work
print '\n----- Connected to server -----' \
'\n----- My SID: {} -----\n'.format(sio.sid)
work = True # set flag
# run test task
with thread_lock:
if thread is None:
thread = sio.start_background_task(background_thread)
@sio.on('disconnect', namespace='/connector')
def on_disconnect():
"""Server disconnect event handler."""
global thread, work
# clear the work flag so that at the next iteration the endless loop ends
work = False
thread.join()
thread = None
# disconnect from server
sio.disconnect()
print '\n----- Disconnected from server -----\n'
# switch to the mode of infinite attempts to connect to the server
main()
@sio.on('echo', namespace='/connector')
def on_echo(time_):
sio.emit('echo', time_, namespace='/connector')
def main():
while True:
try:
sio.connect('http://localhost:5000/connector',
namespaces=['/connector'])
sio.wait()
except socketio.exceptions.ConnectionError:
print 'Trying to connect to the server...'
time.sleep(1)
except KeyboardInterrupt:
print '\n---------- EXIT ---------\n'
sys.exit()
except Exception as e:
print e
if __name__ == '__main__':
print '\n---------- START CLIENT ----------\n'
main()
Python 2.7
Need to install an additional library for the client (see)
pip install "python-socketio[client]"
Thanks to this library, the WebSocket transport works. Now disconnecting the connector is immediately visible.