Error when calculating values in a PySpark dataframe

Problem Description

I ran some code to calculate a customer's expenses based on whether buy or sell is present in the transaction_code column.

If it is buy, the amount is subtracted from credit_card_limit. If it is sell, the amount is added to credit_card_limit.
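That sign rule is the behaviour I expect the aggregation to implement. As a minimal sketch of just this piece (using the column names from the tables below, not my full pipeline):

from pyspark.sql import functions as f

# buy spends against the limit (negative), sell adds back to it (positive)
signed_amount = f.when(f.col('transaction_code') == 'buy', -f.col('amount')) \
                 .otherwise(f.col('amount'))

# expenses per account and date would then just be the sum of the signed amounts:
# df.groupBy('account_id', 'date_map').agg(f.sum(signed_amount).alias('expenses'))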

I also checked the data for null values and removed them.

Below is the initial table:

+----------+------+-----------------+----------+------+----------------+
|account_id|amount|credit_card_limit|  date_map|amount|transaction_code|
+----------+------+-----------------+----------+------+----------------+
|     12345|  1000|           100000|      null|  1000|             buy|
|     12345|  1100|           100000|2020-06-02|  1100|             buy|
|     12345|   500|           100000|2020-06-02|   500|            sell|
|     12345|   200|           100000|2020-06-03|   200|             buy|
|     12345|  4000|           100000|2020-06-04|  4000|             buy|
+----------+------+-----------------+----------+------+----------------+

I validated the column, checked whether it contained any NULL values, and removed those rows. Here you can see that the first row of the previous table has been removed because its date_map was null:

+----------+------+-----------------+----------+------+----------------+
|account_id|amount|credit_card_limit|  date_map|amount|transaction_code|
+----------+------+-----------------+----------+------+----------------+
|     12345|  1100|           100000|2020-06-02|  1100|             buy|
|     12345|   500|           100000|2020-06-02|   500|            sell|
|     12345|   200|           100000|2020-06-03|   200|             buy|
|     12345|  4000|           100000|2020-06-04|  4000|             buy|
+----------+------+-----------------+----------+------+----------------+

But when I try to compute the expenses column, the amount from the removed row is also taken into account, so the wrong amount is calculated. I don't understand how that can happen, because I have already removed that row.

Below is the resulting table. Here you can see that the amount from the null-date row (1000) has also been subtracted from 100000, so we get 98400 instead of 99400 (the arithmetic is worked through below the table):

+----------+-----------------+----------+--------+-----------+
|account_id|credit_card_limit|  date_map|expenses|credit_left|
+----------+-----------------+----------+--------+-----------+
|     12345|           100000|2020-06-02|    -600|      98400|
|     12345|           100000|2020-06-03|    -200|      97100|
|     12345|           100000|2020-06-04|   -4000|      93100|
|     12345|           100000|2020-06-05|    -900|      92200|
+----------+-----------------+----------+--------+-----------+
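To spell out the first row: the surviving 2020-06-02 transactions give expenses of -1100 + 500 = -600, so credit_left should be 100000 - 600 = 99400. The observed 98400 is exactly 1000 lower, which is precisely the amount on the dropped null-date buy row.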

Initial schema of my table:

root
 |-- account_id: long (nullable = true)
 |-- credit_card_limit: long (nullable = true)
 |-- credit_card_number: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- phone_number: long (nullable = true)
 |-- amount: string (nullable = true)
 |-- date: string (nullable = true)
 |-- shop: string (nullable = true)
 |-- transaction_code: string (nullable = true)

Below is the code I used:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from pyspark.sql.functions import col, expr, initcap, regexp_replace, to_date
from pyspark.sql.types import LongType, StringType

spark=SparkSession.builder.master("local").appName("PPython").getOrCreate()

df=spark.read.json("C:\\Users\\Admin\\Desktop\\sample.json",multiLine=True)

#Exploding the transaction column
df=df.selectExpr("*","explode(transactions)").select("*","col.*").drop(*['col','transactions'])

df.printSchema()
#Changing the datatype of columns to our required dataset
df=df.withColumn('account_id',col('account_id').cast(LongType()))
df=df.withColumn('amount',col('amount').cast(LongType()))
df=df.withColumn('credit_card_limit',col('credit_card_limit').cast(LongType()))
df=df.withColumn('credit_card_number',col('credit_card_number').cast(LongType()))
df=df.withColumn('phone_number',col('phone_number').cast(LongType()))
df=df.withColumn('shop',col('shop').cast(StringType()))
df=df.withColumn('first_name',col('first_name').cast(StringType()))
df=df.withColumn('last_name',col('last_name').cast(StringType()))
df=df.withColumn('transaction_code',col('transaction_code').cast(StringType()))
df=df.withColumn('date',col('date').cast(StringType()))

df=df.filter(df.date.isNotNull())
#Cleaning and Validating based on date column
df=df.withColumn('full_name',f.concat(f.col('first_name'),f.lit(' '),f.col('last_name'))).drop('first_name','last_name')
df=df.withColumn("cleaned_map", regexp_replace("date", "[^0-9T]", "")).withColumn("date_map", to_date("cleaned_map", "ddMMyyyy")).drop('cleaned_map','date')
df.select('account_id','amount','credit_card_limit','date_map','amount','transaction_code').show(5)

finaldf=df.filter(df.date_map.isNotNull())

#Cleaning the data which has null values
finaldf=finaldf.filter(df.account_id.isNotNull())
finaldf=finaldf.filter(df.credit_card_number.isNotNull())
finaldf=finaldf.filter(df.transaction_code.isNotNull())

finaldf=finaldf.filter(df.amount.isNotNull())


finaldf =finaldf.fillna(0, subset=['amount'])


#Validate the columns if they have any additional characters,punctuations
finaldf=finaldf.withColumn('phone_number',regexp_replace("phone_number","[^0-9]",""))
finaldf=finaldf.withColumn('account_id',regexp_replace("account_id","[^0-9]",""))
finaldf=finaldf.withColumn('credit_card_limit',regexp_replace("credit_card_limit","[^0-9]",""))
finaldf=finaldf.withColumn('credit_card_number',regexp_replace("credit_card_number","[^0-9]",""))
finaldf=finaldf.withColumn('amount',regexp_replace("amount","[^0-9 ]",""))
finaldf=finaldf.withColumn('full_name',regexp_replace("full_name","[^a-zA-Z ]",""))
finaldf=finaldf.withColumn('transaction_code',regexp_replace("transaction_code","[^a-zA-Z]",""))
finaldf=finaldf.withColumn('shop',regexp_replace("shop","[^a-zA-Z ]",""))

finaldf=finaldf.withColumn('account_id',col('account_id').cast(LongType()))
finaldf=finaldf.withColumn('amount',col('amount').cast(LongType()))
finaldf=finaldf.withColumn('credit_card_limit',col('credit_card_limit').cast(LongType()))
finaldf=finaldf.withColumn('credit_card_number',col('credit_card_number').cast(LongType()))
finaldf=finaldf.withColumn('phone_number',col('phone_number').cast(LongType()))

#Making the first letter of the Names as Capital letters and also trimming the extra spaces present in the names
finaldf=finaldf.withColumn("full_name",initcap(col("full_name"))).withColumn("full_name",regexp_replace(col("full_name"),'\s+',' '))

#Masking the credit card number
finaldf=finaldf.withColumn("credit_card_number", expr("concat(translate(left(credit_card_number, 8), '0123456789', '**********'), " +
      "right(credit_card_number, 4))"))

#Grouping the columns based on conditions
w = Window.partitionBy(f.lit(0)).orderBy('date_map')
finaldf.select('account_id','amount','credit_card_limit','date_map','amount','transaction_code').show(4)
finaldf=finaldf.groupBy('account_id','full_name','credit_card_limit','credit_card_number','date_map','phone_number').agg(f.sum(f.when(f.col('transaction_code')=='buy',-f.col('amount')).\
              otherwise(f.col('amount'))).alias('expenses')).\
    select('*',(f.col('credit_card_limit')+f.sum(f.col('expenses')).over(w)).alias('credit_left'))

#Arranging the table based on account_id,date
finaldf=finaldf.orderBy('account_id','date_map')
finaldf.drop('phone_number','full_name','credit_card_number').show(5)

Desired output:

+----------+-----------------+----------+--------+-----------+
|account_id|credit_card_limit|  date_map|expenses|credit_left|
+----------+-----------------+----------+--------+-----------+
|     12345|           100000|2020-06-02|    -600|      99400|
|     12345|           100000|2020-06-03|    -200|      99200|
|     12345|           100000|2020-06-04|   -4000|      95200|
+----------+-----------------+----------+--------+-----------+
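In other words, credit_left should be a running balance over the surviving rows only: 100000 - 600 = 99400, then 99400 - 200 = 99200, then 99200 - 4000 = 95200.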

This code works correctly if I don't remove any rows, but gives the wrong result when I do remove rows. Please help me find the mistake in this code. Thanks a lot!
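A quick diagnostic I can run to narrow this down (assuming the same dataframe names as above, and placed just before the groupBy step so that finaldf still contains the individual transaction rows):

# should print 0 if the null-date row is really gone
print(finaldf.filter(f.col('date_map').isNull()).count())

# recompute the signed amounts per date directly, to compare against the expenses column
finaldf.groupBy('date_map').agg(
    f.sum(f.when(f.col('transaction_code') == 'buy', -f.col('amount'))
           .otherwise(f.col('amount'))).alias('expenses_check')
).orderBy('date_map').show()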

Tags: apache-spark, pyspark, apache-spark-sql, pyspark-dataframes
