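loops - PySpark: iterating over repeated variables
Problem description
I have code that currently works, but I would like to make it more efficient and avoid hardcoding:
1) Avoid hardcoding: when Id = 4, I would like NotDefined_filterDomainLookup to take the corresponding Code and Name from the default_reference df instead of hardcoding the Code and Name values.
Question 1
List of column names and the corresponding new column names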
test_matchedAttributeName_List =dict(matchedDomains.agg(collect_set(array('DomainName', 'TargetAttributeForName')).alias('m')).first().m)
Output: {'LeaseType': 'ConformedLeaseTypeName', 'LeaseRecoveryType': 'ConformedLeaseRecoveryTypeName', 'LeaseStatus': 'ConformedLeaseStatusName'}
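The dict(...) call above works because collect_set(array(...)) yields a list of two-element lists, and dict() accepts any iterable of pairs. A minimal plain-Python sketch of that conversion follows; the `pairs` list is a hand-written stand-in for the collected value (an assumption), with values taken from the output shown above:

```python
# Stand-in for matchedDomains.agg(...).first().m:
# a list of [DomainName, TargetAttributeForName] pairs (assumed shape)
pairs = [
    ['LeaseType', 'ConformedLeaseTypeName'],
    ['LeaseRecoveryType', 'ConformedLeaseRecoveryTypeName'],
    ['LeaseStatus', 'ConformedLeaseStatusName'],
]

# dict() turns each two-element list into one key/value entry
name_by_domain = dict(pairs)
```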
Working code, except for the hardcoding. Specifically, when Id = 4 I would like to reference the default_reference df for the corresponding Code and Name.
cond = col('PrimaryLookupAttributeName').isNull() & col('SecondaryLookupAttributeName').isNull()
NotDefined_filterDomainLookup = filterDomainLookup \
.withColumn('OutputItemIdByAttribute', when(cond, lit('4')).otherwise(col('OutputItemIdByAttribute'))) \
.withColumn('OutputItemCodeByAttribute', when(cond, lit('N/D')).otherwise(col('OutputItemCodeByAttribute'))) \
.withColumn('OutputItemNameByAttribute', when(cond, lit('Not Defined')).otherwise(col('OutputItemNameByAttribute')))
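Solution
For question 2, based on your code, I suggest the following adjustments:
- Set item_keys to include Id, Name and Code, and use a list comprehension to merge the logic they share
- Use struct instead of array to implement that logic
- There is no need to create a Python dictionary for NotDefined_Attribute_List; a list of tuples is enough, and works better
See the steps below:
(1) Set up two aggregate functions to compute item_map for testing_mappings and NotDefined_Attribute_List. Note the use of named_struct and struct (two ways of accomplishing the same task)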
from itertools import chain
# create_map, lit and concat_ws are used in steps (2) and (4) below
from pyspark.sql.functions import expr, collect_set, struct, col, create_map, lit, concat_ws
item_keys = ['Id', 'Name', 'Code']
# use SQL expression
m1_by_sql_expr = expr("""
    collect_set(
        named_struct(
            'attr_name', PrimaryLookupAttributeName,
            'attr_value', PrimaryLookupAttributeValue,
            'Id', OutputItemIdByValue,
            'Name', OutputItemNameByValue,
            'Code', OutputItemCodeByValue
        )
    ) as item_map
""")
# use PySpark API functions
m2_by_func = collect_set(
    struct(
        col('DomainName').alias('domain'),
        col('TargetAttributeForId').alias('Id'),
        col('TargetAttributeForName').alias('Name'),
        col('TargetAttributeForCode').alias('Code')
    )
).alias('item_map')
(2) Set up the mapping from ItemKey (Id, Code or Name) + PrimaryLookupAttributeName + PrimaryLookupAttributeValue to ItemValue
m1 = NotDefined_filterDomainLookup.agg(m1_by_sql_expr).first().item_map
"""create a list of tuples of (map_key, map_value) to create MapType column:
| map_key = concat_ws('\0', item_key, attr_name, attr_value)
| map_value = item_value
"""
testingId = [('\0'.join([k, row.attr_name, row.attr_value]), row[k]) for row in m1 for k in item_keys if row[k]]
#[('Id\x00LeaseRecoveryType\x00Gross w/base year', '18'),
# ('Name\x00LeaseRecoveryType\x00Gross w/base year', 'Modified Gross'),
# ('Id\x00LeaseStatus\x00Abandoned', '10'),
# ('Name\x00LeaseStatus\x00Abandoned', 'Active'),
# ('Id\x00LeaseStatus\x00Draft', '10'),
# ('Name\x00LeaseStatus\x00Draft', 'Pending'),
# ('Id\x00LeaseStatus\x00Archive', '11'),
# ('Name\x00LeaseStatus\x00Archive', 'Expired'),
# ('Id\x00LeaseStatus\x00Terminated', '10'),
# ('Name\x00LeaseStatus\x00Terminated', 'Terminated'),
# ('Id\x00LeaseRecoveryType\x00Gross', '11'),
# ('Name\x00LeaseRecoveryType\x00Gross', 'Gross'),
# ('Id\x00LeaseRecoveryType\x00Gross-modified', '15'),
# ('Name\x00LeaseRecoveryType\x00Gross-modified', 'Modified Gross')]
# caution: this builds one literal per key and per value, so it could become a problem with too many entries.
testing_mappings = create_map([lit(i) for i in chain.from_iterable(testingId)])
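To make the key construction in step (2) easier to follow, here is a minimal plain-Python sketch of the same list comprehension without Spark. The `ItemRow` namedtuple is a hypothetical stand-in for the Row objects in m1, and the two sample rows reuse values from the output above, with Code assumed to be empty:

```python
from collections import namedtuple

# Hypothetical stand-in for the Row objects collected into m1
ItemRow = namedtuple('ItemRow', ['attr_name', 'attr_value', 'Id', 'Name', 'Code'])

item_keys = ['Id', 'Name', 'Code']
rows = [
    ItemRow('LeaseStatus', 'Draft', '10', 'Pending', None),
    ItemRow('LeaseRecoveryType', 'Gross', '11', 'Gross', None),
]

# One (map_key, map_value) pair per non-empty item key;
# '\x00' separates item key, attribute name and attribute value
pairs = [
    ('\x00'.join([k, r.attr_name, r.attr_value]), getattr(r, k))
    for r in rows
    for k in item_keys
    if getattr(r, k)
]

lookup = dict(pairs)
```

Because empty values are filtered out, no 'Code' keys are produced for these rows, which matches the sample output where only Id and Name entries appear.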
(3) Create NotDefined_Attribute_List (same logic as (2); m2 uses the PySpark API functions)
m2 = matchedDomains.agg(m2_by_func).first().item_map
NotDefined_Attribute_List = [(k, row.domain, row[k]) for row in m2 for k in item_keys if row[k]]
(4) Build the list of additional columns from NotDefined_Attribute_List:
additional_cols = [
    testing_mappings[concat_ws('\0', lit(k), lit(c), col(c))].alias(c_name)
    for k, c, c_name in NotDefined_Attribute_List
]
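The lookup in step (4) can be pictured row by row: for each (item key, source column, new column) triple, the key is rebuilt from the row's own value and looked up in the map. The plain-Python sketch below mimics that with a dict. The `lookup` contents, the `add_columns` helper and the column name ConformedLeaseStatusId are hypothetical and used for illustration only:

```python
# Hypothetical lookup map, keyed as item_key + '\x00' + attr_name + '\x00' + attr_value
lookup = {
    'Id\x00LeaseStatus\x00Draft': '10',
    'Name\x00LeaseStatus\x00Draft': 'Pending',
}

# (item_key, source column, new column) triples, shaped like NotDefined_Attribute_List
attribute_list = [
    ('Id', 'LeaseStatus', 'ConformedLeaseStatusId'),      # assumed column name
    ('Name', 'LeaseStatus', 'ConformedLeaseStatusName'),
]

def add_columns(record):
    """Return a copy of record with one new field per triple in attribute_list."""
    out = dict(record)
    for k, c, c_name in attribute_list:
        # Rebuild the composite key from the record's own value in column c
        out[c_name] = lookup.get('\x00'.join([k, c, record[c]]))
    return out

result = add_columns({'LeaseStatus': 'Draft'})
unmatched = add_columns({'LeaseStatus': 'Unknown'})
```

In this sketch a value with no entry in the map produces None in the new fields.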
(5) Select the additional columns
if count_ND > 0:
    # move code above in (2), (3) and (4) here
    # set up testing_NotDefined
    testing_NotDefined = datasetMatchedPortfolio.select("*", *additional_cols)
else:
    print("no Not Defines exist")