How to parse a text file using multiprocessing and return the URLs

Problem Description

I have a text file. I need to load the data from the file, parse messages_to_parse and extract all URLs from it using the URLExtract module (with multiprocessing), then check the availability of each address using requests.head (with multithreading). The output should be a dictionary whose keys are the URLs and whose values are the status codes returned by the requests.head requests. I would appreciate any help. Could you please correct my code?
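
For example, the resulting dictionary should look something like this (the URLs here are just placeholders):

{'https://example.com': 200, 'https://example.org/page': 404}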

import concurrent.futures
import pickle
import requests
import time
from urlextract import URLExtract

out = []
CONNECTIONS = 100
TIMEOUT = 5

with open('messages_to_parse.dat', 'rb') as f:
   tlds = str(pickle.load(f, encoding='latin1'))
extractor = URLExtract()
urls = [url for url in extractor.gen_urls(tlds)]


def load_url(url, timeout):
   ans = requests.head(url, timeout=timeout)
   return ans.status_code


print(load_url(urls, 5))

with 
 concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) 
  as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for 
url in urls)
for future in concurrent.futures.as_completed(future_to_url):
    try:
        data = future.result()
    except Exception as exc:
        data = str(type(exc))
    finally:
        out.append(data)

        print(str(len(out)), end="\r")

Tags: python, file, multiprocessing

Solution


As you have it now, tlds is a single string, and you cannot apply multiprocessing to process it with the URLExtract.gen_urls method. However, if the result of pickle.load is actually a list of individual strings (as the name tlds suggests) that you are currently converting into a single string with str, then by keeping it as an actual list you can use multiprocessing to process each string in parallel (although, depending on how CPU-intensive the processing is, it may not save you any computation time). On the assumption that pickle.load returns a list of strings (a quick way to check that assumption is sketched after the notes below), I offer the following possible solution with a few notes:

  1. If you are going to split a long Python statement across multiple lines, you need to continue the statement onto the next line by adding a \ character at the end of the line.
  2. There is no need to create a thread pool larger than the size of the urls list, i.e. when len(urls) < CONNECTIONS.
  3. The variable name future_to_url suggests a mapping from a future to the URL, so that given a future you can recover the URL for which that future was created. That requires the variable to be a dictionary.
  4. Your as_completed loop should be inside the with ... as executor: block. Having it after the block means it does not run until after the implicit call to executor.shutdown(wait=True), by which time all the pending futures have already completed, and that defeats the whole purpose of using as_completed.
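
As a quick sanity check of that assumption, something along the following lines can be run first. This is only a sketch: it assumes the same messages_to_parse.dat file and simply inspects what the pickle actually contains.

import pickle

# Inspect what the pickle really holds: a list of strings, or one big string?
with open('messages_to_parse.dat', 'rb') as f:
    tlds = pickle.load(f, encoding='latin1')

print(type(tlds))                       # expected: <class 'list'>
if isinstance(tlds, list) and tlds:
    print(type(tlds[0]), len(tlds))     # expected: <class 'str'> and the number of messages

# Wrapping the result in str(), as the question's code does, collapses the whole
# list into one string and rules out per-message multiprocessing.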

The code:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, \
                               as_completed
from multiprocessing import cpu_count
import pickle
import requests
import time
from urlextract import URLExtract

CONNECTIONS = 100
TIMEOUT = 5

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

def extract_urls(text):
    extractor = URLExtract()
    return extractor.gen_urls(text)
    
def main():
    with open('messages_to_parse.dat', 'rb') as f:
        # create a list:
        tlds = pickle.load(f, encoding='latin1')

    urls = []
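    # CPU-bound part: extract URLs from each string in parallel with a pool of worker processes.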
    with ProcessPoolExecutor(min(cpu_count(), len(tlds))) as executor:
        futures = [executor.submit(extract_urls, tld) for tld in tlds]
        for future in as_completed(futures):
            urls.extend(future.result())

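    # I/O-bound part: issue the HEAD requests concurrently from a pool of threads.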
    with ThreadPoolExecutor(min(CONNECTIONS, len(urls))) as executor:
        future_to_url = {executor.submit(load_url, url, TIMEOUT): url for 
                         url in urls}
        out = []
        for future in as_completed(future_to_url):
            try:
                data = future.result()
            except Exception as exc:
                data = str(type(exc))
            finally:
                url = future_to_url[future]
                out.append((url, data))
                print(len(out), end="\r")
    print()
    for url, data in out:
        print(url, data)

# Required for multiprocessing on Windows:
if __name__ == '__main__':
    main()
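
The question asked for a dictionary keyed by URL rather than a list of tuples. If that shape is wanted, the out list of (url, data) pairs can be converted at the end of main(); a minimal sketch, assuming the main() above (a URL that was extracted more than once keeps only one entry):

    # at the end of main(), after the two executor blocks:
    status_by_url = dict(out)           # {url: status_code_or_exception_name, ...}
    for url, status in status_by_url.items():
        print(url, status)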

Instead of first getting all of the URLs and then issuing HEAD requests against them, the version below attempts to issue each HEAD request as soon as the URL is returned by URLExtract.gen_urls. This may give a slight improvement in performance:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, \
                               as_completed
from multiprocessing import cpu_count
import pickle
import requests
import time
from urlextract import URLExtract

CONNECTIONS = 100
TIMEOUT = 5

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

def extract_urls(text):
    extractor = URLExtract()
    return extractor.gen_urls(text)
    
def main():
    with open('messages_to_parse.dat', 'rb') as f:
        # create a list:
        tlds = pickle.load(f, encoding='latin1')

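    # Create the thread pool first, so HEAD requests can be submitted as soon as
    # each URL-extraction future from the process pool completes.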
    with ThreadPoolExecutor(CONNECTIONS) as thread_executor:
        future_to_url = {}
        with ProcessPoolExecutor(min(cpu_count(), len(tlds))) as process_executor:
            futures = [process_executor.submit(extract_urls, tld) for tld in tlds]
            for future in as_completed(futures):
                for url in future.result():
                    future_to_url[thread_executor.submit(load_url, url, TIMEOUT)] = url
        out = []
        for future in as_completed(future_to_url):
            try:
                data = future.result()
            except Exception as exc:
                data = str(type(exc))
            finally:
                url = future_to_url[future]
                out.append((url, data))
                print(len(out), end="\r")
    print()
    for url, data in out:
        print(url, data)

# Required for multiprocessing on Windows:
if __name__ == '__main__':
    main()
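
If only the reachable addresses are of interest, the results can also be filtered afterwards. A minimal sketch, meant to go at the end of main(), on the assumption that any status code below 400 counts as available (failed requests were stored as exception-name strings, so the isinstance check drops them):

    # keep only URLs whose HEAD request returned a success or redirect status
    available = {url: status for url, status in out
                 if isinstance(status, int) and status < 400}
    print(len(available), 'of', len(out), 'URLs are reachable')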
