python - Python: subprocess Popen exits immediately for large files
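Problem description
Requirement: run a script that edits a file.
Problem: for large files, it does not wait for the whole process to finish and immediately prints "Transform script successful", yet the file is not updated. With a small file it works fine.
How can I make sure it waits until the whole process has finished?
Code excerpt: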
with NamedTemporaryFile("wb") as f_source, NamedTemporaryFile("wb") as f_dest:
    source_s3_key_object.download_fileobj(Fileobj=f_source)
    f_source.flush()
    if self.transform_script is not None:
        process = subprocess.Popen(
            [self.transform_script, f_source.name, f_dest.name],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            close_fds=True
        )
        self.log.info("Output:")
        for line in iter(process.stdout.readline, b''):
            self.log.info(line.decode(self.output_encoding).rstrip())
        process.wait()
        if process.returncode > 0:
            raise AirflowException(
                "Transform script failed: {0}".format(process.returncode)
            )
        else:
            self.log.info(
                "Transform script successful. Output temporarily located at %s",
                f_dest.name
            )
Python script
Logs for a small file:
[2020-07-07 20:35:37,892] {s3_file_transform_operator.py:115} INFO - Downloading source S3 file s3://path/incoming.2020-07-07.txt
[2020-07-07 20:35:41,981] {s3_file_transform_operator.py:124} INFO - Dumping S3 file s3://path/incoming.2020-07-07.txt contents to local file /tmp/tmp3v_6i1go
[2020-07-07 20:35:42,115] {s3_file_transform_operator.py:145} INFO - Output:
[2020-07-07 20:35:42,293] {s3_file_transform_operator.py:147} INFO - Starting data cleaning...
[2020-07-07 20:35:42,293] {s3_file_transform_operator.py:147} INFO - Completed data cleaning!
[2020-07-07 20:35:42,298] {s3_file_transform_operator.py:158} INFO - Transform script successful. Output temporarily located at /tmp/tmp8uo9t2lk
[2020-07-07 20:35:42,298] {s3_file_transform_operator.py:161} INFO - Uploading transformed file to S3
[2020-07-07 20:35:43,983] {s3_file_transform_operator.py:168} INFO - Upload successful
Logs for a large file:
[2020-07-07 20:25:37,892] {s3_file_transform_operator.py:115} INFO - Downloading source S3 file s3://path/incoming.2020-07-07.txt
[2020-07-07 20:25:52,027] {s3_file_transform_operator.py:124} INFO - Dumping S3 file s3://path/incoming.2020-07-07.txt contents to local file /tmp/tmpgayy9hg9
[2020-07-07 20:26:26,256] {s3_file_transform_operator.py:145} INFO - Output:
[2020-07-07 20:26:29,137] {s3_file_transform_operator.py:158} INFO - Transform script successful. Output temporarily located at /tmp/tmpui1i28r6
[2020-07-07 20:26:29,137] {s3_file_transform_operator.py:161} INFO - Uploading transformed file to S3
Transform script (.py):
#!/usr/bin/env python3
import re
from contextlib import closing
import mmap
import sys
import logging
import os

logger = logging.getLogger(__name__)

"""
Read the last line of the file and remove the double quotes and extra delimiters
and write back to the file.
"""
input = sys.argv[1]
output = sys.argv[2]

def clnup(input, output):
    try:
        with open(input, 'r+b') as myfile:
            with closing(mmap.mmap(myfile.fileno(), 0, access=mmap.ACCESS_WRITE)) as mm:
                start_of_line = mm.rfind(b'\n', 0, len(mm) - 1) + 1
                line = mm[start_of_line:].rstrip(b'\r\n')
                last_line = line.decode('utf-8').replace("\"", "")
                last_line = re.sub('[|]*$', '', last_line).encode('utf-8')
            myfile.seek(start_of_line)  # Move to where old line began
            myfile.write(last_line)  # Overwrite existing line with new line
            myfile.truncate()
        with open(input, 'r+b') as myfile:
            f = open("temp.txt", "w+b")
            f.write(myfile.read())
        with open("temp.txt", 'r+b') as myfile:
            f = open(output, "w+b")
            f.write(myfile.read())
        os.remove("temp.txt")
    except Exception as ex:
        logger.error(f'Error in updating the file. Message: {ex}')
        raise

print("Starting data cleaning...")
clnup(input, output)
print("Completed data cleaning!")
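A side note on the copy steps in the script above: each `myfile.read()` loads the entire file into memory, the handles assigned to `f` are never closed explicitly, and the data takes a detour through `temp.txt` in the current working directory. Also, when stdout is a pipe, `print` output is block-buffered, so it only reaches the parent once the buffer is flushed. The following is a minimal sketch of a streaming alternative, assuming the only goal of those steps is to copy the cleaned input to the output path; `copy_cleaned` is a hypothetical name, not part of the original script.

```python
import shutil
import sys


def copy_cleaned(src_path, dst_path):
    """Copy src_path to dst_path without loading the whole file into memory."""
    # shutil.copyfile streams the contents and closes both files when done.
    shutil.copyfile(src_path, dst_path)


if __name__ == "__main__" and len(sys.argv) == 3:
    # flush=True pushes each message to the parent's pipe right away
    # instead of leaving it in the block buffer.
    print("Starting copy...", flush=True)
    copy_cleaned(sys.argv[1], sys.argv[2])
    print("Completed copy!", flush=True)
```

This only replaces the two copy steps; the last-line cleanup via `mmap` would still run before it.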
Solution
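One thing that stands out in the code excerpt is the check `if process.returncode > 0`. The code does wait for the child (`process.wait()`), but on POSIX `subprocess` reports a child that was terminated by a signal with a negative return code, so such an exit falls through to the "successful" branch. The logs do not confirm that this is what happened here; it would, however, be consistent with the large-file log showing none of the script's `print` output, since buffered output is lost when a process is killed. The following is a minimal sketch of a stricter check; `describe_exit` and `run_script` are hypothetical helper names used only for illustration.

```python
import signal
import subprocess
import sys


def describe_exit(returncode):
    """Return None on success, otherwise a short failure description."""
    if returncode == 0:
        return None
    if returncode < 0:
        # On POSIX, Popen reports termination by signal N as returncode -N.
        try:
            name = signal.Signals(-returncode).name
        except ValueError:
            name = "signal {0}".format(-returncode)
        return "killed by {0}".format(name)
    return "exit code {0}".format(returncode)


def run_script(argv):
    """Run argv, collect merged stdout/stderr, and wait for the child to exit."""
    process = subprocess.Popen(
        argv, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
    )
    output, _ = process.communicate()  # reads until EOF, then waits
    return process.returncode, output


if __name__ == "__main__":
    # Hypothetical child that prints and then kills itself (POSIX only).
    child = (
        "import os, signal; print('Starting...'); "
        "os.kill(os.getpid(), signal.SIGKILL)"
    )
    code, out = run_script([sys.executable, "-c", child])
    print(code, describe_exit(code), out)
```

Applied to the excerpt, this would mean treating any non-zero value as a failure, for example `if process.returncode != 0:` instead of `if process.returncode > 0:`, so that the actual return code shows up in the raised exception.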