Response payload is not completed using asyncio/aiohttp

Problem description

I've written a Python 3.7 script that asynchronously (asyncio 3.4.3 and aiohttp 3.5.4) creates a Salesforce Bulk API (v45.0) job/batch for each of several objects, each queried by a single SOQL statement. It waits for the batches to complete, downloads (streams) the results to a server once they are done, performs some data transformations, and finally uploads the results synchronously to SQL Server 2016 SP1 (13.0.4560.0). I have had plenty of successful trial runs and thought it was working perfectly. However, I've recently started intermittently receiving the following error, and I'm at a loss on how to fix it because there are very few reports or solutions for it on the web:

aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed

Sample code snippet:

import asyncio
import aiohttp
import aiofiles
from simple_salesforce import Salesforce
from xml.etree import ElementTree

#Establish a session using the simple_salesforce module
#(username, password, securityToken and organizationId are defined elsewhere)
sf = Salesforce(username=username,
                password=password,
                security_token=securityToken,
                organizationId=organizationId)
sfAPIURL = 'https://myinstance.salesforce.com/services/async/45.0/job/'
sfDataPath = 'C:/Salesforce/Data/'

#Dictionary to store information for the object/job/batch while the script is executing
objectDictionary = {
    'Account': {'job': {'batch': {'id': '8596P00000ihwpJulI', 'results': ['8596V00000Bo9iU'], 'state': 'Completed'},
                        'id': '8752R00000iUjtReqS'},
                'soql': 'select Id,Name from Account'},

    'Contact': {'job': {'batch': {'id': '9874G00000iJnBbVgg', 'results': ['7410t00000Ao9vp'], 'state': 'Completed'},
                        'id': '8800o00000POIkLlLa'},
                'soql': 'select Id,Name from Contact'}}

async def retrieveResults(jobId, batchId, sfObject):
    #Accept-Encoding requests a gzip-compressed response, which aiohttp decompresses automatically
    #(Content-Encoding would describe a request body, and these GET requests have none)
    headers = {"X-SFDC-Session": sf.session_id, 'Accept-Encoding': 'gzip'}
    async with aiohttp.ClientSession() as session:
        #Fetch the XML list of result IDs for this batch
        async with session.get(url=f'{sfAPIURL}{jobId}/batch/{batchId}/result', headers=headers) as listResponse:
            listResponse.raise_for_status()
            data = await listResponse.text()
        batchResults = ElementTree.fromstring(data) #root element; each child holds one result ID
        for resultID in batchResults:
            resultURL = f'{sfAPIURL}{jobId}/batch/{batchId}/result/{resultID.text}'
            outPath = f'{sfDataPath}{sfObject}_TEMP_JOB_{jobId}_BATCH_{batchId}_RESULT_{resultID.text}.csv'
            #timeout=None removes aiohttp's default 5-minute total timeout for large result files
            async with session.get(url=resultURL, headers=headers, timeout=None) as r:
                r.raise_for_status()
                async with aiofiles.open(outPath, 'wb') as outfile: #save in temporary file for manipulation later
                    while True:
                        chunk = await r.content.read(81920)
                        if not chunk: #b'' means the stream is exhausted
                            break
                        await outfile.write(chunk)

async def asyncDownload():
    #Download the results for all objects concurrently
    await asyncio.gather(*[retrieveResults(info['job']['id'], info['job']['batch']['id'], sfObject)
                           for sfObject, info in objectDictionary.items()])

if __name__ == "__main__":
    asyncio.run(asyncDownload())
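The loop over batchResults relies on the result-list document's root element having one child per result ID. A standalone sketch of that step follows, assuming the Bulk API returns XML shaped like SAMPLE_RESULT_LIST (the result ID is taken from objectDictionary; the namespace is what I believe the Bulk API uses, so check it against a real response). parse_result_ids is a hypothetical helper name. Iterating the root works regardless of the namespace, whereas root.findall('result') without the namespace finds nothing.

```python
from xml.etree import ElementTree

# Assumed shape of a Bulk API "result list" response (namespace is an assumption)
SAMPLE_RESULT_LIST = (
    '<result-list xmlns="http://www.force.com/2009/06/asyncapi/dataload">'
    '<result>8596V00000Bo9iU</result>'
    '</result-list>'
)


def parse_result_ids(xml_text):
    """Return the text of every direct child of the root element (the result IDs)."""
    root = ElementTree.fromstring(xml_text)
    # Iterating an Element yields its direct children. Their tags are
    # namespaced ("{...}result"), but plain iteration ignores tag names.
    return [child.text for child in root]


print(parse_result_ids(SAMPLE_RESULT_LIST))  # ['8596V00000Bo9iU']
```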

Traceback (line numbers won't match the code snippet above):

Traceback (most recent call last):
  File "C:\Code\salesforce.py", line 252, in <module>
    asyncio.run(asyncDownload())
  File "C:\Program Files\Python37\lib\asyncio\runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "C:\Program Files\Python37\lib\asyncio\base_events.py", line 584, in run_until_complete
    return future.result()
  File "C:\Code\salesforce.py", line 241, in asyncDownload
    await asyncio.gather(*[retrieveResults(objectDictionary[sfObject]['job']['id'], objectDictionary[sfObject]['job']['batch']['id'], sfObject) for sfObject in objectDictionary])
  File "C:\Code\salesforce.py", line 183, in retrieveResults
    chunk = await r.content.read(81920)
  File "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py", line 369, in read
    await self._wait('read')
  File "C:\Program Files\Python37\lib\site-packages\aiohttp\streams.py", line 297, in _wait
    await waiter
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed

The problem seems to originate at r.content.read(81920), which should be streaming the data in 81,920-byte chunks, but that's as far as I can trace it.
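As far as I understand aiohttp, ClientPayloadError("Response payload is not completed") is raised when the connection closes before the whole response body announced by the headers (Content-Length or chunked transfer encoding) has arrived, so it surfaces at whichever read() call is waiting at that moment, as in the traceback. The sketch below illustrates the same idea with the standard library's asyncio.StreamReader instead of aiohttp. copy_in_chunks and IncompleteBodyError are hypothetical names, and the explicit length check only stands in for the bookkeeping aiohttp's payload parser does internally.

```python
import asyncio
import io


class IncompleteBodyError(Exception):
    """Hypothetical error: the stream ended before the promised length arrived."""


async def copy_in_chunks(reader, writer, expected_length=None, chunk_size=81920):
    """Copy an asyncio-style reader into a file-like writer, chunk by chunk.

    read(n) returns at most n bytes and returns b"" only at end of stream,
    so EOF alone cannot tell a complete body from a truncated one; the byte
    count has to be compared with the length the sender promised.
    """
    received = 0
    while True:
        chunk = await reader.read(chunk_size)
        if not chunk:  # end of stream, complete or not
            break
        writer.write(chunk)
        received += len(chunk)
    if expected_length is not None and received != expected_length:
        raise IncompleteBodyError(f"got {received} of {expected_length} bytes")
    return received


async def demo(sent, promised):
    # Simulate a connection whose peer sends `sent` bytes and then closes
    reader = asyncio.StreamReader()
    reader.feed_data(b"x" * sent)
    reader.feed_eof()
    return await copy_in_chunks(reader, io.BytesIO(), expected_length=promised)
```

For example, asyncio.run(demo(200_000, 200_000)) returns 200000, while asyncio.run(demo(50_000, 200_000)) raises IncompleteBodyError, which is the situation aiohttp reports as an incomplete payload.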

I don't think this is a network issue on my end, because other small jobs on this server that connect to external sources finish without issue while this job runs. Does anyone have any idea what is going on here?

Thank you!

Edit:

I've tried iter_any() instead of read() and still get the same error...

async for data in r.content.iter_any():
    await outfile.write(data)

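I've tried reading line by line (readline()) and still get the same error...

# readline() returns a coroutine, not an async iterator; iterating the StreamReader itself yields lines
async for line in r.content:
    await outfile.write(line)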

I have since added retry logic to the error-handling part of the code (not included in the original question), which ultimately allows the jobs to complete. The payload errors still occur, and they remain the main issue, but retrying the downloads has been a successful workaround. Any further information would be appreciated.
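The retry code itself isn't shown, so here is a generic sketch of the pattern, assuming each download can safely restart from scratch (the result file is opened with 'wb', which truncates any partial output from a failed attempt). retry_async and its parameters are hypothetical names, not part of aiohttp.

```python
import asyncio


async def retry_async(make_attempt, retry_on, attempts=3, base_delay=1.0):
    """Await make_attempt() until it succeeds, retrying on the given exceptions.

    make_attempt: zero-argument callable returning a NEW coroutine each call.
    retry_on: exception class (or tuple of classes) that triggers a retry;
              any other exception propagates immediately.
    Waits base_delay, 2*base_delay, 4*base_delay, ... between attempts.
    """
    for attempt in range(1, attempts + 1):
        try:
            return await make_attempt()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```

In the question's code, the per-result download could be moved into its own coroutine function (say, a hypothetical downloadResult(session, url, path)) and called as await retry_async(lambda: downloadResult(session, url, path), aiohttp.ClientPayloadError). The lambda (or functools.partial) matters because a coroutine object can only be awaited once, so every attempt needs a fresh one.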

Tags: python, python-3.x, asynchronous, salesforce, aiohttp

Solution


Hi, have you tried inserting await asyncio.sleep(0) like this:

                    ...
                    while True:
                        chunk = await r.content.read(81920)
                        await asyncio.sleep(0)
                        if not chunk:
                            break
                        await outfile.write(chunk)
                    ...
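await asyncio.sleep(0) suspends the current task for one event-loop iteration so that other ready tasks get a chance to run. The sketch below (worker and run_pair are hypothetical names) shows only that scheduling effect with two tasks writing to a shared log; it does not demonstrate that yielding prevents ClientPayloadError, which the answer doesn't explain either.

```python
import asyncio


async def worker(name, log, steps, yield_each_step):
    for i in range(steps):
        log.append(f"{name}{i}")
        if yield_each_step:
            await asyncio.sleep(0)  # hand control back to the event loop


async def run_pair(yield_each_step):
    log = []
    # gather schedules both workers; without any await inside the loop,
    # the first worker runs to completion before the second one starts
    await asyncio.gather(worker("a", log, 3, yield_each_step),
                         worker("b", log, 3, yield_each_step))
    return log


print(asyncio.run(run_pair(False)))  # ['a0', 'a1', 'a2', 'b0', 'b1', 'b2']
print(asyncio.run(run_pair(True)))   # ['a0', 'b0', 'a1', 'b1', 'a2', 'b2']
```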
