首页 > 解决方案 > 如何处理代码存储库中的大文件?

问题描述

我有一个每天提供一个大的 .txt 文件 (50-75GB) 的数据馈送。该文件包含几个不同的模式,其中每一行对应一个模式。我想将其拆分为每个模式的分区数据集,我怎样才能有效地做到这一点?

标签: palantir-foundryfoundry-code-repositories

解决方案


您需要解决的最大问题是恢复模式的迭代速度,这对于这种规模的文件来说可能具有挑战性。

您最好的策略是获取一个示例“名义”文件,其中包含您要恢复的每个模式作为其中的一行,并将其作为文件添加到您的存储库中。当您将此文件添加到您的存储库(连同您的转换逻辑)中时,您将能够将其推送到数据框中,就像您使用数据集中的原始文件一样,以便快速测试迭代。

首先,确保将txt文件指定为包内容的一部分,这样您的测试就会发现它们(这在文档中有所介绍Read a file from a Python repository):

您可以将存储库中的其他文件读取到转换上下文中。这可能有助于为您的转换代码设置参数以供参考。

首先,在您的 python 存储库中编辑 setup.py:

setup(
   name=os.environ['PKG_NAME'],
# ...
    package_data={
        '': ['*.txt']
    }
)

我正在使用具有以下内容的 txt 文件:

my_column, my_other_column
some_string,some_other_string
some_thing,some_other_thing,some_final_thing

此文本文件位于我的存储库中的以下路径:transforms-python/src/myproject/datasets/raw.txt

将文本文件配置为随逻辑一起提供后,并将文件本身包含在存储库中后,您就可以包含以下代码。这段代码有几个重要的功能:

  1. 它将原始文件解析逻辑与将文件读入 Spark DataFrame 的阶段完全分开。这样一来,构建此 DataFrame 的方式可以留给测试基础设施或运行时,具体取决于您运行的位置。
  2. 这种保持逻辑分离的做法让您可以确保您想要做的实际逐行解析是它自己的可测试函数,而不是让它纯粹存在于您的my_compute_function
  3. 此代码使用 Spark-nativespark_session.read.text方法,这将比原始 txt 文件的逐行解析快几个数量级。这将确保并行化的 DataFrame 是您在执行程序(或更糟糕的是,您的驱动程序)内部逐行操作的对象,而不是单个文件。
from transforms.api import transform, Input, Output
from pkg_resources import resource_filename


def raw_parsing_logic(raw_df):
    return raw_df


@transform(
    my_output=Output("/txt_tests/parsed_files"),
    my_input=Input("/txt_tests/dataset_of_files"),
)
def my_compute_function(my_input, my_output, ctx):
    all_files_df = None
    for file_status in my_input.filesystem().ls('**/**'):
        raw_df = ctx.spark_session.read.text(my_input.filesystem().hadoop_path + "/" + file_status.path)
        parsed_df = raw_parsing_logic(raw_df)
        all_files_df = parsed_df if all_files_df is None else all_files_df.unionByName(parsed_df)
    my_output.write_dataframe(all_files_df)


def test_my_compute_function(spark_session):
    file_path = resource_filename(__name__, "raw.txt")
    raw_df = raw_parsing_logic(
      spark_session.read.text(file_path)
    )
    assert raw_df.count() > 0
    raw_columns_set = set(raw_df.columns)
    expected_columns_set = {"value"}
    assert len(raw_columns_set.intersection(expected_columns_set)) == 1

一旦您启动并运行此代码,您的test_my_compute_function方法将非常快速地迭代,以便您可以完善您的模式恢复逻辑。这将使在最后构建数据集变得更加容易,但不会产生完整构建的任何开销。


推荐阅读