api - 使用 Spark 和 pyspark 的并行 API 请求
问题描述
我正在 EMR 集群上使用以下代码发起 API 请求:
# Collects paginated API responses for a simple object type (one that needs no
# id in the URL), requesting `chunk_size` consecutive pages per batch.
# NOTE(review): only the beginning of this method is visible in this snippet --
# the `try:` has no handler in view and no exit from `while True` is shown.
# `get_id`, `params` and `contacts_pages` are not used in the visible lines.
def get_normal_objects(self,object_name, get_id, chunk_size=35,**params):
contacts_pages = []
batch = 0
while True:
# Page numbers are 1-based: batch 0 covers pages 1..chunk_size, batch 1 covers
# the following chunk_size pages, and so on.
urls = ["{}/{}".format(self.base_url, "{}?page={}&api_key={}".format(object_name, page_number, self.api_keys))
for page_number in range(batch * chunk_size + 1, chunk_size * (1 + batch) + 1)]
# len(urls) == chunk_size here, so the whole batch is handed over in one call.
responses_raw = self.get_responses(urls, self.office_token, chunk_size)
LOGGER.info("Collecting data for {} for batch {}".format(object_name, batch))
try:
# Each item is a {"url": ..., "output": ...} dict built by __bound_fetch.
# "output" is None when __fetch hit a timeout, and json.loads(None) raises
# TypeError -- NOTE(review): confirm the handler outside this view covers that.
responses_json = [json.loads(response_raw['output']) for response_raw in responses_raw]
提取不需要 id 的简单对象时,代码可以正常工作;但提取复杂的关系对象时(必须先拿到 id 才能访问对应的 API,如下所示),耗时非常长:
"https://integrations.mydesktop.com.au/api/v1.2/properties/22028014/sales?api_key"
# Collects responses for a nested object type whose URL has the form
# <base_url>/<object_name_1>/<id>/<object_name_2>?api_key=..., taking up to
# `chunk_size` ids per batch.
# NOTE(review): only the beginning of this method is visible in this snippet --
# no exit from `while True` is shown. `spark`, `params` and `results` are not
# used in the visible lines.
def get_complex_objects(self,object_name_1, object_name_2, ids,spark, chunk_size=30, **params):
results = []
batch = 0
while True:
# NOTE(review): for batch 0 this window is indices 1..chunk_size, so ids[0] is
# never requested. The "+ 1" offsets match the 1-based page numbering used in
# get_normal_objects but `ids` is indexed from 0 -- looks like an off-by-one.
ids_start = batch * chunk_size + 1
ids_end = chunk_size * (1 + batch) + 1
# The `i < len(ids)` guard trims the last window; once the window lies fully
# past the end of `ids`, chunk_ids (and therefore urls) is empty.
chunk_ids = [ids[i] for i in range(ids_start, ids_end) if i < len(ids)]
urls = [
"{}/{}".format(self.base_url, "{}/{}/{}?api_key={}".format(object_name_1, contactId, object_name_2, self.api_keys))
for contactId in chunk_ids]
LOGGER.info("Collecting data for {}:{} for batch {}".format(object_name_1, object_name_2, batch))
# len(urls) <= chunk_size here, so the whole batch is handed over in one call.
responses_raw = self.get_responses(urls, self.office_token, chunk_size)
我使用以下 get_responses 函数来获取响应:
def get_responses(self, urls, office_token, chunk_size=30, **params):
    """Call all the urls in parallel, in batches of ``chunk_size``.

    The urls are split with ``mydesktop.chunks`` (presumably into pieces of at
    most ``chunk_size`` urls -- confirm against that helper) and every piece is
    fetched concurrently by ``__run``. The results of all pieces are
    concatenated, so the caller receives one entry per url rather than only
    the entries of a single batch.

    Arguments:
        urls {List} -- list of all urls to call
        office_token {String} -- office token, used as the basic-auth user
    Keyword Arguments:
        chunk_size {int} -- number of parallel api calls per batch (default: {30})
        **params -- forwarded to every request as query parameters
    Returns:
        list -- one ``{"url": ..., "output": ...}`` dict per url, in batch
        order; an empty list when ``urls`` is empty
    """
    responses = []
    # The loop object does not change between batches, so look it up once.
    loop = asyncio.get_event_loop()
    for chunk in mydesktop.chunks(urls, chunk_size):
        # run_until_complete wraps the coroutine in a task itself, so no
        # explicit ensure_future is needed.
        batch_responses = loop.run_until_complete(
            self.__run(params, office_token, chunk))
        # Accumulate instead of rebinding, otherwise earlier batches are lost.
        responses.extend(batch_responses)
    return responses
async def __fetch(self, url, params, session):
    """Issue one GET request and hand back the raw response body.

    Arguments:
        url {String} -- address to request
        params -- passed to ``session.get`` as ``params=``
        session -- object whose ``get`` returns an async context manager
            (``__run`` supplies a ClientSession)
    Returns:
        the value of ``response.read()``, or None when the request raised
        ``asyncio.TimeoutError`` (the error text is printed in that case)
    """
    try:
        async with session.get(url, params=params) as reply:
            return await reply.read()
    except asyncio.TimeoutError as timeout_error:
        # Best effort: report the timeout and let the caller see None.
        print(str(timeout_error))
    return None
async def __bound_fetch(self, sem, url, params, session):
    """Fetch ``url`` while holding ``sem`` and label the result with its url.

    Arguments:
        sem -- async context manager limiting concurrent fetches
            (``__run`` supplies an asyncio.Semaphore)
        url {String} -- address to request
        params -- forwarded to ``__fetch``
        session -- forwarded to ``__fetch``
    Returns:
        dict -- ``{"url": url, "output": <result of __fetch>}``
    """
    # The semaphore is held only for the duration of the fetch itself.
    async with sem:
        return {"url": url, "output": await self.__fetch(url, params, session)}
async def __run(self, params, auth_user, urls):
    """Fetch every url concurrently over one shared client session.

    Arguments:
        params -- forwarded to each request
        auth_user {String} -- basic-auth user name; the password is empty
        urls {List} -- urls to request
    Returns:
        list -- the ``__bound_fetch`` results, in the same order as ``urls``
    """
    # At most 400 fetches may hold the semaphore at the same time.
    limiter = asyncio.Semaphore(400)
    credentials = BasicAuth(auth_user, password='', )
    # NOTE(review): ssl=False switches off certificate verification in
    # aiohttp -- confirm this is intended for the target host.
    connector = TCPConnector(ssl=False)
    async with ClientSession(auth=credentials, connector=connector) as session:
        pending = [
            asyncio.ensure_future(
                self.__bound_fetch(limiter, url, params, session))
            for url in urls
        ]
        # Await inside the session block so the session is still open while
        # the requests run.
        return await asyncio.gather(*pending)
我的问题是如何利用 Spark 的并行特性并将 URL 分发到执行程序中,以便减少提取时间?
我正在考虑使用 urls = spark.sparkContext.parallelize(urls) 将 URL 分发给各个执行程序(executor),然后在 map 中用 lambda 执行 GET 请求。
解决方案
推荐阅读
- python - Datetime 天数的子集
- apache - SameSite=None 不适用于 Apache 2.2.15 和 Tomcat 6
- c# - 查找范围之间的月份
- node.js - 无法从 localhost 获取数据
- php - Symfony / Mercure 问题:具有单一自我目标的 2 个用户都可以看到彼此所谓的循环消息
- minizinc - 检查 MiniZinc 数组中的项目
- processing - 错误:看起来您在处理中混合了“活动”和“静态”模式
- php - 如何通过 Symfony 4 中的文件系统组件获取文件内容?
- view - 将 SwiftUI 视图捕获为电影?访问绘图组() MTKView?
- typescript - 如何使用 Array.prototype.find 为联合数组匹配正确的类型?