python - How to parse a text file and return URLs using multiprocessing
Problem description
I have a text file. I need to load the data from the file, parse messages_to_parse and extract all URLs from it with the URLExtract module (using multiprocessing), then check each address's availability with requests.head (using multithreading). The output should be a dictionary whose keys are the URLs and whose values are the status codes returned by the requests.head calls. I would appreciate any help. Could you please correct my code?
import concurrent.futures
import pickle
import requests
import time
from urlextract import URLExtract

out = []
CONNECTIONS = 100
TIMEOUT = 5

with open('messages_to_parse.dat', 'rb') as f:
    tlds = str(pickle.load(f, encoding='latin1'))

extractor = URLExtract()
urls = [url for url in extractor.gen_urls(tlds)]

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

print(load_url(urls, 5))

with
concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS)
as executor:
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for
                     url in urls)

for future in concurrent.futures.as_completed(future_to_url):
    try:
        data = future.result()
    except Exception as exc:
        data = str(type(exc))
    finally:
        out.append(data)
        print(str(len(out)), end="\r")
Solution
As you have it now, tlds is a single string, and you cannot apply multiprocessing to process it with the URLExtract.gen_urls method. However, if the result of pickle.load is actually, as the name tlds suggests, a list of individual strings, and you are merely converting it into a single string with str, then by keeping it as an actual list you can use multiprocessing to process each string in parallel (although it may not save you any computation time, depending on how CPU-intensive the computation is). On the assumption that pickle.load returns a list of strings, I offer the following possible solution, with a few notes:

- If you intend to split a long Python statement across multiple lines, you need to continue the statement onto the next line by adding a \ character at the end of each line.
- There is no need to create a thread pool larger than the urls list if len(urls) < CONNECTIONS.
- The variable name future_to_url suggests a mapping from futures to URLs, so that given a future you can recover the URL for which it was created. That requires this variable to be a dictionary.
- Your as_completed loop should be inside the with ... as executor: block. Placing it after the block means it does not run until after the implicit call to executor.shutdown(wait=True), by which point all pending futures have already completed, defeating the whole purpose of using as_completed.
The code:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, \
    as_completed
from multiprocessing import cpu_count
import pickle
import requests
import time
from urlextract import URLExtract

CONNECTIONS = 100
TIMEOUT = 5

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

def extract_urls(text):
    extractor = URLExtract()
    # gen_urls is a generator; materialize it into a list so the
    # result can be pickled back from the worker process:
    return list(extractor.gen_urls(text))

def main():
    with open('messages_to_parse.dat', 'rb') as f:
        # load the list:
        tlds = pickle.load(f, encoding='latin1')
    urls = []
    with ProcessPoolExecutor(min(cpu_count(), len(tlds))) as executor:
        futures = [executor.submit(extract_urls, tld) for tld in tlds]
        for future in as_completed(futures):
            urls.extend(future.result())
    with ThreadPoolExecutor(min(CONNECTIONS, len(urls))) as executor:
        future_to_url = {executor.submit(load_url, url, TIMEOUT): url
                         for url in urls}
        out = []
        for future in as_completed(future_to_url):
            try:
                data = future.result()
            except Exception as exc:
                data = str(type(exc))
            finally:
                url = future_to_url[future]
                out.append((url, data))
                print(len(out), end="\r")
    print()
    for url, data in out:
        print(url, data)

# Required for multiprocessing on Windows:
if __name__ == '__main__':
    main()
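For anyone who wants to try the script without the original data, a sample messages_to_parse.dat can be created beforehand. This is a minimal sketch, assuming (as the answer above does) that the file simply holds a pickled list of message strings; the message texts themselves are made up:

```python
import pickle

# Hypothetical sample messages; the real file's contents are unknown.
messages = [
    "Release notes: https://example.com/notes and https://example.org",
    "See the docs at https://docs.python.org/3/ for details",
]

# Write the pickled list, matching what the solution assumes pickle.load returns.
with open('messages_to_parse.dat', 'wb') as f:
    pickle.dump(messages, f)

# Round-trip check: load it back the same way the solution does.
with open('messages_to_parse.dat', 'rb') as f:
    tlds = pickle.load(f, encoding='latin1')
print(tlds == messages)  # True
```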
Instead of first collecting all the URLs and only then performing HEAD requests on them, the version below submits a HEAD request as soon as URLExtract.gen_urls yields each URL. Performance may improve slightly:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, \
    as_completed
from multiprocessing import cpu_count
import pickle
import requests
import time
from urlextract import URLExtract

CONNECTIONS = 100
TIMEOUT = 5

def load_url(url, timeout):
    ans = requests.head(url, timeout=timeout)
    return ans.status_code

def extract_urls(text):
    extractor = URLExtract()
    # gen_urls is a generator; materialize it into a list so the
    # result can be pickled back from the worker process:
    return list(extractor.gen_urls(text))

def main():
    with open('messages_to_parse.dat', 'rb') as f:
        # load the list:
        tlds = pickle.load(f, encoding='latin1')
    with ThreadPoolExecutor(CONNECTIONS) as thread_executor:
        future_to_url = {}
        with ProcessPoolExecutor(min(cpu_count(), len(tlds))) as process_executor:
            futures = [process_executor.submit(extract_urls, tld) for tld in tlds]
            for future in as_completed(futures):
                for url in future.result():
                    future_to_url[thread_executor.submit(load_url, url, TIMEOUT)] = url
        out = []
        for future in as_completed(future_to_url):
            try:
                data = future.result()
            except Exception as exc:
                data = str(type(exc))
            finally:
                url = future_to_url[future]
                out.append((url, data))
                print(len(out), end="\r")
    print()
    for url, data in out:
        print(url, data)

# Required for multiprocessing on Windows:
if __name__ == '__main__':
    main()
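Finally, the question asked for a dictionary mapping each URL to its status code, while both versions above collect (url, data) tuples in out; that list converts directly with dict(). A minimal sketch, with made-up results standing in for what the as_completed loop would produce:

```python
# out is shown here with hypothetical results; in the real script it is
# populated by the as_completed loop.
out = [
    ("https://example.com", 200),
    ("https://example.org/missing", 404),
]

# dict() turns the list of (url, status) pairs into the requested mapping.
url_status = dict(out)
print(url_status["https://example.com"])  # 200
```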