首页 > 解决方案 > 覆盖 aws 胶水工作室中的数据

问题描述

我正在 AWS Glue Studio 中构建一个 ETL 流程,我在其中获取存储桶 s3 中的数据以删除一些字段。在此过程之后,我需要使用自定义转换来掩盖一些数据,然后将其保存在新的 s3 存储桶中。

当我使用自定义转换时,会显示一个块代码。所以我需要在那里创建我的代码。

我的问题是关于它,我如何使用它来构建我的转换,更具体地说是关于输入和输出数据,因为我不了解 AWS 上的这个过程。

我现在所做的是创建将操纵这些数据的函数。我使用 dfc['my_field'] 获取我的字段并应用了转换,但我不明白我需要返回什么以及如何将其保存在 s3 存储桶中。下面你可以看到我的功能。

def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
import random
import hashlib
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, lit

df = dfc.select(list(dfc.keys())[0]).toDF()
print('printing key0')
df.show(3)
print('printed key0')
print('imprimindo o tipo de df')
print(type(df))

def encrypt_phone(df):
    '''
     Criptografa dados de telefone
    '''
    if df.withColumn('phone_main', df('phone_main')) is not None:
        hl = hashlib.sha256()
        hl.update(df.withColumn('phone_main', df('phone_main'))).encode().hexdigest()
        hl.update(df.withColumn('phone_alternative', df('phone_alternative'))).encode().hexdigest()
    return df

# Criar um UDF para encriptar os dados
from pyspark.sql.functions import udf
encrypted_udf = udf(encrypt_phone)
print(encrypted_udf)

# Renomear coluna phone_main e phone_alternative
df.withColumnRenamed('phone_main', 'old_phone_main') \
                    .withColumnRenamed('phone_alternative', 'old_phone_alternative')

# Criar novas colunas para dados transformados
new_df = encrypt_phone(df)
encrypted = df.withColumn('phone_main', new_df('phone_main'))
encrypted = df.withColumn('phone_alternative',new_df('phone_alternative'))
print(encrypted)

# Remover campos sensíveis
encripted_df = df.drop('cpf_cnpj', 'name', 'nickname', 'old_phone_main', 'old_phone_alternative')
print(df_encrypted)

# Mascarar email
print(encripted_df.withColumn('email', df('email')))
if encripted_df.withColumn('email', df('email')) is not None:
    masked_df = encripted_df.withColumn('email', lit('****@*****'))
    
# Converter spark dataframe para Glue Dynamic Frame e retornar como um dataframe collection
df_overshadow = DynamicFrame.fromDF(masked_df, glueContext, "df_overshadow")

return (DynamicFrameCollection({"CustomTransform0": df_overshadow}, glueContext))

我在胶水工作室上的流程如下图所示:

在此处输入图像描述

但是,当我尝试保存时显示错误:

Parent node overshadow outputs a collection, but node Process-Client_PFS does not accept a collection.

我需要应用转换并保存数据。我该怎么做?

标签: amazon-web-servicesaws-glue

解决方案


dfc是一个DynamicFrameCollection。您需要DynamicFrame从集合中选择您的才能使用。从文档中

您必须使用 SelectFromCollection 转换从自定义转换节点的结果中选择单个 DynamicFrame,然后才能将输出发送到目标位置。

这应该会返回您的第一个 DynamicFrame:

dfc.select(list(dfc.keys())[0])

推荐阅读