How to call a function without blocking execution

Problem description

I have been working on a small PoC: an I/O-bound application that executes functions without getting blocked. So far I have created something like this:

import time
import logging
import concurrent.futures

logger = logging.getLogger(__name__)

found_products = []

site_catalog = [
    "https://www.graffitishop.net/Sneakers",
    "https://www.graffitishop.net/T-shirts",
    "https://www.graffitishop.net/Sweatshirts",
    "https://www.graffitishop.net/Shirts"
]


def threading_feeds():
    # Create own thread for each URL as we want to run concurrent
    with concurrent.futures.ThreadPoolExecutor() as executor:
        executor.map(monitor_feed, site_catalog)


def monitor_feed(link: str) -> None:
    old_payload = product_data(...)

    while True:
        new_payload = product_data(...)

        if old_payload != new_payload:
            for links in new_payload:
                if links not in found_products:
                    logger.info(f'Detected new link -> {found_link} | From -> {link}')
                    # Execute filtering function without blocking, how?
                    filtering(link=found_link)

        else:
            logger.info("Nothing new")
            time.sleep(60)
            continue


def filtering(found_link):
    # More code will be added in the future to handle logical code parts
    ...
    # Test
    time.sleep(60)

Problem: Currently, whenever we reach the line filtering(link=found_link), the call to filtering(...) sleeps for 60 seconds (this is just mock behavior; in the future it will contain real logic), and while it does, monitor_feed stops executing and waits until filtering() has finished.

My question: How can I execute filtering(...) and still keep looping through monitor_feed without being blocked when we call filtering(...)?

Tags: python, python-3.x, multithreading

Solution


Here is your code with small modifications - the main problem was wrong variable names (because they are very similar).

To keep them apart I use the names executor1 and executor2, and executor2 has to be created before the while True loop, because it has to exist the whole time its threads are running.

And if you have def filtering(filtered_link), then you have to use the same name filtered_link in submit(..., filtered_link=...).
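A minimal sketch of that rule (my own illustration, not part of the original answer): keyword arguments passed to submit() are forwarded to the target function, so the keyword name must match the function's parameter name, while positional arguments work regardless of the name.

```python
import concurrent.futures

def filtering(filtered_link):
    # stand-in for the real filtering logic
    return filtered_link.upper()

with concurrent.futures.ThreadPoolExecutor() as executor:
    # keyword name must match the parameter name of filtering()
    ok = executor.submit(filtering, filtered_link='abc').result()
    # a positional argument works regardless of the parameter name
    also_ok = executor.submit(filtering, 'abc').result()
```

Calling submit(filtering, link='abc') instead would raise a TypeError inside the worker, because filtering() has no parameter named link.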

import concurrent.futures
import time

found_products = []

site_catalog = [
    "https://www.graffitishop.net/Sneakers",
    "https://www.graffitishop.net/T-shirts",
    "https://www.graffitishop.net/Sweatshirts",
    "https://www.graffitishop.net/Shirts"
]


def threading_feeds():
    print('[threading_feeds] running')
    # Create own thread for each URL as we want to run concurrent
    with concurrent.futures.ThreadPoolExecutor() as executor1:
        executor1.map(monitor_feed, site_catalog)


def monitor_feed(link: str) -> None:
    print('[monitor_feed] start')
    
    old_payload = ['old'] # product_data(...)

    # executor has to exist all time
    with concurrent.futures.ThreadPoolExecutor() as executor2:

        while True:
            print('[monitor_feed] run loop')

            new_payload = ['new1', 'new2', 'new3']  # product_data(...)
            
            if old_payload != new_payload:
                for product_link in new_payload:
                    if product_link not in found_products:
                        print(f'Detected new link -> {product_link} | From -> {link}')
                        
                        executor2.submit(filtering, filtered_link=product_link)
                        #executor2.submit(filtering, product_link)
                        
                        print("Continue")
                        
            time.sleep(2)


def filtering(filtered_link):
    # More code will be added in the future to handle logical code parts
    #...
    # Test
    print(f'[filtering]: start: {filtered_link}')
    time.sleep(60)
    print(f'[filtering]: end: {filtered_link}')


# --- start ---

threading_feeds()
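The key property the answer relies on is that executor.submit() returns a Future immediately and runs the function in a worker thread, so the submitting loop is not blocked. A standalone demo of this (my own sketch, with a 2-second sleep standing in for the 60-second filtering work):

```python
import concurrent.futures
import time

def slow_task(name):
    time.sleep(2)  # simulates the long-running filtering work
    return f'done: {name}'

with concurrent.futures.ThreadPoolExecutor() as executor:
    start = time.monotonic()
    future = executor.submit(slow_task, 'link-1')
    # submit() returned right away, long before slow_task finished
    submitted_after = time.monotonic() - start
    # the caller only blocks if and when it asks for the result
    result = future.result()

print(f'submit returned after {submitted_after:.3f}s, result = {result}')
```

In monitor_feed the loop never calls future.result(), so it keeps iterating while filtering runs in the background; the `with` block only waits for all pending futures when the executor is shut down.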
