apache-spark - Wrong values when computing a running balance in a PySpark DataFrame
Problem description
I wrote some code that computes a customer's balance based on whether buy or sell appears in the transaction_code column. If the code is buy, the amount is subtracted from credit_card_limit; if it is sell, the amount is added to credit_card_limit.
I also checked the data for null values and removed them.
Below is the initial table:
+----------+------+-----------------+----------+------+----------------+
|account_id|amount|credit_card_limit| date_map|amount|transaction_code|
+----------+------+-----------------+----------+------+----------------+
| 12345| 1000| 100000| null| 1000| buy|
| 12345| 1100| 100000|2020-06-02| 1100| buy|
| 12345| 500| 100000|2020-06-02| 500| sell|
| 12345| 200| 100000|2020-06-03| 200| buy|
| 12345| 4000| 100000|2020-06-04| 4000| buy|
+----------+------+-----------------+----------+------+----------------+
I validated the date column, checked it for NULL values, and removed them. As you can see, the first row of the previous table has been dropped because its date_map was null:
+----------+------+-----------------+----------+------+----------------+
|account_id|amount|credit_card_limit| date_map|amount|transaction_code|
+----------+------+-----------------+----------+------+----------------+
| 12345| 1100| 100000|2020-06-02| 1100| buy|
| 12345| 500| 100000|2020-06-02| 500| sell|
| 12345| 200| 100000|2020-06-03| 200| buy|
| 12345| 4000| 100000|2020-06-04| 4000| buy|
+----------+------+-----------------+----------+------+----------------+
But when I compute the expenses column, the amount from the dropped row is still taken into account, so the totals come out wrong. I don't understand how this happens, since I have already removed that row.
Below is the resulting table. Notice that the amount of the null-date row (1000) is also subtracted from 100000, so we get 98400 instead of 99400:
+----------+-----------------+----------+--------+-----------+
|account_id|credit_card_limit| date_map|expenses|credit_left|
+----------+-----------------+----------+--------+-----------+
| 12345| 100000|2020-06-02| -600| 98400|
| 12345| 100000|2020-06-03| -200| 97100|
| 12345| 100000|2020-06-04| -4000| 93100|
| 12345| 100000|2020-06-05| -900| 92200|
+----------+-----------------+----------+--------+-----------+
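To see exactly where 98400 comes from, the buy/sell arithmetic can be replayed in plain Python, independent of Spark. This is only a sketch: the transaction list is copied from the tables above, and the null date_map is modeled as None.

```python
# Transactions from the initial table: (date_map, transaction_code, amount).
# The first row has a null date_map, modeled as None here.
rows = [
    (None,         "buy",  1000),
    ("2020-06-02", "buy",  1100),
    ("2020-06-02", "sell",  500),
    ("2020-06-03", "buy",   200),
    ("2020-06-04", "buy",  4000),
]

def credit_left(transactions, limit=100000):
    """Sum signed amounts per date (buy negative, sell positive),
    then take a running total over the dates, like the window sum."""
    expenses = {}
    for date, code, amount in transactions:
        signed = -amount if code == "buy" else amount
        expenses[date] = expenses.get(date, 0) + signed
    balance, out = limit, {}
    # Nulls sort first, matching the window's default null ordering.
    for date in sorted(expenses, key=lambda d: (d is not None, d or "")):
        balance += expenses[date]
        out[date] = balance
    return out

# Dropping the null-date row first gives the desired 99400 on 2020-06-02 ...
clean = credit_left([r for r in rows if r[0] is not None])
# ... while keeping it reproduces the observed 98400 (100000 - 600 - 1000).
dirty = credit_left(rows)
```

The two results differ only in whether the -1000 from the null-date row enters the running total, which is exactly the discrepancy in the tables above.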
The initial schema of my table:
root
|-- account_id: long (nullable = true)
|-- credit_card_limit: long (nullable = true)
|-- credit_card_number: long (nullable = true)
|-- first_name: string (nullable = true)
|-- last_name: string (nullable = true)
|-- phone_number: long (nullable = true)
|-- amount: string (nullable = true)
|-- date: string (nullable = true)
|-- shop: string (nullable = true)
|-- transaction_code: string (nullable = true)
Below is the code I used:
from pyspark.sql import SparkSession, functions as f
from pyspark.sql.functions import col, expr, initcap, regexp_replace, to_date
from pyspark.sql.types import LongType, StringType
from pyspark.sql.window import Window

spark=SparkSession.builder.master("local").appName("PPython").getOrCreate()
df=spark.read.json("C:\\Users\\Admin\\Desktop\\sample.json",multiLine=True)
#Exploding the transaction column
df=df.selectExpr("*","explode(transactions)").select("*","col.*").drop(*['col','transactions'])
df.printSchema()
#Changing the datatype of columns to our required dataset
df=df.withColumn('account_id',col('account_id').cast(LongType()))
df=df.withColumn('amount',col('amount').cast(LongType()))
df=df.withColumn('credit_card_limit',col('credit_card_limit').cast(LongType()))
df=df.withColumn('credit_card_number',col('credit_card_number').cast(LongType()))
df=df.withColumn('phone_number',col('phone_number').cast(LongType()))
df=df.withColumn('shop',col('shop').cast(StringType()))
df=df.withColumn('first_name',col('first_name').cast(StringType()))
df=df.withColumn('last_name',col('last_name').cast(StringType()))
df=df.withColumn('transaction_code',col('transaction_code').cast(StringType()))
df=df.withColumn('date',col('date').cast(StringType()))
df=df.filter(df.date.isNotNull())
#Cleaning and Validating based on date column
df=df.withColumn('full_name',f.concat(f.col('first_name'),f.lit(' '),f.col('last_name'))).drop('first_name','last_name')
df=df.withColumn("cleaned_map", regexp_replace("date", "[^0-9T]", "")).withColumn("date_map", to_date("cleaned_map", "ddMMyyyy")).drop('cleaned_map','date')
df.select('account_id','amount','credit_card_limit','date_map','amount','transaction_code').show(5)
finaldf=df.filter(df.date_map.isNotNull())
#Cleaning the data which has null values
finaldf=finaldf.filter(finaldf.account_id.isNotNull())
finaldf=finaldf.filter(finaldf.credit_card_number.isNotNull())
finaldf=finaldf.filter(finaldf.transaction_code.isNotNull())
finaldf=finaldf.filter(finaldf.amount.isNotNull())
finaldf =finaldf.fillna(0, subset=['amount'])
#Validate the columns if they have any additional characters,punctuations
finaldf=finaldf.withColumn('phone_number',regexp_replace("phone_number","[^0-9]",""))
finaldf=finaldf.withColumn('account_id',regexp_replace("account_id","[^0-9]",""))
finaldf=finaldf.withColumn('credit_card_limit',regexp_replace("credit_card_limit","[^0-9]",""))
finaldf=finaldf.withColumn('credit_card_number',regexp_replace("credit_card_number","[^0-9]",""))
finaldf=finaldf.withColumn('amount',regexp_replace("amount","[^0-9 ]",""))
finaldf=finaldf.withColumn('full_name',regexp_replace("full_name","[^a-zA-Z ]",""))
finaldf=finaldf.withColumn('transaction_code',regexp_replace("transaction_code","[^a-zA-Z]",""))
finaldf=finaldf.withColumn('shop',regexp_replace("shop","[^a-zA-Z ]",""))
finaldf=finaldf.withColumn('account_id',col('account_id').cast(LongType()))
finaldf=finaldf.withColumn('amount',col('amount').cast(LongType()))
finaldf=finaldf.withColumn('credit_card_limit',col('credit_card_limit').cast(LongType()))
finaldf=finaldf.withColumn('credit_card_number',col('credit_card_number').cast(LongType()))
finaldf=finaldf.withColumn('phone_number',col('phone_number').cast(LongType()))
#Making the first letter of the Names as Capital letters and also trimming the extra spaces present in the names
finaldf=finaldf.withColumn("full_name",initcap(col("full_name"))).withColumn("full_name",regexp_replace(col("full_name"),r'\s+',' '))
#Masking the credit card number
finaldf=finaldf.withColumn("credit_card_number", expr("concat(translate(left(credit_card_number, 8), '0123456789', '**********'), " +
"right(credit_card_number, 4))"))
#Grouping the columns based on conditions
w = Window.partitionBy(f.lit(0)).orderBy('date_map')
finaldf.select('account_id','amount','credit_card_limit','date_map','amount','transaction_code').show(4)
finaldf=finaldf.groupBy('account_id','full_name','credit_card_limit','credit_card_number','date_map','phone_number').agg(f.sum(f.when(f.col('transaction_code')=='buy',-f.col('amount')).\
otherwise(f.col('amount'))).alias('expenses')).\
select('*',(f.col('credit_card_limit')+f.sum(f.col('expenses')).over(w)).alias('credit_left'))
#Arranging the table based on account_id,date
finaldf=finaldf.orderBy('account_id','date_map')
finaldf.drop('phone_number','full_name','credit_card_number').show(5)
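The cleaned_map/date_map step can be mirrored in plain Python to check the regex together with the ddMMyyyy pattern. This is a sketch only: the raw date strings below are assumed examples, since the source JSON is not shown. One thing worth noting is that Spark's to_date returns null whenever the cleaned string does not match the pattern, which silently produces new null date_map rows after the earlier null check.

```python
import re
from datetime import datetime

def parse_date_map(raw):
    """Strip everything except digits and 'T' (like the regexp_replace),
    then try to parse ddMMyyyy; return None on failure, as to_date does."""
    cleaned = re.sub(r"[^0-9T]", "", raw)
    try:
        return datetime.strptime(cleaned, "%d%m%Y").date().isoformat()
    except ValueError:
        return None

parse_date_map("02-06-2020")        # cleaned to "02062020", parses fine
parse_date_map("2020-06-02T10:00")  # leftover 'T' and digit order break the pattern -> None
```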
Desired output:
+----------+-----------------+----------+--------+-----------+
|account_id|credit_card_limit| date_map|expenses|credit_left|
+----------+-----------------+----------+--------+-----------+
| 12345| 100000|2020-06-02| -600| 99400|
| 12345| 100000|2020-06-03| -200| 99200|
| 12345| 100000|2020-06-04| -4000| 95200|
+----------+-----------------+----------+--------+-----------+
This code works fine if I don't drop any rows, but breaks as soon as I do. Please help me find the bug in this code. Thanks a lot!
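As an aside, the card-masking expr in the code can also be checked in plain Python. This is a sketch of the same concat/translate/left/right logic, not the Spark implementation itself:

```python
def mask_card(number):
    """Replace the first 8 digits with '*' and keep the last 4, mirroring
    concat(translate(left(ccn, 8), '0123456789', '**********'), right(ccn, 4))."""
    s = str(number)
    return "*" * len(s[:8]) + s[-4:]

mask_card(123456789012)  # -> '********9012'
```

Note that, like the Spark expr, this drops any digits between position 8 and the last 4, so a 16-digit number comes out 12 characters long.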
Solution