python - Response payload is not completed using asyncio/aiohttp
Problem description
I've written a Python 3.7 script that asynchronously (asyncio 3.4.3 and aiohttp 3.5.4) creates a Salesforce Bulk API (v45.0) job/batch using multiple objects, each queried with a single SOQL statement, waits for the batches to complete, downloads (streams) the results to a server upon completion, performs some data transformations, and finally uploads the results synchronously to SQL Server 2016 SP1 (13.0.4560.0). I have had plenty of successful trial runs with this and thought it was working perfectly; however, I've recently started intermittently receiving the following error and am at a loss on how to fix it, as there are very few reports/solutions for it on the web:
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed
Sample code snippet:
import asyncio
import aiohttp
import aiofiles
from simple_salesforce import Salesforce
from xml.etree import ElementTree

# Establish a session using the simple_salesforce module
sf = Salesforce(username=username,
                password=password,
                security_token=securityToken,
                organizationId=organizationId)

sfAPIURL = 'https://myinstance.salesforce.com/services/async/45.0/job/'
sfDataPath = 'C:/Salesforce/Data/'

# Dictionary to store information for the object/job/batch while the script is executing
objectDictionary = {
    'Account': {
        'job': {
            'batch': {'id': '8596P00000ihwpJulI', 'results': ['8596V00000Bo9iU'], 'state': 'Completed'},
            'id': '8752R00000iUjtReqS'},
        'soql': 'select Id,Name from Account'},
    'Contact': {
        'job': {
            'batch': {'id': '9874G00000iJnBbVgg', 'results': ['7410t00000Ao9vp'], 'state': 'Completed'},
            'id': '8800o00000POIkLlLa'},
        'soql': 'select Id,Name from Contact'}}
async def retrieveResults(jobId, batchId, sfObject):
    headers = {"X-SFDC-Session": sf.session_id, 'Content-Encoding': 'gzip'}
    async with aiohttp.ClientSession() as session:
        async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result', headers=headers) as r:
            data = await r.text()
            batchResults = ElementTree.fromstring(data)  # list of batch results
            for resultID in batchResults:
                async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}', headers=headers, timeout=None) as r:
                    async with aiofiles.open(f'{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv', 'wb') as outfile:  # save in a temporary file for manipulation later
                        while True:
                            chunk = await r.content.read(81920)
                            if not chunk:
                                break
                            await outfile.write(chunk)

async def asyncDownload():
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'],
                                           objectDictionary[sfObject]['job']['batch']['id'],
                                           sfObject) for sfObject in objectDictionary])

if __name__ == "__main__":
    asyncio.run(asyncDownload())
Traceback (error lines won't match code snippet above):
Traceback (most recent call last):
  File "C:\Code\salesforce.py", line 252, in <module>
    asyncio.run(asyncDownload())
  File "C:\Program Files\Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "C:\Program Files\Python37\lib\asyncio\base_events.py", line 584, in run_until_complete
    return future.result()
  File "C:\Code\salesforce.py", line 241, in asyncDownload
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for sfObject in objectDictionary])
  File "C:\Code\salesforce.py", line 183, in retrieveResults
    chunk = await r.content.read(81920)
  File "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py", line 369, in read
    await self._wait('read')
  File "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py", line 297, in _wait
    await waiter
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed
The root of the problem seems to begin with r.content.read(81920), which should be streaming data in 81,920-byte chunks, but that's about as far as I can get. I don't think this is a network issue on my end, as other small jobs connected to external sources on this server finish without issue while this job runs. Does anyone have any idea what is going on here?
Thank you!
Edit:
I've tried iter_any() instead of read() and still get the same error:
async for data in r.content.iter_any():
await outfile.write(data)
I've tried readline() and still get the same error:
async for line in r.content.readline():
await outfile.write(line)
I have since worked some retry functionality into the error-handling part of the code (not included in the original problem), which ultimately allows the jobs to complete. The payload errors are still happening, and that is still the main issue, but retrying the downloads has been a successful workaround. The problem still persists if anyone is able to provide further information.
Solution
Hi, have you tried inserting await asyncio.sleep(0) into the read loop:
...
while True:
chunk = await r.content.read(81920)
await asyncio.sleep(0)
if not chunk:
break
await outfile.write(chunk)
...
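The idea behind await asyncio.sleep(0) is that it suspends the current coroutine just long enough for the event loop to run any other ready tasks, so a tight read loop cannot starve the rest of the program. A small self-contained demo of this cooperative yield, unrelated to Salesforce (the task names are made up):

```python
import asyncio

async def cooperative_worker(name, results, steps=3):
    for i in range(steps):
        results.append((name, i))
        # sleep(0) yields control to the event loop so other ready tasks
        # get a turn -- the same effect as the yield between chunks above.
        await asyncio.sleep(0)

async def main():
    results = []
    await asyncio.gather(
        cooperative_worker("a", results),
        cooperative_worker("b", results),
    )
    return results

order = asyncio.run(main())
print(order)  # the two tasks alternate step by step instead of running back-to-back
```

Without the sleep(0), each worker here would run all of its steps before the other started, because nothing in the loop body otherwise awaits.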