首页 > 解决方案 > 如何根据解析脚本和向 postgres 插入数据来组织项目?

问题描述

我对我的项目逻辑有几个问题。我的项目基于解析来自一千个网站的数据的脚本(网站的单独脚本)。实际上我有大约 40 个脚本,但我想在将来为大规模数据组织它。抓取后,脚本有一个方法 'insert_data' 将数据(字典列表)插入到我的 POSTGRESQL 数据库中。在获取数据的脚本中,我使用库:requests、urllib、有时 selenium(带有 JS 的页面)和 BeautifulSoup4 来解析下载的数据。我的脚本的逻辑如下:

class Main():
# initialize variables

def __init__():

def get_url(self, url):
    requests.get(url)
    return soup(url)

# data_insert is list of dicts
def insert_data(self, data_insert):
    cursor.executemany("INSERT INTO table1 (url, title, etc...) VALUES (% (url)s, %(title)s, data_insert)

def __main__(self):
    # 1 looping
    for auction in auctions:
        list_auctions.append(get_href_auction)

    # 2 looping
    for row in list_auctions:
        get_url(row)
        grab some data
        record = {"var1":"value", "var2":"value" etc}
        final_list.append(record)

    # after 2 looping insert data to POSTGRESQL
    insert_data(final_list)

在数据库中,我有每个网站的表和表“data_all”。脚本将数据插入到每个网站的表中,并且在插入将这些表中的数据加载到主表“data_all”后我已经触发。我的触发器的波纹管代码:

CREATE TRIGGER trigger_insert_data_to_main_table
  AFTER INSERT
  ON data_table1
  EXECUTE PROCEDURE insert_data_to_main_table();

CREATE TRIGGER trigger_insert_data_to_main_table
  AFTER INSERT
  ON data_table2
  EXECUTE PROCEDURE insert_data_to_main_table();

等等......我所有的桌子。

CREATE OR REPLACE FUNCTION insert_data_to_main_table()
  RETURNS trigger AS
$BODY$
BEGIN
 INSERT INTO data_all
 SELECT t1.*
 FROM data_table1 t1
 LEFT JOIN data_all d ON d.url = t1.url
 WHERE d.url IS NULL
 UNION
 SELECT t2.*
 FROM data_table2 t2
 LEFT JOIN data_all d ON d.url = t2.url
 WHERE d.url IS NULL;
 RETURN NULL;
 END;
 SELECT t3.*
 FROM data_table3 t3 

等等......我所有的桌子。

这个函数让我可以忽略主表“data_all”中重复的 url(每行唯一)。

  1. 抓取脚本是一个好的逻辑吗?对于没有 JS 的页面工作正常(快速)。有时我只有 1 个循环来从主页获取一些数据(没有迭代拍卖)。
  2. 以这种方式插入数据的好方法吗?将来我必须创建一个带有抓取脚本队列的主脚本
  3. 如何保护主脚本,以便在发生错误时向我返回消息并继续工作?
  4. 我最近阅读了有关多处理的信息。在这个项目中使用它来提高效率是一个好主意吗?
  5. 有人有更好的主意从 JS 页面中获取数据吗?我根据 request.post 找到解决方案。它是python中硒的唯一替代品吗?

非常感谢您阅读完帖子,希望您能帮助我。问候 :)

标签: pythonpostgresqlqueueproject-organization

解决方案


一旦获得所需数据或某些项目出现,您可以通过策略性地停止加载页面来加速硒页面。就数据库建议而言,我使用的是 simpledb,但上次我使用它时遇到了多线程的奇怪问题,所以我基于字典和 json 编写了自己的本地简单数据库,我使用 queue.queue() q.get() 作为破解阻塞然后释放我的数据库代码部分,所以我可以在多个线程中使用

出于对所有其他数据聚合器的尊重,我建议您在对同一域的查询之间放置适当的 wait() 函数,如果您滥用网站,它们会使其他人更难。根据 IP、cookie 和 url/域名的 skip 或 wait 找出一个好的逻辑。我编写了自己的 IP 管理器来处理这个问题,我什至对 url 有最少的等待时间,尽管我认为大多数网站基本上是通过域而不是 url 检查我检查数据库中的 ip、域和特定 url,看看是否等待 X 量基于最小默认值的秒数。在提交请求之前,所有 http 请求都经过我的 ipmanager

如果你是多线程或多处理,你想划分需要最多时间来完成的函数,通常是 http 请求。db 输入等待时间最短,我不分开不同的线程,我只让某些操作一次发生一个线程,例如数据库中的“更新”功能。读取函数没有锁

这是一个名为 QStack 的类,基本上我将请求添加到返回 ID 的堆栈中,我在我的其他代码中使用此 ID 来处理响应我可以提供更多代码洞察力,如果你想合作,我试图弄清楚为什么我最近有一些代理连接问题,我尝试将验证设置为 false,并且我在请求中的 urllib3 中阅读了一些奇怪的东西

class QStack:

def getproxy(self,fp=0):
    #print 'wait for queue'
    #z=self.qq.get()
    while True:
        if fp:
            p=self.fpxo.get()
        else:
            p=self.pxo.get()
        ipo=ip(p)

        #print 'looking for ip'
        if ipo not in self.inuse:
            self.inuse.append(ipo)
            break
        time.sleep(1)

    #print 'put bob in queue'
    #self.qq.put('bob')
    return p


def start(self):
    self.ts1=time.time()
    self.qc=0
    self.hari2=1
    t = threading.Thread(
    target=self.manage_threads
            )
    t.start()




def stop(self):
    self.hari2=0

def __init__(self,ipm,pxo,
      mthreads=1,fpxo=None):

    self.fpxo=fpxo
    self.hari=1
    self.mthreads=mthreads
    self.ipm=ipm
    self.pxo=pxo

    self.rqs = Queue.Queue()
    self.rps = Queue.Queue()

    self.b = Queue.Queue()
    self.c = Queue.Queue()
    self.c.put('bob')
    self.b.put('bob')
    self.inuse=[]
    self.mc=-1
    self.athreads=0

    self.idc=-1
    self.ct=0

def manage_threads(self):
    while self.hari2:
        if self.athreads<self.mthreads:
            t = threading.Thread(
                target=self.do
            )
            t.start()
            #print 'added thread'*20
        #print 'current threads'*20,self.athreads
        time.sleep(1)

def do(self):
    if self.rqs.empty():
        return -1

    self.athreads+=1
    q=self.rqs.get()
    s = http_incognito(self.ipm)
    s.timeout=self.pxo.timeout
    hari = True

    while hari:
        #print 'hi'
        if q.ua == 'auto':
            s.incognito()
        else:
            s.useragent=q.ua

        s.cookies=q.cookies
        s.referer = q.ref

        fp=0
        self.mc+=1
        if self.mc >= self.mthreads:
            self.mc=0
            fp=1

        p = self.getproxy(fp)
        ipo=ip(p)

        q.p=p
        q.fp=fp
        #print'lalaalla'

        try:


            then=time.time()
            a=s.get(q.url,proxy=p)
            hari=0
            #print 'done'
            #ff=random.choice(xrange(1,10))
            #print 'sleep',ff
            #time.sleep(ff)


            #print'did reqeust'
        except Exception as e:
            print 'HTTP ERROR'
            print str(e)
            self.inuse.remove(ipo)

            if fp:
                self.fpxo.update(p,0)
            else:
                self.pxo.update(p,0)
            self.athreads-=1
            continue


        q.rp=a
        #print 'okayyyy'

        #no blank response
        if not q.rp:
            print 'blank response'*20
            print q.url,q.p


            if fp:
                self.fpxo.update(p,0)
            else:
                self.pxo.update(p,0)

            hari=1
            self.inuse.remove(ipo)
            continue

        if not q.rcheck(q.v()):
            print 'NONONONONONO'

            if fp:
                self.fpxo.update(p,0)
            else:
                self.pxo.update(p,0)


            print 'robot', p
            self.inuse.remove(ipo)
            hari=1
            self.athreads-=1
            continue
        #print 'horehorehore'
        #print 'passed rcheck'

    #print 'lalala'
    self.rps.put(q)
    del s  #remove
    z= q.id


    self.inuse.remove(ipo)
    self.athreads-=1
    return z

def readd(self,q):
    self.rqs.put(
       q)

def add(self,
 url,
 ref=None,
 cookies=None,
 ua='auto',
 headers=None,
 rcheck=None):





    a=self.b.get()


    self.ct +=1

    q = QQuery(url,
        ref,cookies,
         ua,
         headers,
          rcheck=rcheck)

    self.idc += 1
    q.id=self.idc
    self.rqs.put(
       q)


    self.b.put('fred')
    return q.id


def get(self,ide=None):
    a=self.c.get()

    q = self.rps.get()
    if ide != None:
        while q.id != ide:
            self.rps.put(q)
            q=self.rps.get()

    self.c.put('bob')
    return q

它像这样使用:

qinfo={}
pxo=proxymanager()
ipm=ipmanager()
fpxo=myip()

qs=QStack(10,pxo,ipm,fpxo)
for u in urllist:
    ide = qs.add(u)
    qinfo[ide]='type, more info'
qs.start()


while True:
    q=qs.get() 
    info=qinfo[q.id]
    #process query in different ways
    #based on info, id, etc
    find more urls
     qs.add(u)

推荐阅读