首页 > 解决方案 > Python websocket客户端关闭连接

问题描述

websocket_client.py

#!/usr/bin/env python

# WS client example

import asyncio
import websockets
import json
import sys
import os
import time

from aiortc import RTCPeerConnection, RTCSessionDescription

pcs = []
peer_connections = 0
local_username = "epalxeis"
websocket = None

async def open_websocket():
    global websocket
    uri = "ws://192.168.1.5:8080"
    async with websockets.connect(uri) as websocket:
        #send username
        await websocket.send(json.dumps({"type":"register","username":local_username}))
        async for message in websocket:
            message = json.loads(message)
            if(message["type"]=="create_peer"):
                await create_peer(message["username"],websocket)
            elif(message["type"]=="offer"):
                await receive_offer(message,websocket)
            elif(message["type"]=="answer"):
                receive_answer(message,websocket)
            elif(message["type"]=="unregister"):
                unregister(message,websocket)
        

                
                
async def create_peer(username,websocket):
    global pcs
    global peer_connections
    global local_username
    pc = RTCPeerConnection()
    pc_index = peer_connections
    pcs.append([username,pc])
    
    peer_connections = peer_connections+1
    
    '''
        add stream (video-audio to pc)
    '''
    
    @pc.on("track")
    def on_track(track):
        '''
            play track with PyQt5
        '''
        
    offer = await pcs[pc_index].createOffer()
    pcs[pc_index][1].setLocalDescription(offer)
    
    
    data = {"type":"offer","from":local_username,"to":username,"offer":offer}
    
    await websocket.send(json.dumps(data))
    
async def receive_offer(message,websocket):
    global pcs
    global peer_connections
    global local_username
    username = message["username"]
    offer = RTCSessionDescription(sdp=message["offer"]["sdp"], type=message["offer"]["type"])
    
    pc = RTCPeerConnection()
    pc_index = peer_connections
    pcs.append([username,pc])
    
    peer_connections = peer_connections+1
    
    '''
        add stream (video-audio to pc)
    '''
    
    @pc.on("track")
    def on_track(track):
        '''
            play track with PyQt5
        '''
    
    await pcs[pc_index][1].setRemoteDescription(offer)
    answer = await pcs[pc_index][1].createAnswer()
    await pcs[pc_index][1].setLocalDescription(answer)
    answer_json = {"sdp":answer.sdp,"type":"answer"}
    data = {"type":"answer","from":local_username,"to":username,"answer":answer_json}
    await websocket.send(json.dumps(data))
    
async def receive_answer(message,websocket):
    global pcs
    username = message["username"]
    answer = message["answer"]
    for pc in pcs:
        if(pc[0]==username):
            pc[1].setRemoteDescription(answer)
            
async def unregister(message,websocket):
    global pcs
    global peer_connections
    username = message["username"]
    index = 0
    counter = 0
    for pc in pcs:
        if(pc[0]==username):
            pc[1].close()
            '''
                PyQt5 manage for closing Output Device
            '''
            index = counter
            break
        counter = counter + 1
    del pcs[index]
    peer_connections = peer_connections-1
    
asyncio.get_event_loop().run_until_complete(open_websocket())

websocket_server.py

import asyncio
import websockets
import json
import ssl

peers = ()
    
async def on_open(websocket,path):
    async for message in websocket:
        message = json.loads(message)
        if(message["type"]=="register"):
            await register(websocket,message["username"])
        elif(message["type"]=="offer"):
            await send_offer(websocket,message)
        elif(message["type"]=="answer"):
            await send_answer(websocket,message)
        elif(message["type"]=="candidate"):
            await send_candidate(websocket,message)
    await unregister(websocket)
            
async def register(websocket,username):
    global peers
    print(username+" logged in.")
    peers = peers + ((websocket,username),)
    for peer in peers:
        if peer[0] is not websocket:
            await websocket.send(json.dumps({"type": "create_peer","username":peer[1]}))

async def send_offer(websocket,message):
    global peers
    offer_creator = message["from"]
    offer_receiver = message["to"]
    offer = message["offer"]
    print(offer_creator+" creates and sends offer to "+offer_receiver)
    for peer in peers:
        if(peer[1]==offer_receiver):
            await peer[0].send(json.dumps({"type": "offer","username":offer_creator,"offer":offer}))
            
async def send_answer(websocket,message):
    global peers
    answer_creator = message["from"]
    answer_receiver = message["to"]
    answer = message["answer"]
    print(answer_creator+" creates and sends answer to "+answer_receiver)
    for peer in peers:
        if(peer[1]==answer_receiver):
            await peer[0].send(json.dumps({"type": "answer","username":answer_creator,"answer":answer}))
            
async def send_candidate(websocket,message):
    global peers
    candidate_creator = message["from"]
    candidate_receiver = message["to"]
    candidate = message["candidate"]
    print(candidate_creator+" send candidate packet to "+candidate_receiver)
    for peer in peers:
        if(peer[1]==candidate_receiver):
            await peer[0].send(json.dumps({"type": "candidate","username":candidate_creator,"candidate":candidate}))
            
async def unregister(websocket):
    global peers
    for peer_1 in peers:
        if(peer_1[0]==websocket):
            username = peer_1[1]
            print(username+" logged out.")
            for peer_2 in peers:
                if(peer_2[0] is not websocket):
                    await peer_2[0].send(json.dumps({"type": "unregister","username":username}))
                    
    peers_list = list(peers)
    peers_list.remove((websocket,username))
    peers = tuple(peers_list)
        
        
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(r'C:\xampp\crt\localhost\server.crt',r'C:\xampp\crt\localhost\server.key')
ssl_context = None
start_server = websockets.serve(on_open, "192.168.1.5", 8080, ssl=ssl_context)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

首先我运行 websocket_server.py (这段代码正在处理 websocket 请求)

其次我运行 websocket_client.py

之后我在第一个 cmd 中看到:epalxeis 登录。所以连接是正确的。但是如果我关闭第二个 cmd(使用 x 按钮或使用 ctr-c),我在服务器的 cmd 中看到了这个错误:

Error in connection handler
Traceback (most recent call last):
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\windows_events.py", line 453, in finish_recv
    return ov.getresult()
OSError: [WinError 64] Το καθορισμένο όνομα δικτύου δεν είναι πια διαθέσιμο

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 827, in transfer_data
    message = await self.read_message()
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 895, in read_message
    frame = await self.read_data_frame(max_size=self.max_size)
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 971, in read_data_frame
    frame = await self.read_frame(max_size)
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 1047, in read_frame
    frame = await Frame.read(
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\framing.py", line 105, in read
    data = await reader(2)
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\streams.py", line 723, in readexactly
    await self._wait_for_data('readexactly')
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\streams.py", line 517, in _wait_for_data
    await self._waiter
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\proactor_events.py", line 280, in _loop_reading
    data = fut.result()
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\windows_events.py", line 808, in _poll
    value = callback(transferred, key, ov)
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\asyncio\windows_events.py", line 457, in finish_recv
    raise ConnectionResetError(*exc.args)
ConnectionResetError: [WinError 64] Το καθορισμένο όνομα δικτύου δεν είναι πια διαθέσιμο

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\server.py", line 191, in handler
    await self.ws_handler(self, path)
  File "websocket_server.py", line 9, in on_open
    async for message in websocket:
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 439, in __aiter__
    yield await self.recv()
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 509, in recv
    await self.ensure_open()
  File "C:\Users\Χρήστος\AppData\Local\Programs\Python\Python38\lib\site-packages\websockets\protocol.py", line 803, in ensure_open
    raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: code = 1006 (connection closed abnormally [internal]), no reason

过程不会停止。我该如何解决这个问题?

提前致谢,

克里斯·帕帕斯

标签: pythonwebsocketserverclient

解决方案


这篇文章可以帮助你。

程序无法停止的原因是即使主线程捕获异常并停止,子线程也会继续运行。请注意,异步是由线程实现的(我追求代码)。Python中有两种线程,普通线程和守护线程。如果父线程死亡,前者继续运行,后者停止。asyncio 是用前者实现的,因此会出现这样的问题。


推荐阅读