amazon-web-services - 覆盖 aws 胶水工作室中的数据
问题描述
我正在 AWS Glue Studio 中构建一个 ETL 流程,我在其中获取存储桶 s3 中的数据以删除一些字段。在此过程之后,我需要使用自定义转换来掩盖一些数据,然后将其保存在新的 s3 存储桶中。
当我使用自定义转换时,会显示一个块代码。所以我需要在那里创建我的代码。
我的问题是关于它,我如何使用它来构建我的转换,更具体地说是关于输入和输出数据,因为我不了解 AWS 上的这个过程。
我现在所做的是创建将操纵这些数据的函数。我使用 dfc['my_field'] 获取我的字段并应用了转换,但我不明白我需要返回什么以及如何将其保存在 s3 存储桶中。下面你可以看到我的功能。
def MyTransform (glueContext, dfc) -> DynamicFrameCollection:
import random
import hashlib
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, lit
df = dfc.select(list(dfc.keys())[0]).toDF()
print('printing key0')
df.show(3)
print('printed key0')
print('imprimindo o tipo de df')
print(type(df))
def encrypt_phone(df):
'''
Criptografa dados de telefone
'''
if df.withColumn('phone_main', df('phone_main')) is not None:
hl = hashlib.sha256()
hl.update(df.withColumn('phone_main', df('phone_main'))).encode().hexdigest()
hl.update(df.withColumn('phone_alternative', df('phone_alternative'))).encode().hexdigest()
return df
# Criar um UDF para encriptar os dados
from pyspark.sql.functions import udf
encrypted_udf = udf(encrypt_phone)
print(encrypted_udf)
# Renomear coluna phone_main e phone_alternative
df.withColumnRenamed('phone_main', 'old_phone_main') \
.withColumnRenamed('phone_alternative', 'old_phone_alternative')
# Criar novas colunas para dados transformados
new_df = encrypt_phone(df)
encrypted = df.withColumn('phone_main', new_df('phone_main'))
encrypted = df.withColumn('phone_alternative',new_df('phone_alternative'))
print(encrypted)
# Remover campos sensíveis
encripted_df = df.drop('cpf_cnpj', 'name', 'nickname', 'old_phone_main', 'old_phone_alternative')
print(df_encrypted)
# Mascarar email
print(encripted_df.withColumn('email', df('email')))
if encripted_df.withColumn('email', df('email')) is not None:
masked_df = encripted_df.withColumn('email', lit('****@*****'))
# Converter spark dataframe para Glue Dynamic Frame e retornar como um dataframe collection
df_overshadow = DynamicFrame.fromDF(masked_df, glueContext, "df_overshadow")
return (DynamicFrameCollection({"CustomTransform0": df_overshadow}, glueContext))
我在胶水工作室上的流程如下图所示:
但是,当我尝试保存时显示错误:
Parent node overshadow outputs a collection, but node Process-Client_PFS does not accept a collection.
我需要应用转换并保存数据。我该怎么做?
解决方案
dfc
是一个DynamicFrameCollection
。您需要DynamicFrame
从集合中选择您的才能使用。从文档中:
您必须使用 SelectFromCollection 转换从自定义转换节点的结果中选择单个 DynamicFrame,然后才能将输出发送到目标位置。
这应该会返回您的第一个 DynamicFrame:
dfc.select(list(dfc.keys())[0])
推荐阅读
- javascript - blurDataURL 的 Nextjs 数据 URL
- time-complexity - GridSearchCV 和时间复杂度
- javascript - 向工具提示饼图 echarts4r 添加额外变量
- scala - 在 Scala 中将 Spark.sql 时间戳转换为 java.time.Instant
- websphere-liberty - 我的欢迎文件在 Open Liberty 中返回一个空白页面
- firebase - Flutter:'firebase' 未被识别为内部或外部命令、可运行程序或批处理文件
- greenplum - 传播镜像的 gp_vmem_protect_limit 配置
- pandas - Python Pandas:同一行的不同列中出现相同值的次数
- apache-spark - Stream Stream 加入 Spark 结构 Streaming
- c++ - 在函数中显示地址