首页 > 解决方案 > 使用 pyspark 数据框验证多个电子邮件地址列并将两列值连接到由管道分隔的 1 列值中

问题描述

我在 pyspark 数据框中有两个电子邮件地址,需要验证两个电子邮件地址列值,然后使用管道作为分隔符合并为 1 列值。

Primary_Email_Address       Alternate_Email_Address      Output(email_address)
navg@gmail.com                nvgng@gm@.com              navg@gmail.com
ggg@gmail.com                 nnnn@gmail.com             ggg@gmail.com|nnnn@gmail.com
nave@gmail.com|gg@gmail.com   ndg@gmail.com          nave@gmail.com|gg@gmail.com|ndg@gmail.com          

我需要从 spark 数据帧 csv 文件动态传递列并处理它。

我已经写了下面的代码,但需要一些修改。

from pyspark.sql.session import SparkSession
from pyspark.sql.functions import concat, lit, col, trim, when
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
regex = '^\w+([\.-]?\w+)*@\w+([\.-]?\w+)*(\.\w{2,3})+$'

df = spark.read.option("header",True).csv("file_name.csv")

# select specific columns

df1 = df.select("primary_email_addr","alt_email_addr")

# concat the columns

df_email = df1.withColumn('email_addr',concat(df1['primary_email_addr'],lit("|"),df1['alt_email_addr'])))

df_email_1 = df_email.drop("primary_email_addr","alt_email_addr")

df_email_2 = df_email_1.withColumn("email_addr_status",when(F.col("email_addr").rlike("^(([a-zA-Z0-9_.+-]+)+@.+\\.[A-Za-z]+)(| ([a-zA-Z0-9_.+-]+)+@.+\\.[A-Za-z]+)*$"),"Valid").otherwise("Invalid"))

df_email_2.show(truncate=False)

标签: pythonapache-sparkpysparkapache-spark-sql

解决方案


这是另一种使用regexp_extract_allarray_join与 Spark >= 3.1.0 的方法:

from pyspark.sql.functions import expr

cols = ['Primary_Email_Address', 'Alternate_Email_Address']
data = [("navg@gmail.com", "nvgng@gm@.com"),
        ("ggg@gmail.com", "nnnn@gmail.com")]

df = spark.createDataFrame(data, cols)

concat_expr = expr("concat_ws(',', Primary_Email_Address, Alternate_Email_Address)") 
extract_expr = expr("regexp_extract_all(concat, '(\\\w+([\\\.-]?\\\w+)*@\\\w+([\\\.-]?\\\w+)*(\\\.\\\w{2,3})+)', 0)")
join_expr = expr("array_join(extract, '|')")

df.withColumn("concat", concat_expr) \
  .withColumn("extract", extract_expr) \
  .withColumn("output", join_expr) \
  .select("Primary_Email_Address", "Alternate_Email_Address", "output") 

# +---------------------+-----------------------+----------------------------+
# |Primary_Email_Address|Alternate_Email_Address|output                      |
# +---------------------+-----------------------+----------------------------+
# |navg@gmail.com       |nvgng@gm@.com          |navg@gmail.com              |
# |ggg@gmail.com        |nnnn@gmail.com         |ggg@gmail.com|nnnn@gmail.com|
# +---------------------+-----------------------+----------------------------+

逻辑:

  1. 用于concat_ws连接电子邮件。
  2. 调用regexp_extract_all以提取有效电子邮件的数组。
  3. 调用array_join以从前一个数组中生成 s 字符串,其中项目由 . 分隔|

推荐阅读