websocket_client.py
#!/usr/bin/env python
# WS client example
import asyncio
import websockets
import json
import sys
import os
import time
from aiortc import RTCPeerConnection, RTCSessionDescription
pcs = []
peer_connections = 0
local_username = "epalxeis"
websocket = None
async def open_websocket():
global websocket
uri = "ws://192.168.1.5:8080"
async with websockets.connect(uri) as websocket:
#send username
await websocket.send(json.dumps({"type":"register","username":local_username}))
async for message in websocket:
message = json.loads(message)
if(message["type"]=="create_peer"):
await create_peer(message["username"],websocket)
elif(message["type"]=="offer"):
await receive_offer(message,websocket)
elif(message["type"]=="answer"):
receive_answer(message,websocket)
elif(message["type"]=="unregister"):
unregister(message,websocket)
async def create_peer(username,websocket):
global pcs
global peer_connections
global local_username
pc = RTCPeerConnection()
pc_index = peer_connections
pcs.append([username,pc])
peer_connections = peer_connections+1
'''
add stream (video-audio to pc)
'''
@pc.on("track")
def on_track(track):
'''
play track with PyQt5
'''
offer = await pcs[pc_index].createOffer()
pcs[pc_index][1].setLocalDescription(offer)
data = {"type":"offer","from":local_username,"to":username,"offer":offer}
await websocket.send(json.dumps(data))
async def receive_offer(message,websocket):
global pcs
global peer_connections
global local_username
username = message["username"]
offer = RTCSessionDescription(sdp=message["offer"]["sdp"], type=message["offer"]["type"])
pc = RTCPeerConnection()
pc_index = peer_connections
pcs.append([username,pc])
peer_connections = peer_connections+1
'''
add stream (video-audio to pc)
'''
@pc.on("track")
def on_track(track):
'''
play track with PyQt5
'''
await pcs[pc_index][1].setRemoteDescription(offer)
answer = await pcs[pc_index][1].createAnswer()
await pcs[pc_index][1].setLocalDescription(answer)
answer_json = {"sdp":answer.sdp,"type":"answer"}
data = {"type":"answer","from":local_username,"to":username,"answer":answer_json}
await websocket.send(json.dumps(data))
async def receive_answer(message,websocket):
global pcs
username = message["username"]
answer = message["answer"]
for pc in pcs:
if(pc[0]==username):
pc[1].setRemoteDescription(answer)
async def unregister(message,websocket):
global pcs
global peer_connections
username = message["username"]
index = 0
counter = 0
for pc in pcs:
if(pc[0]==username):
pc[1].close()
'''
PyQt5 manage for closing Output Device
'''
index = counter
break
counter = counter + 1
del pcs[index]
peer_connections = peer_connections-1
asyncio.get_event_loop().run_until_complete(open_websocket())
websocket_server.py
import asyncio
import websockets
import json
import ssl
peers = ()
async def on_open(websocket,path):
async for message in websocket:
message = json.loads(message)
if(message["type"]=="register"):
await register(websocket,message["username"])
elif(message["type"]=="offer"):
await send_offer(websocket,message)
elif(message["type"]=="answer"):
await send_answer(websocket,message)
elif(message["type"]=="candidate"):
await send_candidate(websocket,message)
await unregister(websocket)
async def register(websocket,username):
global peers
print(username+" logged in.")
peers = peers + ((websocket,username),)
for peer in peers:
if peer[0] is not websocket:
await websocket.send(json.dumps({"type": "create_peer","username":peer[1]}))
async def send_offer(websocket,message):
global peers
offer_creator = message["from"]
offer_receiver = message["to"]
offer = message["offer"]
print(offer_creator+" creates and sends offer to "+offer_receiver)
for peer in peers:
if(peer[1]==offer_receiver):
await peer[0].send(json.dumps({"type": "offer","username":offer_creator,"offer":offer}))
async def send_answer(websocket,message):
global peers
answer_creator = message["from"]
answer_receiver = message["to"]
answer = message["answer"]
print(answer_creator+" creates and sends answer to "+answer_receiver)
for peer in peers:
if(peer[1]==answer_receiver):
await peer[0].send(json.dumps({"type": "answer","username":answer_creator,"answer":answer}))
async def send_candidate(websocket,message):
global peers
candidate_creator = message["from"]
candidate_receiver = message["to"]
candidate = message["candidate"]
print(candidate_creator+" send candidate packet to "+candidate_receiver)
for peer in peers:
if(peer[1]==candidate_receiver):
await peer[0].send(json.dumps({"type": "candidate","username":candidate_creator,"candidate":candidate}))
async def unregister(websocket):
global peers
for peer_1 in peers:
if(peer_1[0]==websocket):
username = peer_1[1]
print(username+" logged out.")
for peer_2 in peers:
if(peer_2[0] is not websocket):
await peer_2[0].send(json.dumps({"type": "unregister","username":username}))
peers_list = list(peers)
peers_list.remove((websocket,username))
peers = tuple(peers_list)
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(r'C:\xampp\crt\localhost\server.crt',r'C:\xampp\crt\localhost\server.key')
ssl_context = None
start_server = websockets.serve(on_open, "192.168.1.5", 8080, ssl=ssl_context)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
First i run the websocket_server.py (this code is handling the websocket requests)
Second i run the websocket_client.py
After that i saw in the first cmd: epalxeis logged in. So the connection is correct. But if i close the second cmd (with the x button or with the ctr-c), i saw this error in the server's cmd:
Error in connection handler
Traceback (most recent call last):
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\windows_events.py", line 453, in finish_recv
return ov.getresult()
OSError: [WinError 64] Το καθορισμένο όνομα δικτύου δεν είναι πια διαθέσιμο
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 827, in transfer_data
message = await self.read_message()
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 895, in read_message
frame = await self.read_data_frame(max_size=self.max_size)
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 971, in read_data_frame
frame = await self.read_frame(max_size)
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 1047, in read_frame
frame = await Frame.read(
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\framing.py", line 105, in read
data = await reader(2)
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\streams.py", line 723, in readexactly
await self._wait_for_data('readexactly')
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\streams.py", line 517, in _wait_for_data
await self._waiter
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\proactor_events.py", line 280, in _loop_reading
data = fut.result()
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\windows_events.py", line 808, in _poll
value = callback(transferred, key, ov)
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\windows_events.py", line 457, in finish_recv
raise ConnectionResetError(*exc.args)
ConnectionResetError: [WinError 64] Το καθορισμένο όνομα δικτύου δεν είναι πια διαθέσιμο
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\server.py", line 191, in handler
await self.ws_handler(self, path)
File "websocket_server.py", line 9, in on_open
async for message in websocket:
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 439, in __aiter__
yield await self.recv()
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 509, in recv
await self.ensure_open()
File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 803, in ensure_open
raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: code = 1006 (connection closed abnormally [internal]), no reason
The proccess don't stop. How can i resolve the issue?
Thanks in advance,
Chris Pappas
This post helps you.
The reason program couldn't stop is that sub threads continue to run even if main thread catches an exception and stop. Note that asyncio is implemented by thread (I pursued codes). There are two kinds of thread in Python, normal thread and daemon thread. The former continues to run if parent thread dies, the latter stops. asyncio is implemented with the former, therefore such this problem happens.