首页 > 解决方案 > Spark Dataframes 多个 JOIN

问题描述

我想通过比较将 2 列 LONG_IND 和 SHORT_IND 添加到 2 个数据帧之间的 JOIN

  1. 数据帧 1 中 ACCOUNT_NO 列的前 3 个字母前缀值介于数据帧 2 中的 Prefix_FROM 和 Prefix_TO 列之间(数字比较)
  2. 数据框 1 中 ACCOUNT_NO 列的最后 5 个字母后缀值介于数据框 2 中的 Suffix_FROM 和 Suffix_TO 列之间(数字比较)
  3. 数据帧 1 中 ACCOUNT_NO 列的前 3 个字母前缀值介于数据帧 2 中的 Prefix_FROM 和 Prefix_TO 列之间(字母数字比较)
  4. 数据框 1 中 ACCOUNT_NO 列的最后 5 个字母后缀值介于数据框 2 中的 Suffix_FROM 和 Suffix_TO 列之间(字母数字比较)
  5. 数据框 1 中 ACCOUNT_NO 列的最后 5 个字母后缀值为 ANYTHING(数据中的全部)(字母数字比较)
  6. 数据框 1 中的 ACCOUNT_NO 列可以是任何内容(数据中的所有内容)(默认情况)

    如何在同一个数据框中添加字母数字比较和默认情况?如果我再次编写单独的 JOIN,则 LONG_IND 和 SHORT_IND 列

数据框 1


ACCOUNT_NO,CostCenter,BU,MPU
0000001F,,BOXXBU          ,BOXXMP          
0000002Q,,BOXXBU          ,BOXXMP          
92115301,,BOXXBU          ,BOXXMP
32934789,,BOXXBU          ,BOXXMP
3FA34789,,BOXXBU          ,BOXXMP
3S534789,,BOXXBU          ,BOXXMP

数据框 2


ACCT PFX FROM,ACCT PFX TO,ACCT SFX FROM,ACCT SFX TO,TIER 1 LONG,TIER 2 LONG,TIER 1 SHORT,TIER 2 SHORT
329,329,89276,89276,15,10,65,10
3FA,3FA,00001,00001,1,1,90,1
ALL,ALL,ALL,ALL,8,99,88,99
934,999,ALL,ALL,8,85,88,85
3S4,3S6,ALL,ALL,6,22,65,22

现在我编写了下面的代码,适用于数字(上面的 a 和 b),如下所示,使用 2 个选项:

val getRuleDF = accDF.join(customerRulesDF,accDF("ACCOUNT_NO").substr(0, 3).between(customerRulesDF("ACCT_PFX_FROM"), customerRulesDF("ACCT_PFX_TO")) && accDF("ACCOUNT_NO").substr(4, 5).between(customerRulesDF("ACCT_SFX_FROM"), customerRulesDF("ACCT_SFX_TO")), "inner")
  .withColumn("LONG_IND", concatColumns(customerRulesDF("TIER_1_LONG"), customerRulesDF("TIER_2_LONG")) )
  .withColumn("SHORT_IND", concatColumns(customerRulesDF("TIER_1_SHORT"), customerRulesDF("TIER_2_SHORT")) )

或者

val getRule1DF = getRuleDF.join(customerRulesDF,
  (accDF("ACCOUNT_NO").substr(0, 3) >= customerRulesDF("ACCT_PFX_FROM")) &&
  (accDF("ACCOUNT_NO").substr(0, 3) <= customerRulesDF("ACCT_PFX_TO")) &&
  (accDF("ACCOUNT_NO").substr(4, 5) >= customerRulesDF("ACCT_SFX_FROM")) &&
  (accDF("ACCOUNT_NO").substr(4, 5) <=  customerRulesDF("ACCT_SFX_TO")), "inner")
  .withColumn("LONG_IND", concatColumns(customerRulesDF("TIER_1_LONG"), customerRulesDF("TIER_2_LONG")) )
  .withColumn("SHORT_IND", concatColumns(customerRulesDF("TIER_1_SHORT"), customerRulesDF("TIER_2_SHORT")) )

标签: scalaapache-sparkhadoop

解决方案


推荐阅读