python - 多线程应用程序中的 _wait_for_tstate_lock 错误
问题描述
我编写了一个将市场数据导入 MariaDB 数据库的 Python 脚本。为了加快导入速度,我决定使用 threading 模块。脚本首先由一个函数把 URL 填入队列,然后各线程从这些 URL 下载数据并导入数据库。脚本第一次运行时,无论使用多少个线程都没有问题,所有可用数据都能顺利导入。再次运行时,脚本会检查已存储的数据,必要时进行更新,否则跳过。
奇怪的是,只有当脚本以多于 1 个线程运行、并且在第二次运行时进入更新逻辑时,才会出现错误信息。
这个问题已经困扰了我一个多星期。如果您能给我一些提示,告诉我问题可能出在哪里,我将不胜感激。
#!/usr/local/bin/python3
import requests
import queue
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import ujson
from datetime import datetime
import mysql.connector as mariadb
from mysql.connector import Error
from mysql.connector import errorcode
from threading import Thread
import time
# Number of worker threads the script starts.
num_threads: int = 4
# Thread objects of the started workers; joined before cleanup_mo() runs.
threads: list = list()
# Shared work queue: URL strings, followed by one None sentinel per worker.
urls: queue.Queue = queue.Queue()
def create_url():
    """Fill the module-level ``urls`` queue with one ESI orders URL per item type.

    Reads every ``type_id`` from ``tbl_items`` and enqueues the matching
    ``esi.evetech.net`` market-orders URL.

    Returns:
        The ``urls`` queue on success, or ``None`` if a database error
        occurred (the error is printed, not raised).
    """
    # Pre-bind so except/finally never touch an unbound name when connect()
    # itself is what failed.
    mariadb_connection = None
    cursor = None
    try:
        mariadb_connection = mariadb.connect(dbstuff)
        cursor = mariadb_connection.cursor()
        cursor.execute('SELECT type_id from tbl_items')
        # NOTE(review): only page=1 is requested per type_id; if the API
        # paginates larger result sets, later pages are never fetched — confirm.
        base_url = ('https://esi.evetech.net/latest/markets/10000002/orders/'
                    '?datasource=tranquility&order_type=all&page=1&type_id=')
        for row in cursor.fetchall():
            urls.put(base_url + str(row[0]))
        return urls
    except mariadb.Error as error:
        if mariadb_connection is not None and mariadb_connection.is_connected():
            mariadb_connection.rollback()  # rollback if any exception occurred
        print("Failed retrieving itemtypes from tbl_items table {}".format(error))
    finally:
        if mariadb_connection is not None and mariadb_connection.is_connected():
            if cursor is not None:
                cursor.close()
            mariadb_connection.close()
def _fetch_orders(session, url, worker):
    """Download and decode the JSON order list at *url*; return None on failure."""
    try:
        jsondata = ujson.loads(session.get(url).text)
    except (requests.RequestException, ValueError) as error:
        # One failed download should skip this URL, not kill the worker thread.
        print("Worker %s failed fetching %s: %s" % (worker, url, error))
        return None
    if not isinstance(jsondata, list):
        # A non-list body (e.g. an error object) would otherwise make
        # row['location_id'] fail while iterating.
        print("Worker %s got an unexpected payload from %s" % (worker, url))
        return None
    return jsondata


def _sync_order(mariadb_connection, cursor, row):
    """Record one order in the tmp table, then insert or update it in tbl_mo_jita."""
    # Remember that this order is still live, so cleanup_mo() keeps it.
    cursor.execute('INSERT INTO tbl_mo_jita_esi_tmp (order_id) VALUES (%s)', (row['order_id'], ))
    mariadb_connection.commit()
    cursor.execute('SELECT order_id, price, volume FROM tbl_mo_jita WHERE order_id = %s', (row['order_id'], ))
    db_data = cursor.fetchall()
    if not db_data:
        print("newly inserting order#", row['order_id'])
        cursor.execute('INSERT INTO tbl_mo_jita (type_id, order_id, ordertype,volume, price) VALUES (%s,%s,%s,%s,%s)', (row['type_id'], row['order_id'], row['is_buy_order'], row['volume_remain'], row['price'], ))
        mariadb_connection.commit()
        return
    # As before, the last matching row is the one compared.
    _db_order_id, db_price, db_volume = db_data[-1]
    # NOTE(review): if tbl_mo_jita.price is a DECIMAL column the driver returns
    # decimal.Decimal, and Decimal == float compares exact binary values
    # (Decimal('5.99') != 5.99), so this may report a change on every run —
    # confirm the column type.
    if db_price == row['price'] and db_volume == row['volume_remain']:
        return
    print("updating order#", row['order_id'])
    cursor.execute('UPDATE tbl_mo_jita SET volume = %s, price = %s WHERE order_id = %s', (row['volume_remain'], row['price'], row['order_id'], ))
    mariadb_connection.commit()


def import_mo_jita(i, urls):
    """Worker loop: fetch queued order URLs and sync their orders into the DB.

    Consumes *urls* until it dequeues a ``None`` sentinel. For every order
    whose ``location_id`` is station ``60003760`` it records the order id in
    ``tbl_mo_jita_esi_tmp``. It then inserts the order into ``tbl_mo_jita``,
    or updates its volume/price when either differs.

    Each worker opens its own database connection and its own HTTP session.

    Args:
        i: Worker number, used only in log output.
        urls: Queue of URL strings terminated by ``None`` sentinels.

    A database error is printed and ends this worker. A failed or malformed
    download only skips that URL.
    """
    station_id = 60003760
    print("worker:", i)
    mariadb_connection = None
    cursor = None
    # One session per worker (not one per URL): connections are reused and
    # the session is closed in the finally clause.
    session = requests.Session()
    retries = Retry(total=5, backoff_factor=1, status_forcelist=[502, 503, 504])
    session.mount('https://', HTTPAdapter(max_retries=retries))
    try:
        # NOTE(review): the reported segfault is inside mysql.connector's C
        # extension (connection_cext commit) while several workers commit.
        # If it persists, connecting with use_pure=True is a common
        # workaround — not verified here.
        mariadb_connection = mariadb.connect(dbstuff)
        cursor = mariadb_connection.cursor()
        while True:
            url = urls.get()
            print("Worker %s processes %s queue# %s" % (i, url, urls.qsize()))
            try:
                if url is None:
                    break
                jsondata = _fetch_orders(session, url, i)
                if jsondata is None:
                    continue
                for row in jsondata:
                    if row['location_id'] == station_id:
                        _sync_order(mariadb_connection, cursor, row)
            finally:
                # Mark every dequeued item (sentinel included) as done.
                urls.task_done()
    except mariadb.Error as error:
        if mariadb_connection is not None and mariadb_connection.is_connected():
            mariadb_connection.rollback()  # rollback if any exception occurred
        print("Worker {} failed importing market orders: {}".format(i, error))
    finally:
        session.close()
        if mariadb_connection is not None and mariadb_connection.is_connected():
            if cursor is not None:
                cursor.close()
            mariadb_connection.close()
def cleanup_mo():
    """Purge orders that were not seen in this run, then empty the tmp table.

    Deletes every ``tbl_mo_jita`` row whose ``order_id`` is absent from
    ``tbl_mo_jita_esi_tmp``, truncates ``tbl_mo_jita_esi_tmp`` and commits.
    A database error is printed, not raised.
    """
    # NOTE(review): the purge trusts tbl_mo_jita_esi_tmp completely. If URL
    # loading or a worker failed part-way, orders from unprocessed URLs are
    # missing there and get deleted here (all of them if nothing was
    # imported) — consider guarding against that.
    mariadb_connection = None
    cursor = None
    try:
        mariadb_connection = mariadb.connect(dbstuff)
        cursor = mariadb_connection.cursor()
        cursor.execute('SELECT order_id FROM tbl_mo_jita')
        list_mo_sql = cursor.fetchall()
        cursor.execute('SELECT order_id FROM tbl_mo_jita_esi_tmp')
        list_mo_esi = cursor.fetchall()
        list_mo_purge = list(set(list_mo_sql) - set(list_mo_esi))
        print(len(list_mo_purge))
        for row in list_mo_purge:
            cursor.execute('DELETE FROM tbl_mo_jita WHERE order_id = %s', (row[0], ))
        cursor.execute('TRUNCATE tbl_mo_jita_esi_tmp')
        mariadb_connection.commit()
    except mariadb.Error as error:
        if mariadb_connection is not None and mariadb_connection.is_connected():
            mariadb_connection.rollback()  # rollback if any exception occurred
        print("Failed cleaning up tbl_mo_jita {}".format(error))
    finally:
        if mariadb_connection is not None and mariadb_connection.is_connected():
            if cursor is not None:
                cursor.close()
            mariadb_connection.close()
# Script entry: queue the URLs, run the workers, then purge stale orders.
create_url()
# One None sentinel per worker, queued after all URLs, tells each worker to stop.
for _ in range(num_threads):
    urls.put(None)
for i in range(num_threads):
    # daemon= replaces the deprecated Thread.setDaemon() call.
    worker = Thread(target=import_mo_jita, args=(i, urls), daemon=True)
    threads.append(worker)
    worker.start()
# The main thread blocks here. In a faulthandler dump its frame shows
# join -> _wait_for_tstate_lock, which only means "waiting for workers";
# the "Current thread" entry of the dump is the thread that faulted.
for worker in threads:
    worker.join()
cleanup_mo()
错误输出为:
Fatal Python error: Segmentation fault
Thread 0x00007f21a5079700 (most recent call first):
File "/home/gregadmin/.local/lib/python3.6/site-packages/mysql/connector/connection_cext.py", line 329 in commit
File "import_mo_jita.py", line 93 in import_mo_jita
File "/usr/lib/python3.6/threading.py", line 864 in run
File "/usr/lib/python3.6/threading.py", line 916 in _bootstrap_inner
File "/usr/lib/python3.6/threading.py", line 884 in _bootstrap
Current thread 0x00007f21a587a700 (most recent call first):
File "import_mo_jita.py", line 99 in import_mo_jita
File "/usr/lib/python3.6/threading.py", line 864 in run
File "/usr/lib/python3.6/threading.py", line 916 in _bootstrap_inner
File "/usr/lib/python3.6/threading.py", line 884 in _bootstrap
Thread 0x00007f21a607b700 (most recent call first):
File "import_mo_jita.py", line 87 in import_mo_jita
File "/usr/lib/python3.6/threading.py", line 864 in run
File "/usr/lib/python3.6/threading.py", line 916 in _bootstrap_inner
File "/usr/lib/python3.6/threading.py", line 884 in _bootstrap
Thread 0x00007f21a687c700 (most recent call first):
File "/usr/lib/python3.6/ssl.py", line 631 in read
File "/usr/lib/python3.6/ssl.py", line 874 in read
File "/usr/lib/python3.6/ssl.py", line 1012 in recv_into
File "/usr/lib/python3.6/socket.py", line 586 in readinto
File "/usr/lib/python3.6/http/client.py", line 258 in _read_status
File "/usr/lib/python3.6/http/client.py", line 297 in begin
File "/usr/lib/python3.6/http/client.py", line 1331 in getresponse
File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 383 in _make_request
File "/usr/lib/python3/dist-packages/urllib3/connectionpool.py", line 601 in urlopen
File "/usr/lib/python3/dist-packages/requests/adapters.py", line 440 in send
File "/usr/lib/python3/dist-packages/requests/sessions.py", line 630 in send
File "/usr/lib/python3/dist-packages/requests/sessions.py", line 520 in request
File "/usr/lib/python3/dist-packages/requests/sessions.py", line 533 in get
File "import_mo_jita.py", line 87 in import_mo_jita
File "/usr/lib/python3.6/threading.py", line 864 in run
File "/usr/lib/python3.6/threading.py", line 916 in _bootstrap_inner
File "/usr/lib/python3.6/threading.py", line 884 in _bootstrap
Thread 0x00007f21ad64d740 (most recent call first):
File "/usr/lib/python3.6/threading.py", line 1072 in _wait_for_tstate_lock
File "/usr/lib/python3.6/threading.py", line 1056 in join
File "import_mo_jita.py", line 187 in <module>
Segmentation fault (core dumped)
解决方案
推荐阅读
- python - 用装饰器捕获异常并继续
- loops - SPSS:使用循环计算不同时间点的新变量
- html-email - Strapi 自定义电子邮件模板
- hadoop - 在 Hadoop Mapreduce 中对输出进行排序
- css - 如何在侧面创建带有文本的轮播(非全尺寸)-如图
- r - 如何从字符串中删除这些字符 c () \ ][ ""
- c# - 如何从失败的 httpClient 调用中检索自定义错误对象?
- sql - Merge Multiple Rows to One Row having Same value swapped between 2 columns In SQL Server
- visual-studio - Visual Studio 2019 中的 Nuget 更新失败
- sql-server - SQL Update语句多次更新一行,但不一致