首页 > 解决方案 > Python:子进程 Popen 立即退出大文件

问题描述

要求:执行脚本编辑文件

问题:对于大文件,它不会等待整个过程完成并立即打印“转换脚本成功”,而文件没有更新。如果它是小文件,它会执行得很好

我如何确保它等到整个过程完成?

零件代号

with NamedTemporaryFile("wb") as f_source, NamedTemporaryFile("wb") as f_dest:
            source_s3_key_object.download_fileobj(Fileobj=f_source)
            f_source.flush()

            if self.transform_script is not None:
                process = subprocess.Popen(
                    [self.transform_script, f_source.name, f_dest.name],
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    close_fds=True
                )

                self.log.info("Output:")
                for line in iter(process.stdout.readline, b''):
                    self.log.info(line.decode(self.output_encoding).rstrip())

                process.wait()

                if process.returncode > 0:
                    raise AirflowException(
                        "Transform script failed: {0}".format(process.returncode)
                    )
                else:
                    self.log.info(
                        "Transform script successful. Output temporarily located at %s",
                        f_dest.name
                    )

Python 脚本

小文件的日志:

[2020-07-07 20:35:37,892] {s3_file_transform_operator.py:115} INFO - Downloading source S3 file s3://path/incoming.2020-07-07.txt
[2020-07-07 20:35:41,981] {s3_file_transform_operator.py:124} INFO - Dumping S3 file s3://path/incoming.2020-07-07.txt contents to local file /tmp/tmp3v_6i1go
[2020-07-07 20:35:42,115] {s3_file_transform_operator.py:145} INFO - Output:
[2020-07-07 20:35:42,293] {s3_file_transform_operator.py:147} INFO - Starting data cleaning...
[2020-07-07 20:35:42,293] {s3_file_transform_operator.py:147} INFO - Completed data cleaning!
[2020-07-07 20:35:42,298] {s3_file_transform_operator.py:158} INFO - Transform script successful. Output temporarily located at /tmp/tmp8uo9t2lk
[2020-07-07 20:35:42,298] {s3_file_transform_operator.py:161} INFO - Uploading transformed file to S3
[2020-07-07 20:35:43,983] {s3_file_transform_operator.py:168} INFO - Upload successful

对于大文件日志:

[2020-07-07 20:25:37,892] {s3_file_transform_operator.py:115} INFO - Downloading source S3 file s3://path/incoming.2020-07-07.txt
[2020-07-07 20:25:52,027] {s3_file_transform_operator.py:124} INFO - Dumping S3 file s3://path/incoming.2020-07-07.txt contents to local file /tmp/tmpgayy9hg9
[2020-07-07 20:26:26,256] {s3_file_transform_operator.py:145} INFO - Output:
[2020-07-07 20:26:29,137] {s3_file_transform_operator.py:158} INFO - Transform script successful. Output temporarily located at /tmp/tmpui1i28r6
[2020-07-07 20:26:29,137] {s3_file_transform_operator.py:161} INFO - Uploading transformed file to S3

转换脚本.py:

#!/usr/bin/env python3
import re
from contextlib import closing
import mmap
import sys
import logging
import os
logger = logging.getLogger(__name__)
"""
Read the last line of the file and remove the double quotes and extra delimiters
and write back to the file.
"""
input = sys.argv[1]
output = sys.argv[2]

def clnup(input, output):
    try:
        with open(input, 'r+b') as myfile:
            with closing(mmap.mmap(myfile.fileno(), 0, access=mmap.ACCESS_WRITE)) as mm:
                start_of_line = mm.rfind(b'\n', 0, len(mm) - 1) + 1
                line = mm[start_of_line:].rstrip(b'\r\n')
                last_line = line.decode('utf-8').replace("\"", "")
                last_line = re.sub('[|]*$', '', last_line).encode('utf-8')
            myfile.seek(start_of_line)  # Move to where old line began
            myfile.write(last_line)  # Overwrite existing line with new line
            myfile.truncate()
        with open(input, 'r+b') as myfile:
            f = open("temp.txt", "w+b")
            f.write(myfile.read())
        with open("temp.txt", 'r+b') as myfile:
            f = open(output, "w+b")
            f.write(myfile.read())
        os.remove("temp.txt")
    except Exception as ex:
            logger.error(f'Error in updating the file. Message: {ex}')
            raise

print("Starting data cleaning...")
clnup(input, output)
print("Completed data cleaning!")

标签: pythonsubprocesspopen

解决方案


推荐阅读