node.js - Python multiprocessing within node.js - prints from subprocesses not working
Problem description
I have a node.js application that runs a client interface exposing actions that trigger machine-learning tasks. Since Python is a better choice for implementing the machine-learning part, I have implemented a Python application that runs those tasks on demand.
Now I need to integrate the two applications, and it has been decided that we should use a single (AWS) instance for both.
One way to do this integration is the python-shell node module, where the communication between Python and Node is done over stdin and stdout.
On Node I have something like this:
'use strict';
const express = require('express');
// Note: newer versions of python-shell export a named binding instead:
// const { PythonShell } = require('python-shell');
const PythonShell = require('python-shell');

const app = express();
app.listen(8000, function () {
  console.log('Example app listening on port 8000!');
});

const options = {
  mode: 'text',
  pythonPath: '../pythonapplication/env/Scripts/python.exe',
  scriptPath: '../pythonapplication/',
  pythonOptions: ['-u'], // unbuffered stdout/stderr for the main Python process
};

const pyshell = new PythonShell('start.py', options);
pyshell.on('message', function (message) {
  console.log(message);
});

app.get('/task', function (req, res) {
  pyshell.send('extract-job');
  res.sendStatus(202); // acknowledge, so the HTTP client is not left hanging
});

app.get('/terminate', function (req, res) {
  pyshell.send('terminate');
  pyshell.end(function (err, code, signal) {
    console.log(err);
    console.log(code);
    console.log(signal);
  });
  res.sendStatus(202);
});
On the Python side, I have a main script that loads some things and calls a server script, which runs forever reading lines with sys.stdin.readline() and executing the corresponding tasks.
start.py is:
if __name__ == '__main__':
    # data = json.loads(sys.argv[1])
    from multiprocessing import Manager, Pool
    import logging
    import provider, server

    # Get logging setup objects
    debug_queue, debug_listener = provider.shared_logging(logging.DEBUG, 'python-server-debug.log')
    info_queue, info_listener = provider.shared_logging(logging.INFO, 'python-server.log')
    logger = logging.getLogger(__name__)

    # Start logger listeners
    debug_listener.start()
    info_listener.start()

    logger.info('Initializing pool of workers...')
    pool = Pool(initializer=provider.worker, initargs=[info_queue, debug_queue])

    logger.info('Initializing server...')
    try:
        server.run(pool)
    except (SystemError, KeyboardInterrupt) as e:
        logger.info('Execution terminated without errors.')
    except Exception as e:
        logger.error('Error on main process:', exc_info=True)
    finally:
        pool.close()
        pool.join()
        debug_listener.stop()
        info_listener.stop()
    print('Done.')
Both info_queue and debug_queue are multiprocessing.Queue objects used to handle the multiprocessing logging. If I run my Python application standalone, everything works fine, even while using the pool of workers (logs get logged and prints get printed correctly).
But if I try to run it using python-shell, only the prints and logs from my main process come through correctly... Every message (print or log) from my pool of workers is held back until I terminate the Python script.
In other words, every message is held back until the finally step in start.py runs...
Does anyone have any insight on this issue? Have you heard of the python-bridge module? Is it a better solution? Can you suggest a better approach for this kind of integration that does not use two standalone servers?
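A note on the symptom: python-shell talks to the interpreter through pipes, and pipes are block-buffered by default; the -u option applies to the interpreter that Node launches, and I am not sure it reaches the pool workers. Below is a minimal sketch of the explicit flushing I could add to rule buffering out (flush=True is standard Python; the buffering diagnosis itself is just my guess):
import sys

def answer_node(message):
    # Flush right after writing, so the message leaves the process
    # immediately even when stdout is a block-buffered pipe.
    print(message, flush=True)

def task(some_args):
    # The same applies to any direct prints inside a pool worker:
    print('worker progress...', flush=True)
    sys.stdout.flush()  # equivalent explicit form
    return 'results'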
Here I post my actual provider script, plus a quick mock I made of the server script (the real one has too much stuff).
Mock server.py:
import json
import logging
import multiprocessing
import sys
import time
from json.decoder import JSONDecodeError
from threading import Thread


def task(some_args):
    logger = logging.getLogger(__name__)
    results = 'results of machine learn task goes here, as a string'
    logger.info('log whatever im doing')
    # Some machine-learn task...
    logger.info('Returning results.')
    return results


def answer_node(message):
    print(message)
    # sys.stdout.write(message)
    # sys.stdout.flush()


def run(pool):
    logger = logging.getLogger(__name__)
    workers = []
    logger.info('Server is ready and waiting for commands')
    while True:
        # Read input stream
        command = sys.stdin.readline()
        command = command.split('\n')[0]

        logger.debug('Received command: %s', command)
        if command == 'extract-job':
            logger.info('Creating task.')
            # TODO: Check data attributes
            p = pool.apply_async(
                func=task,
                args=('args',),  # note the trailing comma: args must be a tuple
                callback=answer_node
            )
            # What to do with workers array?!
            workers.append(p)
        elif command == 'other-commands':
            pass
            # Other task here
        elif command == 'terminate':
            raise SystemError
        else:
            logger.warning('Received an invalid command %s.', command)
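One detail about this mock worth noting (general multiprocessing behavior, not something from my project): the callback passed to apply_async runs in the parent process, on a result-handler thread, not inside the worker, so the print in answer_node actually happens in the main Python process. A minimal standalone sketch to verify:
import os
from multiprocessing import Pool

def work(_):
    return os.getpid()  # pid of the pool worker

def report(worker_pid):
    # The callback executes back in the parent process.
    print('worker pid:', worker_pid, '| callback pid:', os.getpid())

if __name__ == '__main__':
    with Pool(2) as pool:
        pool.apply_async(work, (None,), callback=report).wait()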
My provider.py:
import logging
import os
from logging.handlers import QueueHandler, QueueListener
from multiprocessing import Queue


def shared_logging(level, file_name):
    # Create main logging file handler
    handler = logging.FileHandler(file_name)
    handler.setLevel(level)

    # Create logging format
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)

    # Create queue shared between all processes to centralize logging features
    logger_queue = Queue()  # multiprocessing.Queue

    # Create logger queue listener to send records from logger_queue to handler
    logger_listener = QueueListener(logger_queue, handler)
    return logger_queue, logger_listener


def process_logging(info_queue, debug_queue, logger_name=None):
    # Create logging queue handlers
    debug_queue_handler = QueueHandler(debug_queue)
    debug_queue_handler.setLevel(logging.DEBUG)
    info_queue_handler = QueueHandler(info_queue)
    info_queue_handler.setLevel(logging.INFO)

    # Setup level of process logger
    logger = logging.getLogger()
    if logger_name:
        logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    # Add handlers to the logger
    logger.addHandler(debug_queue_handler)
    logger.addHandler(info_queue_handler)


def worker(info_queue, debug_queue):
    # Setup worker process logging
    process_logging(info_queue, debug_queue)
    logging.debug('Process %s initialized.', os.getpid())
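For reference, here is a minimal standalone sketch (the file names and log messages are mine, invented for illustration) that exercises provider.shared_logging and provider.worker without node.js, which is how I confirm the logging pipeline itself is sound:
import logging
from multiprocessing import Pool

import provider

def job(n):
    # Runs inside a pool worker; the record travels through the shared
    # queue back to the QueueListener in the main process.
    logging.getLogger(__name__).info('job %s done', n)
    return n

if __name__ == '__main__':
    info_queue, info_listener = provider.shared_logging(logging.INFO, 'standalone-test.log')
    debug_queue, debug_listener = provider.shared_logging(logging.DEBUG, 'standalone-test-debug.log')
    info_listener.start()
    debug_listener.start()

    with Pool(processes=2, initializer=provider.worker,
              initargs=[info_queue, debug_queue]) as pool:
        print(pool.map(job, range(4)))  # [0, 1, 2, 3]

    info_listener.stop()
    debug_listener.stop()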