python - 等待 asyncio.gather() 挂起
问题描述
我正在编写一个脚本,它将向 API 端点发送超过 1M 行的 CSV 数据,我希望尽可能多地异步执行此操作。我正在使用的 API 的速率限制为每分钟 100K 请求。
问题:
当脚本到达await asyncio.gather(*tasks)
- 它似乎只是挂起。我已经坐下来等了 5 分钟,然后……什么都没有。
问题:
- 为什么
await asyncio.gather()
挂了这么久? - 我是在尝试将太多请求推送到单个
await asyncio.gather()
语句中,还是应该能够毫无问题地处理此卷? - 我正在尝试将
asyncio_throttle
每分钟的请求数保持在 100K 以下。这是一个好方法吗? - 有没有我应该考虑的更有效的方法来做到这一点?
感谢您的帮助!
import os
import csv
import asyncio
import aiohttp
from asyncio_throttle import Throttler
import json
from time import sleep
'''
500 unique timestamps
1322501 lines
timeStamp, appName, hostName, cpu_idle, mem_util, txnCount, errCount
[{
"common": {
"key": "value"
},
"metrics": [{
"key": "value"
}]
}]
'''
inputFile = os.environ['FILENAME']
apiKey = os.environ['API_KEY']
url = os.environ['API_ENDPOINT']
metrics = {'cpu_idle':3 ,'mem_util': 4,'transaction_count': 5,'error_count': 6}
async def sendIt(session, payload, throttler, line_count):
d = [payload]
headers = {'Content-Type': 'application/json','Api-Key': apiKey}
async with throttler:
async with session.post(url, data = json.dumps(d), headers = headers) as response:
json_response = response
print(str(line_count) + ' --- ' + str(json_response))
async def createPayload(row):
payload = {}
common = {}
measurements = {}
metricslist = []
common['app.name'] = row[1]
common['host.name'] = row[2]
for k,v in metrics.items():
measurements['name'] = k
measurements['type'] = 'gauge'
measurements['value'] = float(row[int(v)])
metricslist.append(measurements)
measurements = {}
payload['common'] = {'timestamp': int(row[0]), 'attributes': common}
payload['metrics'] = metricslist
return payload
async def main():
tasks = []
throttler = Throttler(rate_limit=1500, period=1)
conn = aiohttp.TCPConnector(limit=1500)
with open(inputFile) as f:
csv_reader = csv.reader(f, delimiter=',')
next(csv_reader)
async with aiohttp.ClientSession(connector=conn) as session:
line_count = 0
for row in csv_reader:
line_count += 1
print('Appending row ' + str(line_count))
tasks.append(sendIt(session, await createPayload(row), throttler, line_count))
print('Here we go...')
await asyncio.gather(*tasks)
print('Complete!')
f.close()
asyncio.run(main())
解决方案
推荐阅读
- python - Django 3.x - 自定义错误页面在应该是 404 时显示 500
- javascript - 未按下时,v-bottom-navigation 中的 Vuetify v-btn 保持活动状态
- javascript - jQuery addClass 在 Android 上运行的 Chrome 上不起作用
- ios - SwiftUI ObservedObject 每次都会重新启动
- html - 锚标记在带有 :before 伪元素的 div 中不起作用
- typescript - 检查类型是否为未知类型
- python - Django 无法在数据库中保存多选字段(复选框)数据
- arrays - How to define PropTypes for Array
- arrays - Angular - 从对象数组中获取数据
- python - 从 2D df 中提取并在 python 中将值添加到 1D df