首页 > 技术文章 > 抓取图片视频等资源链接地址的Python小工具

lovesqcc 2020-10-04 10:12 原文

背景

有时,我期望能够自动批量下载收藏的一些图片或视频。在 MacOS 上, you-get 可用于下载, xargs 命令可以提供批量的功能。那么,需要能够自动抓取图片、视频等资源链接地址的小工具。

“批量下载网站图片的Python实用小工具(下)” 中,编写了一个可以用于抓取和下载图片资源的小工具。本文基于这个小工具,做一点改造,来实现资源链接地址的方便抓取。


设计

资源链接规则

要实现资源链接的抓取,首先要定义资源链接规则。常见的资源链接的标签有 a, img , video 。进一步,可以通过 id, class 来精确定位所需的资源。

资源链接参数应当尽可能使用友好。可以采用 img=jpg,png;class=*;id=xyz 来定义资源的规则。在内部,会转换成 [{"img":["jpg","png"], "class":["resLink"], "id":["xyz"]}] 的更灵活的 JSON 形式。这里是或关系,也就是可以同时抓取符合多个规则中任一个的资源地址。

很多用户很可能根本不知道资源链接规则如何定义。因此,这里可以提供一个默认选项。也就是不指定该规则的话,就默认抓取 img = png or jpg 以及 a 的 href 链接。

资源链接规则参数的形式转换可以查看 res.py 的 parseRulesParam 方法。


基础组件

需要能够抓取网页内容、定位网页元素的基础组件。为了提升速度,还需要一个并发组件。基础组件都放在包 common 下面。

  • 可以使用 requests 库来抓取网页内容,见 net.py 的 getHTMLContentFromUrl 方法。不过,有些网页是动态加载的,需要等待动态加载完成才能够抓取生成的内容。这种情况下,可以使用 selenium + chromedriver 来获取网页内容。见 net.py 的 getHTMLContentAsync 方法。可以基于这两个方法做一层策略包装,见 net.py 的 getHTMLContent 方法。

  • 可以使用 BeautifulSoup 来定位资源链接元素。见 res.py 的 findWantedLinks 方法。

  • 定义一个 IoTaskThreadPool 来并发抓取网页内容,亦可用于并发下载资源。见 multitasks.py 的 IoTaskThreadPool 类。

  • 使用装饰器来捕获异常。见 common.py 的 catchExc 包装器。


小技巧

在编写基础库时,如果需要一些配置项,用参数传递的方式会比较困难,或者导致代码不太简洁。此时,可以把函数包装成类,在类的实例化参数中传入。见 net.py 的 HTMLGrasper 类。


用法

运行前置条件

需要安装 Python3 环境及 bs4 , requests, selenium 包及 pip3, chromedriver 工具。自行网搜下哈。

brew install python3
sudo easy_install pip
pip3 install requests bs4 selenium   -i  https://pypi.doubanio.com/simple

安装问题:

  • chromedriver download 下载 chromedriver.zip 并解压后,将可执行的驱动程序复制到 /usr/local/bin/ 目录下,这样就不会报权限相关问题了。

命令使用

先使用如下命令获取资源,并写入到指定结果资源文件 reslinks.txt 中。


python3 tools/res.py -u https://space.bilibili.com/183260251/favlist -r 'class=*'

然后使用如下命令来去重并下载资源。


grep 'pattern' reslinks.txt | sort | uniq | xargs -I {} you-get {}

以上两个命令可以联合起来使用。


B 站视频

python3 tools/res.py -u 'https://space.bilibili.com/183260251/favlist?fid=968614951&ftype=create'
python3 tools/res.py -u 'https://space.bilibili.com/183260251/favlist?fid=968614951&ftype=create' -r 'class=*' | grep 'video' | sort | uniq | xargs -I {} you-get {} 

黑光图集

python3 tools/res.py -u 'http://tu.heiguang.com/works/show_167521.html'
python3 tools/res.py -u 'http://tu.heiguang.com/works/show_167521.html' -r 'img=jpg!c' | sort | uniq | xargs -I {} you-get {}

源代码

包结构

具体可以下载工程: Pystudy Github。如果要修改 common 包下的方法,可以切换到 pystudy 目录下执行 sh install.sh 安装新修改后的包,然后再执行 res.py 脚本。

pystudy
   |-- common
            |-- __init.py__
            |-- common.py
            |-- multitasks.py
            |-- net.py
   |-- tools
           |-- res.py
   |-- install.sh
   |-- setup.py
   |-- __init.py__

res.py

#!/usr/bin/python3
#_*_encoding:utf-8_*_

import re
import sys
import json

import argparse
from bs4 import BeautifulSoup
from common.net import *
from common.multitasks import *

SaveResLinksFile = '/Users/qinshu/joy/reslinks.txt'
serverDomain = ''

def parseArgs():
    description = '''This program is used to batch download resources from specified urls.
                     eg. python3 res.py -u http://xxx.html -r 'img=jpg,png;class=resLink;id=xyz'
                     will search resource links from network urls http://xxx.html  by specified rules
                     img = jpg or png OR class = resLink OR id = xyz [ multiple rules ]

                     python3 tools/res.py -u 'http://tu.heiguang.com/works/show_167480.html' -r 'img=jpg!c'
                     for <img src="xxx.jpg!c"/> 
                  '''
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument('-u','--url', nargs='+', help='At least one html urls are required', required=True)
    parser.add_argument('-r','--rulepath', nargs=1, help='rules to search resources. if not given, search a hrefs or img resources in given urls', required=False)
    args = parser.parse_args()
    init_urls = args.url
    rulepath = args.rulepath
    return (init_urls, rulepath)

def getAbsLink(serverDomain, link):

    try:
        href = link.attrs['href']
        if href.startswith('//'):
            return 'https:' + href
        if href.startswith('/'):
            return serverDomain + href
        else:
            return href
    except:
        return ''

def getTrueResLink(reslink):
    global serverDomain
    try:
        href = reslink.attrs['src']
        if href.startswith('//'):
            return 'http:' + href 
        if href.startswith('/'):
            href = serverDomain + href
            return href
        pos = href.find('jpg@')
        if pos == -1:
            return href
        return href[0: pos+3]
    except:
        return ''

def batchGetResTrueLink(resLinks):
    hrefs = map(getTrueResLink, resLinks)
    return filter(lambda x: x != '', hrefs)

resTags = set(['img', 'video'])

def findWantedLinks(htmlcontent, rule):
    '''
       find html links or res links from html by rule.
       sub rules such as:
          (1) a link with id=[value1,value2,...]
          (2) a link with class=[value1,value2,...]
          (3) res with src=xxx.jpg|png|mp4|...
       a rule is map containing sub rule such as:
          { 'id': [id1, id2, ..., idn] } or
          { 'class': [c1, c2, ..., cn] } or
          { 'img': ['jpg', 'png', ... ]} or
          { 'video': ['mp4', ...]}

    '''

    #print("html===\n"+htmlcontent+"\n===End")
    #print("rule===\n"+str(rule)+"\n===End")

    soup = BeautifulSoup(htmlcontent, "lxml")
    alinks = []
    reslinks = []

    for (key, values) in rule.items():
        if key == 'id':
            for id in values:
                links = soup.find_all('a', id=id)
                links = map(getTrueResLink, links)
                links = filter(lambda x: x != '', links)
                alinks.extend(links)
        elif key == 'class':
            for cls in values:
                if cls == '*':
                    links = soup.find_all('a')
                else:
                    links = soup.find_all('a', class_=cls)
                links = map(lambda link: getAbsLink(serverDomain, link), links)
                links = filter(lambda x: validate(x), links)
                alinks.extend(links)
        elif key in resTags:
            for resSuffix in values:
                reslinks.extend(soup.find_all(key, src=re.compile(resSuffix)))

    allLinks = []
    allLinks.extend(alinks)
    allLinks.extend(batchGetResTrueLink(reslinks))
    return allLinks

def validate(link):

    validSuffix = ['png', 'jpg', 'jpeg', 'mp4']

    for suf in validSuffix:
        if link.endswith(suf):
            return True
    if link == '':
        return False
    if link.endswith('html'):
        return False
    if 'javascript' in link:
        return False    
    return True    

def batchGetLinksByRule(htmlcontentList, rules):
    '''
       find all res links from html content list by rules
    '''

    links = []
    for htmlcontent in htmlcontentList:
        for rule in rules:
            links.extend(findWantedLinks(htmlcontent, rule))
    return links

def batchGetLinks(urls, rules):
    conf = {"async":1, "targetIdWhenAsync": "page-fav", "sleepWhenAsync": 10}
    grasper = HTMLGrasper(conf)
    htmlcontentList = grasper.batchGrapHtmlContents(urls)
    allLinks = batchGetLinksByRule(htmlcontentList, rules)
    with open(SaveResLinksFile, 'w') as f:
        for link in allLinks:
            print(link)
            f.write(link + "\n")

def parseRulesParam(rulesParam):
    '''
       parse rules params to rules json
       eg. img=jpg,png;class=resLink;id=xyz to
           [{"img":["jpg","png"], "class":["resLink"], "id":["xyz"]}]
    '''
    defaultRules = [{'img': ['jpg','png','jpeg']},{"class":"*"}]
    if rulesParam:
        try:
            rules = []
            rulesStrArr = rulesParam[0].split(";")
            for ruleStr in rulesStrArr:
                ruleArr = ruleStr.split("=")
                key = ruleArr[0]
                value = ruleArr[1].split(",")
                rules.append({key: value})
            return rules
        except ValueError as e:
            print('Param Error: invalid rulepath %s %s' % (rulepathjson, e))
            sys.exit(1)
    return defaultRules

def parseServerDomain(url):
    parts = url.split('/', 3)
    return parts[0] + '//' + parts[2]

def testBatchGetLinks():
    urls = ['http://dp.pconline.com.cn/list/all_t145.html']
    rules = [{"img":["jpg"], "video":["mp4"]}]

    batchGetLinks(urls, rules)

if __name__ == '__main__':

    #testBatchGetLinks()

    (init_urls, rulesParam) = parseArgs()
    print('init urls: %s' % "\n".join(init_urls))

    rulepath = parseRulesParam(rulesParam)
    serverDomain = parseServerDomain(init_urls[0])
    print('rulepath: %s\n serverDomain:%s' % (rulepath, serverDomain))

    batchGetLinks(init_urls, rulepath)

common.py

import os

def createDir(dirName):
    if not os.path.exists(dirName):
        os.makedirs(dirName)

def catchExc(func):
    def _deco(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            print ("error catch exception for %s (%s, %s): %s" % (func.__name__, str(*args), str(**kwargs), e))
            return None
    return _deco

multitasks.py

from multiprocessing import (cpu_count, Pool)
from multiprocessing.dummy import Pool as ThreadPool

ncpus = cpu_count()

def divideNParts(total, N):
    '''
       divide [0, total) into N parts:
        return [(0, total/N), (total/N, 2M/N), ((N-1)*total/N, total)]
    '''

    each = total / N
    parts = []
    for index in range(N):
        begin = index * each
        if index == N - 1:
            end = total
        else:
            end = begin + each
        parts.append((begin, end))
    return parts

class IoTaskThreadPool(object):
    '''
       thread pool for io operations
    '''
    def __init__(self, poolsize):
        self.ioPool = ThreadPool(poolsize)

    def exec(self, ioFunc, ioParams):
        if not ioParams or len(ioParams) == 0:
            return []
        return self.ioPool.map(ioFunc, ioParams)

    def execAsync(self, ioFunc, ioParams):
        if not ioParams or len(ioParams) == 0:
            return []
        self.ioPool.map_async(ioFunc, ioParams)

    def close(self):
        self.ioPool.close()

    def join(self):
        self.ioPool.join()

net.py

import requests
import time
from bs4 import BeautifulSoup
from common.common import catchExc
from common.multitasks import IoTaskThreadPool
from selenium import webdriver
from selenium.webdriver.chrome.options import Options

delayForHttpReq = 0.5 # 500ms

class HTMLGrasper(object):

    def __init__(self, conf):
        '''
        抓取 HTML 网页内容时的配置项
          _async: 是否异步加载网页。 _async = 1 当网页内容是动态生成时,异步加载网页; 
          targetIdWhenAsync: 当 _async = 1 指定。
             由于此时会加载到很多噪音内容,需要指定 ID 来精确获取所需的内容部分
          sleepWhenAsync:  当 _async = 1 指定。
             异步加载网页时需要等待的秒数  
        '''
        self._async = conf.get('async', 0)
        self.targetIdWhenAsync = conf.get('targetIdWhenAsync', '')
        self.sleepWhenAsync = conf.get('sleepWhenAsync', 10)

    def batchGrapHtmlContents(self, urls):
        '''
           batch get the html contents of urls
        '''
        grapHtmlPool = IoTaskThreadPool(20)
        return grapHtmlPool.exec(self.getHTMLContent, urls)

    def getHTMLContent(self, url):
        if self._async == 1:
            htmlContent = self.getHTMLContentAsync(url)

            if htmlContent is not None and htmlContent != '':
                html = '<html><head></head><body>' + htmlContent + '</body></html>'
                return html

        return self.getHTMLContentFromUrl(url)

    def getHTMLContentAsync(self, url):
        '''
           get html content from dynamic loaed html url
        '''

        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        driver = webdriver.Chrome(chrome_options=chrome_options)
        driver.get(url)
        time.sleep(self.sleepWhenAsync)

        try:
            elem = driver.find_element_by_id(self.targetIdWhenAsync)
        except:
            elem = driver.find_element_by_xpath('/html/body')

        return elem.get_attribute('innerHTML')       

    def getHTMLContentFromUrl(self, url):
        '''
           get html content from html url
        '''
        r = requests.get(url)
        status = r.status_code
        if status != 200:
            return ''
        return r.text

setup.py

from distutils.core import setup

setup(
       name = "pystudy" ,
       version = "1.0" ,
       description = "Python Study" ,
       author = " shuqin " ,
       author_email = " shuqin_1984@163.com ",
       url = " https://github.com/shuqin/pystudy " ,
       license = " LGPL " ,
       packages = ['common']
       )

install.sh

python3 setup.py build
python3 setup.py sdist
python3 setup.py install

推荐阅读