首页 > 解决方案 > 如何从 Python 调用 Javascript 函数

问题描述

我有一个不与客户端交互的python代码(服务器端)。但是,当它(服务器代码)完成时,我需要代表一些项目。我想出的唯一想法是代表一个项目的 JS 函数,从 Python 调用。您能否建议我使用软件包或其他想法来实现这一点。

一些细节(我不知道是否有必要,但可能会有所帮助)

async def start_delete_delay(app, delay):
    """
    The very function which thrust a delay for each front token.
    Key arguments:
    app -- our application.
    delay -- a delay in seconds
    """
    async with app['db'].acquire() as conn:

        # First of all we need to check for database emptiness
        query = text("SELECT True FROM tokens LIMIT(1)")
        if await conn.fetch(query):

            # If database is not empty then we are processing a waiting delay.
            # First, fetching an id & related token from the first position (due to it queue) from database.
            query = select([db.tokens.c.id, db.tokens.c.token]).order_by(asc(db.tokens.c.id)).limit(1)
            query_result = await conn.fetchrow(query)

            # Retrieving an id and token
            id_before_sleep, token = query_result['id'], query_result['token']

            # Setting a delay
            try:
                await asyncio.sleep(delay)

            # Some information related with cancellation error
            # https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel
            except asyncio.CancelledError:
                pass

            # Check whether a token at the first place as same as it was before
            finally:

                # If it possible but all of members picked their tokens over 60 seconds.
                if await conn.fetch(text("SELECT True FROM tokens LIMIT(1)")):
                    query_result = await conn.fetchrow(query)
                    id_after_sleep = query_result['id']

                    # If they are same then we delete that token and starting delay again.
                    if id_before_sleep == id_after_sleep:
                        query = delete(db.tokens).where(db.tokens.c.id == id_before_sleep)

                        # preparing a token for reuse.
                        app['new_token'].prepare_used_token(token)

                        # Deleting a token
                        await conn.fetchrow(query)

                        # I'd like to call a JS function (which I already have) here

                        # Starting a delay for adjacent token, over and over and over
                        task = make_task(start_delete_delay, app, delay)
                        asyncio.gather(task)

标签: javascriptpythonclient-server

解决方案


我找到了两种解决方案,因此如果有人遇到此类问题,请尝试使用它们:

第一个解决方案

线索是 WebSockets。我用aiohttpasyncio

在 JavaScript 文件中,我添加了一个监听套接字:

var socket = new WebSocket('/link-to-websocket')

在服务器端,我添加了一个 websocket_handler,在我的情况下,它在从数据库中删除一个令牌后发送消息

async def websocket_handler(request):

    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            if app['updt_flag']:
                await ws.send_str("signal")
            else:
                await ws.close()
    return ws

并将其添加到路线

app.add_routes([web.get('/link-to-websocket', websocket_handler)])

1) JavaScript 的工作原理:使用 SSE 深入研究 WebSockets 和 HTTP/2 + 如何选择正确的路径

2) Python aiohttp websockets

然而这种方法不是最好的,我们不使用整个 websocket 的功能,因此让我们继续使用另一种方法:服务器发送事件 (SSE)。这种方法更适合我的问题,因为我们总是从服务器接收响应而没有请求它(而 websockets 不包含这样的选项):

第二种解决方案 正如我上面所说,我将使用 SSE,为此它需要一个 sse-package

pip install aiohttp_sse


import asyncio
from aiohttp_sse import sse_response


async def SSE_request(request):
    loop = request.app.loop
    async with sse_response(request) as resp:
        while True:
            if request.app['updt_flag']:
                await resp.send("signal")
                request.app['updt_flag'] = False
            await asyncio.sleep(1, loop=loop)
    return resp

添加路线

web.get('/update', SSE_request)

向 JS 添加监听 sse:

const evtSource = new EventSource("/update");
evtSource.onmessage = function(e) {
                    display_queue_remove();
}

就这样:)


推荐阅读