首页 > 解决方案 > 比较 PySpark 中的文件

问题描述

我正在使用 PySpark,我需要比较两个文件的内容 - 进行差异检查。所以我把我的测试分成了2个:

  1. 我在文件之间连接列,并检查记录是否相同
  2. 我从每个文件中获取唯一的列,并保存它们(手动测试)

第一步是通过采取

joined_columns = set(tbl1.columns).intersection(set(tbl2.columns))
joined_columns_str = str(joined_columns)[1:-1].replace("'", "")

tbl1_set = spark.sql("SELECT " + joined_columns_str + " FROM tbl1")
tbl2_set = spark.sql("SELECT " + joined_columns_str + " FROM tbl2")

sql = "SELECT * FROM tbl1_set " \
          "EXCEPT " \
          "SELECT * FROM tbl2_set "
different_records = spark.sql(sql)

但是,在这些文件中我有地图列 - 所以我收到了这个错误:

org.apache.spark.sql.AnalysisException: Cannot have map type columns in DataFrame which calls set operations(intersect, except, etc.)

有谁知道如何解决它并只获得不同的记录?我考虑过使用 udf,但找不到如何在“select *”上使用它,只有在引用特定字段时...

任何帮助将非常感激。

谢谢!

标签: python-3.xapache-sparkpysparkuser-defined-functionsdifference

解决方案


推荐阅读