首页 > 解决方案 > 如何使用 django 通道管理 cpu 和内存

问题描述

有没有办法通过 django 通道更好地管理 cpu 和内存使用情况?我正在尝试通过 websockets 快速传输数据,这在我的 ubuntu 服务器 cpu 达到 100% 并且内存达到 90% 之前运行良好,然后 daphne 崩溃。我使用 NGINX 和 Daphne 作为服务器/代理。

有没有更好的方法来设置redis?或者可能是“清除缓存”或类似的方法?

我的设置是:

设置.py

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
            "capacity": 5000,
            "expiry": 5,
        },
    },
}

asgi.py

application = ProtocolTypeRouter({
    # Django's ASGI application to handle traditional HTTP requests
    "http": django_asgi_app,

    # WebSocket chat handler
    "websocket": AuthMiddlewareStack(
        URLRouter([
            url(r"^stream/(?P<device_id>[\d\-]+)/$", StreamConsumer.as_asgi()),
            url(r"^status/(?P<device_id>[\d\-]+)/$", StatusConsumer.as_asgi())
        ])
    ),
})

消费者.py

class StreamConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['device_id']
        self.room_group_name = 'stream_%s' % self.room_name
        self.token_passed = self.scope['query_string'].decode() #token passed in with ws://url/?token
        self.token_actual = await self.get_token(self.room_name) #go get actual token for this monitor

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        #check to ensure tokens match, if they do, allow connection
        if self.token_passed == self.token_actual:
            await self.accept()
            print('True')
        else:
            print(self.token_passed)
            print(self.token_actual)
            await self.close()


    async def disconnect(self, close_code):
        # Leave room group
        print(close_code)
        print("closing")
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    @database_sync_to_async
    def get_token(self, monitor_serial):
        monitor = CustomUser.objects.get(deviceSerial=monitor_serial)
        token = Token.objects.get(user=monitor)
        return token.key


    # Receive message from WebSocket
    async def receive(self, bytes_data=None, text_data=None):
        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'stream_data',
                'bytes_data': bytes_data,
                'text_data': text_data
            }
        )

    # Receive message from room group
    async def stream_data(self, event):
        bytes_data = event['bytes_data']
        text_data = event['text_data']
        # Send message to WebSocket
        await self.send(bytes_data=bytes_data, text_data=text_data)

python 向WebSocket发送数据的代码:

#!/usr/bin/python

import websocket
import time

def on_message(ws, message):
    try:
        if message == 'pong':
           global pong
           global receiver_exists
           pong = True
           receiver_exists = True
    except:
        pass

def on_error(ws, error):
    print("### error ###", error)

def on_close(ws):
    print("### closed ###")

def on_open(ws):
    print('### connected ###')
    def stream(*args):
        while True:
            ws.send('large string')
            time.sleep(0.0005)

    stream()


while True:
    uri = f"mysocketurl"
    ws = websocket.WebSocketApp(uri,
                        on_open = on_open,
                        on_message = on_message,
                        on_error = on_error,
                        on_close = on_close)

    ws.run_forever()

达芙妮崩溃后我收到的错误消息是:

2021-05-08 13:45:14,398 WARNING  Application instance <Task pending coro=<ProtocolTypeRouter.__call__() running at /home/polysense/polysensesite/psvirtualenv/lib/python3.6/site-packages/channels/routing.py:71> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7efcf0ab42e8>()]>> for connection <WebSocketProtocol client=['127.0.0.1', 58056] path=b'/stream/1927-0000-0001/'> took too long to shut down and was killed.

您可能还会问,为什么我需要这么快地发送数据。我正在尝试使用 open-cv 发送 jpg 数据来创建视频流。我认为是 open-cv 导致了崩溃,但它似乎只是试图太快地发送 jpg 数据。当我通过延迟 100 毫秒来减慢它时,它不会崩溃,但帧速率是不可接受的。

标签: pythondjangowebsocketdjango-channels

解决方案


推荐阅读