python - 如何根据解析脚本和向 postgres 插入数据来组织项目?
问题描述
我对我的项目逻辑有几个问题。我的项目基于解析来自一千个网站的数据的脚本(网站的单独脚本)。实际上我有大约 40 个脚本,但我想在将来为大规模数据组织它。抓取后,脚本有一个方法 'insert_data' 将数据(字典列表)插入到我的 POSTGRESQL 数据库中。在获取数据的脚本中,我使用库:requests、urllib、有时 selenium(带有 JS 的页面)和 BeautifulSoup4 来解析下载的数据。我的脚本的逻辑如下:
class Main():
# initialize variables
def __init__():
def get_url(self, url):
requests.get(url)
return soup(url)
# data_insert is list of dicts
def insert_data(self, data_insert):
cursor.executemany("INSERT INTO table1 (url, title, etc...) VALUES (% (url)s, %(title)s, data_insert)
def __main__(self):
# 1 looping
for auction in auctions:
list_auctions.append(get_href_auction)
# 2 looping
for row in list_auctions:
get_url(row)
grab some data
record = {"var1":"value", "var2":"value" etc}
final_list.append(record)
# after 2 looping insert data to POSTGRESQL
insert_data(final_list)
在数据库中,我有每个网站的表和表“data_all”。脚本将数据插入到每个网站的表中,并且在插入将这些表中的数据加载到主表“data_all”后我已经触发。我的触发器的波纹管代码:
CREATE TRIGGER trigger_insert_data_to_main_table
AFTER INSERT
ON data_table1
EXECUTE PROCEDURE insert_data_to_main_table();
CREATE TRIGGER trigger_insert_data_to_main_table
AFTER INSERT
ON data_table2
EXECUTE PROCEDURE insert_data_to_main_table();
等等......我所有的桌子。
CREATE OR REPLACE FUNCTION insert_data_to_main_table()
RETURNS trigger AS
$BODY$
BEGIN
INSERT INTO data_all
SELECT t1.*
FROM data_table1 t1
LEFT JOIN data_all d ON d.url = t1.url
WHERE d.url IS NULL
UNION
SELECT t2.*
FROM data_table2 t2
LEFT JOIN data_all d ON d.url = t2.url
WHERE d.url IS NULL;
RETURN NULL;
END;
SELECT t3.*
FROM data_table3 t3
等等......我所有的桌子。
这个函数让我可以忽略主表“data_all”中重复的 url(每行唯一)。
- 抓取脚本是一个好的逻辑吗?对于没有 JS 的页面工作正常(快速)。有时我只有 1 个循环来从主页获取一些数据(没有迭代拍卖)。
- 以这种方式插入数据的好方法吗?将来我必须创建一个带有抓取脚本队列的主脚本
- 如何保护主脚本,以便在发生错误时向我返回消息并继续工作?
- 我最近阅读了有关多处理的信息。在这个项目中使用它来提高效率是一个好主意吗?
- 有人有更好的主意从 JS 页面中获取数据吗?我根据 request.post 找到解决方案。它是python中硒的唯一替代品吗?
非常感谢您阅读完帖子,希望您能帮助我。问候 :)
解决方案
一旦获得所需数据或某些项目出现,您可以通过策略性地停止加载页面来加速硒页面。就数据库建议而言,我使用的是 simpledb,但上次我使用它时遇到了多线程的奇怪问题,所以我基于字典和 json 编写了自己的本地简单数据库,我使用 queue.queue() q.get() 作为破解阻塞然后释放我的数据库代码部分,所以我可以在多个线程中使用
出于对所有其他数据聚合器的尊重,我建议您在对同一域的查询之间放置适当的 wait() 函数,如果您滥用网站,它们会使其他人更难。根据 IP、cookie 和 url/域名的 skip 或 wait 找出一个好的逻辑。我编写了自己的 IP 管理器来处理这个问题,我什至对 url 有最少的等待时间,尽管我认为大多数网站基本上是通过域而不是 url 检查我检查数据库中的 ip、域和特定 url,看看是否等待 X 量基于最小默认值的秒数。在提交请求之前,所有 http 请求都经过我的 ipmanager
如果你是多线程或多处理,你想划分需要最多时间来完成的函数,通常是 http 请求。db 输入等待时间最短,我不分开不同的线程,我只让某些操作一次发生一个线程,例如数据库中的“更新”功能。读取函数没有锁
这是一个名为 QStack 的类,基本上我将请求添加到返回 ID 的堆栈中,我在我的其他代码中使用此 ID 来处理响应我可以提供更多代码洞察力,如果你想合作,我试图弄清楚为什么我最近有一些代理连接问题,我尝试将验证设置为 false,并且我在请求中的 urllib3 中阅读了一些奇怪的东西
class QStack:
def getproxy(self,fp=0):
#print 'wait for queue'
#z=self.qq.get()
while True:
if fp:
p=self.fpxo.get()
else:
p=self.pxo.get()
ipo=ip(p)
#print 'looking for ip'
if ipo not in self.inuse:
self.inuse.append(ipo)
break
time.sleep(1)
#print 'put bob in queue'
#self.qq.put('bob')
return p
def start(self):
self.ts1=time.time()
self.qc=0
self.hari2=1
t = threading.Thread(
target=self.manage_threads
)
t.start()
def stop(self):
self.hari2=0
def __init__(self,ipm,pxo,
mthreads=1,fpxo=None):
self.fpxo=fpxo
self.hari=1
self.mthreads=mthreads
self.ipm=ipm
self.pxo=pxo
self.rqs = Queue.Queue()
self.rps = Queue.Queue()
self.b = Queue.Queue()
self.c = Queue.Queue()
self.c.put('bob')
self.b.put('bob')
self.inuse=[]
self.mc=-1
self.athreads=0
self.idc=-1
self.ct=0
def manage_threads(self):
while self.hari2:
if self.athreads<self.mthreads:
t = threading.Thread(
target=self.do
)
t.start()
#print 'added thread'*20
#print 'current threads'*20,self.athreads
time.sleep(1)
def do(self):
if self.rqs.empty():
return -1
self.athreads+=1
q=self.rqs.get()
s = http_incognito(self.ipm)
s.timeout=self.pxo.timeout
hari = True
while hari:
#print 'hi'
if q.ua == 'auto':
s.incognito()
else:
s.useragent=q.ua
s.cookies=q.cookies
s.referer = q.ref
fp=0
self.mc+=1
if self.mc >= self.mthreads:
self.mc=0
fp=1
p = self.getproxy(fp)
ipo=ip(p)
q.p=p
q.fp=fp
#print'lalaalla'
try:
then=time.time()
a=s.get(q.url,proxy=p)
hari=0
#print 'done'
#ff=random.choice(xrange(1,10))
#print 'sleep',ff
#time.sleep(ff)
#print'did reqeust'
except Exception as e:
print 'HTTP ERROR'
print str(e)
self.inuse.remove(ipo)
if fp:
self.fpxo.update(p,0)
else:
self.pxo.update(p,0)
self.athreads-=1
continue
q.rp=a
#print 'okayyyy'
#no blank response
if not q.rp:
print 'blank response'*20
print q.url,q.p
if fp:
self.fpxo.update(p,0)
else:
self.pxo.update(p,0)
hari=1
self.inuse.remove(ipo)
continue
if not q.rcheck(q.v()):
print 'NONONONONONO'
if fp:
self.fpxo.update(p,0)
else:
self.pxo.update(p,0)
print 'robot', p
self.inuse.remove(ipo)
hari=1
self.athreads-=1
continue
#print 'horehorehore'
#print 'passed rcheck'
#print 'lalala'
self.rps.put(q)
del s #remove
z= q.id
self.inuse.remove(ipo)
self.athreads-=1
return z
def readd(self,q):
self.rqs.put(
q)
def add(self,
url,
ref=None,
cookies=None,
ua='auto',
headers=None,
rcheck=None):
a=self.b.get()
self.ct +=1
q = QQuery(url,
ref,cookies,
ua,
headers,
rcheck=rcheck)
self.idc += 1
q.id=self.idc
self.rqs.put(
q)
self.b.put('fred')
return q.id
def get(self,ide=None):
a=self.c.get()
q = self.rps.get()
if ide != None:
while q.id != ide:
self.rps.put(q)
q=self.rps.get()
self.c.put('bob')
return q
它像这样使用:
qinfo={}
pxo=proxymanager()
ipm=ipmanager()
fpxo=myip()
qs=QStack(10,pxo,ipm,fpxo)
for u in urllist:
ide = qs.add(u)
qinfo[ide]='type, more info'
qs.start()
while True:
q=qs.get()
info=qinfo[q.id]
#process query in different ways
#based on info, id, etc
find more urls
qs.add(u)
推荐阅读
- azure-devops - 使用模板从主 yml 文件调用 step 变量
- javascript - 如何使用 Jest 模拟和测试时刻日期格式化程序
- sql - 带有 SQL 的 XML 节点
- python - AttributeError:部分初始化的模块'cv2'没有属性'VideoCapture' w/安装了opencv-contrib-python
- reactjs - npm 错误!代码 ERESOLVE 无法解析依赖树
- python - 如何快速将数据从 CFFI 复制到 Numpy?
- python - 通过 Jira REST API 从 CustomFieldOption 数组中获取“值”
- java - 如何在 Java 中创建具有排序字符串和特定整数的列表?
- javascript - 如何重置为空数组
- prometheus - Grafana 在非全屏时显示“无数据”