How to check the response of URLs in Python and store them in a DB?

Problem Description

Suppose I have around 100,000 or more URLs, and I need to hit each of them and check its response code.

Will the approach below work for a number that large, or is there a more efficient way?

import requests
response = requests.get('https://google.com')
print(response)
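Note: print(response) only shows something like <Response [200]>; the numeric code itself is response.status_code. A minimal per-URL sketch, assuming the URLs are in a hypothetical urls list:

import requests

urls = ["https://google.com", "https://example.com"]  # hypothetical list of URLs to check

for url in urls:
    try:
        response = requests.get(url, timeout=5)
        print(url, response.status_code)  # numeric response code, e.g. 200
    except requests.RequestException:
        print(url, "request failed")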

I also have a database table -

[screenshot of the database table]

I want to store the tested URLs in that table. Again, my point is: what is an efficient approach for 100,000 URLs or even more?

Tags: python, sql, python-3.x

Solution


This question will probably be lost forever, but it inspired me to write a simple spider.
It uses sqlite3 and it is multithreaded.

setup_spider.py (run once)

import sqlite3
db_name = "spider.db"

# create sqlite3 db
conn = sqlite3.connect(db_name)
c = conn.cursor() # The database will be saved in the location where your 'py' file is

# create the spider table if it doesn't exist
c.execute('''create table if not exists spider ([url] text PRIMARY KEY, [status] integer, [dt] date) ''')

# insert all urls to DB
# The following assumes the urls are in a text file, one per line
with open("top100.txt") as f:  # https://gist.github.com/demersdesigns/4442cd84c1cc6c5ccda9b19eac1ba52b
    for url in f:
        c.execute(f""" INSERT OR IGNORE into spider (url) VALUES('{url.strip()}') """);

conn.commit()
conn.close()
print("Your spider was setup succesfully")

spider.py

import requests
import sqlite3
import time
from datetime import datetime
import threading
import traceback

# spider settings
db_name = "spider.db" # db name
spider_name = "Stack Spider" # spider name
spider_max_threads = 15 # max parallel threads. More threads doesn't mean faster crawling, adjust according to your OS and Network
spider_sleep = 0.05 # sleep time when max threads is reached
spider_allow_redirects = 1 # allow spider redirects - 0 to disable
spider_timeout = 5  # request timeout in secs
spider_ua = f"{spider_name} Crawler Bot" # spider User-Agent
spider_headers = {"User-Agent": spider_ua} # request headers
log_level = 0 # 0 disabled, 1 minimal, 2 errors

# insert on DB
def update_status(url, status, dt):
    conn = sqlite3.connect(db_name)
    c = conn.cursor()
    c.execute(f""" INSERT or REPLACE into spider (url, status, dt) VALUES('{url}', '{status}', '{dt}') ; """);
    conn.commit()
    conn.close()

# crawler
def crawl(url):
    dt  = datetime.now()
    try:
        if log_level == 1: print(f"Crawling {url}")
        res = requests.head(url, allow_redirects=spider_allow_redirects, timeout=spider_timeout, headers=spider_headers)
        status = res.status_code
        update_status(url, status, dt)
        if log_level == 1: print(f"{url} {status}")
    except Exception:
        update_status(url, 0, dt) # set status 0 if the request failed (too many redirects, timeouts, etc.)
        if log_level > 1: print("Error:", traceback.format_exc())

# count spider threads
def count_threads():
    n = 0
    for x in threading.enumerate():
        if "spider" == x.getName():
            n+=1
    return n


# get all urls from DB
conn = sqlite3.connect(db_name)
c = conn.cursor()
c.execute('SELECT url FROM spider') # can be changed to filter the websites to crawl, eg: first 1000 urls or google and fb only, etc...
urls = c.fetchall()
conn.close()

# main loop
start = time.time()
print(f"{spider_name} Started. Crawling {len(urls)} urls")
for url in urls:
    threading.Thread(target=crawl, args=[url[0]], name="spider").start()
    while count_threads() >= spider_max_threads:
        time.sleep(spider_sleep)

# wait threads to finish
while count_threads():
    time.sleep(spider_sleep)
end = time.time()

print(f"{spider_name} Finished in {int(end - start)} seconds")

