python - 使用异步 Python 3 的并发 HTTP 和 SQL 请求
问题描述
第一次尝试asyncio
和aiohttp
。我有以下代码urls
从MySQL
数据库中获取GET
请求。获取响应并将它们推送到MySQL
数据库。
if __name__ == "__main__":
database_name = 'db_name'
company_name = 'company_name'
my_db = Db(database=database_name) # wrapper class for mysql.connector
urls_dict = my_db.get_rest_api_urls_for_specific_company(company_name=company_name)
update_id = my_db.get_updateid()
my_db.get_connection(dictionary=True)
for url in urls_dict:
url_id = url['id']
url = url['url']
table_name = my_db.make_sql_table_name_by_url(url)
insert_query = my_db.get_sql_for_insert(table_name)
r = requests.get(url=url).json() # make the request
args = [json.dumps(r), update_id, url_id]
my_db.db_execute_one(insert_query, args, close_conn=False)
my_db.close_conn()
这工作正常,但要加快速度我该如何运行它asynchronously
?
这是我根据@Raphael Medaer 的回答尝试过的。
async def fetch(url):
async with ClientSession() as session:
async with session.request(method='GET', url=url) as response:
json = await response.json()
return json
async def process(url, update_id):
table_name = await db.make_sql_table_name_by_url(url)
result = await fetch(url)
print(url, result)
if __name__ == "__main__":
"""Get urls from DB"""
db = Db(database="fuse_src")
urls = db.get_rest_api_urls() # This returns list of dictionary
update_id = db.get_updateid()
url_list = []
for url in urls:
url_list.append(url['url'])
print(update_id)
asyncio.get_event_loop().run_until_complete(
asyncio.gather(*[process(url, update_id) for url in url_list]))
我在方法中遇到错误process
:
TypeError: object str can't be used in 'await' expression
不确定是什么问题?
任何特定于此的代码示例都将受到高度赞赏。
解决方案
使此代码异步根本不会加快速度。除非您考虑“并行”运行部分代码。例如,您可以“同时”运行多个(SQL 或 HTTP)查询。通过进行异步编程,您将不会“同时”执行代码。尽管在等待 IO 时,您将受益于长 IO 任务来执行代码的其他部分。
首先,您必须使用异步库(而不是同步库)。
mysql.connector
可以用aio-libs 中的aiomysql代替。requests
可以用aiohttp代替
要“并行”执行多个异步任务(例如替换循环for url in urls_dict:
),您必须仔细阅读asyncio tasks和 function gather
。
我不会以异步方式(重新)编写您的代码,但是这里有几行伪代码可以帮助您:
async def process(url):
result = await fetch(url)
await db.commit(result)
if __name__ == "__main__":
db = MyDbConnection()
urls = await db.fetch_all_urls()
asyncio.get_event_loop().run_until_complete(
asyncio.gather(*[process(url) for url in urls]))
推荐阅读
- android - 使用 Android 密钥库时出现 KeyNotYetValidException
- javascript - 获取基于时间戳获取谷歌图表的问题
- docker - 如果机器上没有安装的 Volumes 文件夹,Docker stack deploy 不会运行容器
- python - 与未知数量的参数相交的 Pythonic 方式
- java - Lombok 1.18.2 不适用于 maven 和 jdk 10
- java - 在查询、jpa 存储库和 Spring Boot 中使用非硬编码值
- android - 我应该选择哪个 Recyclerview 或 ScrollView?
- google-api - Google 管理帐户:API 访问所需的权限
- debugging - 设置短名称时连接到生产节点
- ios - 具有相同 bundleID 的应用程序将具有相同的沙盒?