apache-spark - 定期删除 EMR 集群日志
问题描述
我有一个 EMR 集群,它成功运行了几天的 spark 流作业。但几天后,集群因步骤失败而终止。我检查了日志,它说
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00007f8cb0854000, 12288, 0) failed; error='Cannot allocate memory' (errno=12)
Command exiting with ret '1'
对于这个错误,我检查并发现,对于 JRE,内存是不够的。
我发现集群创建 EMR 步骤日志并存储在路径/mnt/var/logs/hadoop/steps/step_id/ 上,并且在创建集群时我给出了一个logUri路径,因此日志被复制到 s3 位置。所以我的猜测是,由于这些日志,步骤失败正在发生。
谁能建议我如何定期从集群中删除这些 emr 步骤日志,以免集群内存不足?
解决方案
您可以使用以下 boto3 代码(我相信这也可以使用适用于 Java 的 AWS SDK 在 Java 中完成)来删除日志,为了定期删除,您有如下选项
- 使用 Airflow 之类的工作流调度程序,请参见下面的示例
- 将它用作 lambda 函数并安排它定期运行(更容易)
- 在本地使用 cron jon(不太可行)
删除日志的功能(输入过期的threshold
,bucket name
和prefix
,可以是"logs/sparksteps/j-"
)
def clean_s3(buck, match_prefix,exp_threshold):
s3_client = boto3.client('s3')
key_names = []
file_timestamp = []
file_size = []
kwargs = {"Bucket": buck, "Prefix": match_prefix}
while True:
result = s3_client.list_objects_v2(**kwargs)
for obj in result["Contents"]:
if "." in obj["Key"]:
key_names.append(obj["Key"])
file_timestamp.append(obj["LastModified"].timestamp())
file_size.append(obj["Size"])
try:
kwargs["ContinuationToken"] = result["NextContinuationToken"]
except KeyError:
break
key_info = {
"key_path": key_names,
"timestamp": file_timestamp,
"size": file_size
}
#print(f'All Keys in {buck} with {prefix} Prefix found!')
s3_file = key_info
for i, fs in enumerate(s3_file["timestamp"]):
#file_expired = is_expired(fs)
#print(fs)
if fs < exp_threshold: #if True is recieved
print("Deleting %s" % {s3_file["key_path"][i]})
s3_client.delete_object(Bucket=buck, Key=s3_file["key_path"][i])
您可以计算您需要通过的到期阈值(以纪元秒为单位),如下所示
date_now = time.time()
days = 7 # 7 days
total_time = 86400*days
exp_threshold = date_now-total_time
现在,对于选项 1,您可以制作如下所示的气流运算符
s3_cleanup = PythonOperator(
task_id='s3cleanup',
python_callable=clean_s3,
op_kwargs={
'buck': '<you bucket>',
'match_prefix': "logs/sparksteps/j-",
'exp_threshold':exp_threshold,
},dag=dag)
或者,使用 apporach 2,您可以使用 AWS lamda 进行调度,请参阅此处的 lambda 调度指南
推荐阅读
- javascript - React Material UI 菜单未关闭
- python - 如何检查用户是否在服务器中,而只有机器人的行会 [python discord bot]
- oracle - Oracle 游标,未找到数据异常
- php - 如何在 FancyBox 上使所有图像的纵横比为 1:1?
- angular - 勾选复选框时如何同时执行多个功能
- java - 为什么 Springboot 中的 2 个 Kafka 消费者指的是同一个 Kafka Broker?
- function - 新项目弄乱了我的变量 PowerShell
- javascript - 如何以角度测试组件@Input?
- c - 需要帮助理解来自 ghidra 的反编译代码行
- real-time - 无需服务器的实时文档协作