Python multiprocessing in node.js - child process print not working

Problem description

I have a node.js application that runs a client interface exposing actions that trigger machine-learning tasks. Since Python is the better choice for implementing the machine-learning side, I have implemented a Python application that runs those tasks on demand.

Now I need to integrate the two applications. It has been decided that we must use a single (AWS) instance to host both of them.

One way I found to do this integration is to use the python-shell node module, in which the communication between Python and Node happens over stdin and stdout.
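For context, python-shell essentially spawns the Python interpreter as a child process and wires its stdin and stdout to pipes. A rough Python-side sketch of that wiring (illustrative only, not python-shell's actual implementation) looks like this:

import subprocess

# Spawn the script the way python-shell conceptually does: with piped stdio.
proc = subprocess.Popen(
    ['python', '-u', 'start.py'],   # '-u' = unbuffered stdio for this interpreter
    stdin=subprocess.PIPE,          # pyshell.send() writes lines here
    stdout=subprocess.PIPE,         # each line read here becomes a 'message' event
    text=True,
)

proc.stdin.write('extract-job\n')
proc.stdin.flush()
print(proc.stdout.readline())       # blocks until the child writes (and flushes) a full line

The important consequence is that the child's stdout is a pipe rather than a terminal, so Node only sees output once the Python side actually flushes it.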

On the Node side I have something like this:

'use strict';

const express = require('express');
const PythonShell = require('python-shell');

var app = express();
app.listen(8000, function () {
    console.log('Example app listening on port 8000!');
});

var options = {
    mode: 'text',
    pythonPath: '../pythonapplication/env/Scripts/python.exe',
    scriptPath: '../pythonapplication/',
    pythonOptions: ['-u'], // Unbuffered
};

var pyshell = new PythonShell('start.py', options);
pyshell.on('message', function (message) {
    console.log(message);
});

app.get('/task', function (req, res) {
    pyshell.send('extract-job');
});

app.get('/terminate', function (req, res) {
    pyshell.send('terminate');
    pyshell.end(function (err, code, signal) {
        console.log(err)
        console.log(code)
        console.log(signal);
    });
});

On the Python side, I have a main script that loads a few things and then calls a server script, which runs forever reading lines with sys.stdin.readline() and executing the corresponding task.

start.py:

if __name__ == '__main__':
    # data = json.loads(sys.argv[1])
    from multiprocessing import Manager, Pool
    import logging
    import provider, server

    # Get logging setup objects
    debug_queue, debug_listener = provider.shared_logging(logging.DEBUG, 'python-server-debug.log')
    info_queue, info_listener = provider.shared_logging(logging.INFO, 'python-server.log')

    logger = logging.getLogger(__name__)

    # Start logger listener
    debug_listener.start()
    info_listener.start()

    logger.info('Initializing pool of workers...')
    pool = Pool(initializer=provider.worker, initargs=[info_queue, debug_queue])

    logger.info('Initializing server...')
    try:
        server.run(pool)
    except (SystemError, KeyboardInterrupt) as e:
        logger.info('Execution terminated without errors.')
    except Exception as e:
        logger.error('Error on main process:', exc_info=True)
    finally:
        pool.close()
        pool.join()
        debug_listener.stop()
        info_listener.stop()

    print('Done.')

Both info_queue and debug_queue are multiprocessing.Queue instances used to handle the multiprocessing logging. If I run my Python application standalone, everything works fine, even when using the pool of workers (logs are recorded and prints show up correctly).

However, if I try to run it via python-shell, only the prints and logs from my main process come through correctly... every message (print or log) from my pool of workers is held back until I terminate the Python script.

In other words, every message is held back until the finally step runs once server.py returns...
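One thing worth checking (my assumption, not something confirmed here) is stdio buffering: when stdout is a pipe, as it is under python-shell, Python block-buffers output by default, and the -u passed in pythonOptions only applies to the interpreter it is handed to; I am not sure it propagates to pool workers under the spawn start method used on Windows. A quick way to rule this out is to flush explicitly wherever output is produced, which is exactly what the commented-out lines in answer_node do:

import sys

def answer_node(message):
    # The callback runs in the parent process, so this write goes to the stdout
    # that python-shell is reading from; flush so the pipe delivers it immediately.
    sys.stdout.write(message + '\n')
    sys.stdout.flush()

def log_from_worker(message):
    # Hypothetical helper for output produced inside pool workers; flush there
    # as well, since workers may not inherit the parent's unbuffered stdout.
    print(message, flush=True)

If the messages appear immediately with explicit flushes, buffering is the culprit; if they still only arrive at shutdown, the problem lies elsewhere.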

Does anyone have any insight into this issue? Have you heard of the python-bridge module? Would it be a better solution? Can you suggest a better way to do this kind of integration without running two standalone servers?


Below I post my real provider script, together with a quick mock I made of the server script (the real one contains too much other stuff):

Mocked server.py:

import json
import logging
import multiprocessing
import sys
import time
from json.decoder import JSONDecodeError
from threading import Thread


def task(some_args):
    logger = logging.getLogger(__name__)

    results = 'results of machine learn task goes here, as a string'

    logger.info('log whatever im doing')
    # Some machine-learn task...

    logger.info('Returning results.')
    return results

def answer_node(message):
    print(message)
    # sys.stdout.write(message)
    # sys.stdout.flush()

def run(pool, recrutai, job_pool, candidate_queue):
    logger = logging.getLogger(__name__)

    workers = []
    logger.info('Server is ready and waiting for commands')
    while True:
        # Read input stream
        command = sys.stdin.readline()
        command = command.split('\n')[0]

        logger.debug('Received command: %s', command)

        if command == 'extract-job':
            logger.info(
                'Creating task.',
            )

            # TODO: Check data attributes
            p = pool.apply_async(
                func=task,
                args=('args',),  # args must be a tuple; ('args') is just a string
                callback=answer_node
            )

            # What to do with workers array?!
            workers.append(p)

        elif command == 'other-commands':
            pass
            # Other task here

        elif command == 'terminate':
            raise SystemError

        else:
            logger.warning(  # logger.warn is deprecated
                'Received an invalid command %s.',
                command
            )
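Regarding the "What to do with workers array?!" comment: pool.apply_async returns an AsyncResult, so keeping those objects lets you poll and reap finished tasks later. A minimal sketch of one way to do that (the helper name is mine, not from the original code):

import logging

def reap_finished(workers):
    # Keep only the AsyncResults that are still running; collect errors from
    # the finished ones, which apply_async would otherwise swallow silently.
    still_running = []
    for result in workers:
        if result.ready():
            try:
                result.get(timeout=0)
            except Exception:
                logging.getLogger(__name__).exception('Task failed')
        else:
            still_running.append(result)
    return still_running

Note also that the callback given to apply_async (answer_node here) is invoked in the parent process, on a pool helper thread, not inside the worker itself.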

My provider.py:

import logging
import os
from logging.handlers import QueueHandler, QueueListener
from multiprocessing import Queue


def shared_logging(level, file_name):
    # Create main logging file handler
    handler = logging.FileHandler(file_name)
    handler.setLevel(level)

    # Create logging format
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)

    # Create queue shared between all process to centralize logging features
    logger_queue = Queue() # multiprocessing.Queue

    # Create logger queue listener to send records from logger_queue to handler
    logger_listener = QueueListener(logger_queue, handler)

    return logger_queue, logger_listener

def process_logging(info_queue, debug_queue, logger_name=None):
    # Create logging queue handlers
    debug_queue_handler = QueueHandler(debug_queue)
    debug_queue_handler.setLevel(logging.DEBUG)

    info_queue_handler = QueueHandler(info_queue)
    info_queue_handler.setLevel(logging.INFO)

    # Setup level of process logger
    logger = logging.getLogger()
    if logger_name:
        logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    # Add handlers to the logger
    logger.addHandler(debug_queue_handler)
    logger.addHandler(info_queue_handler)

def worker(info_queue, debug_queue):
    # Setup worker process logging
    process_logging(info_queue, debug_queue)
    logging.debug('Process %s initialized.', os.getpid())

Tags: node.js, python-3.x, stdout, python-multiprocessing
