python - 在 Pyspark 中使用 udf 时出现 __getnewargs__ 错误
问题描述
有一个包含 2 列(db 和 tb)的数据框:db 代表数据库,tb 代表该数据库的 tableName。
+--------------------+--------------------+
| database| tableName|
+--------------------+--------------------+
|aaaaaaaaaaaaaaaaa...| tttttttttttttttt|
|bbbbbbbbbbbbbbbbb...| rrrrrrrrrrrrrrrr|
|aaaaaaaaaaaaaaaaa...| ssssssssssssssssss|
我在python中有以下方法:
def _get_tb_db(db, tb):
df = spark.sql("select * from {}.{}".format(db, tb))
return df.dtypes
这个udf:
test = udf(lambda db, tb: _get_tb_db(db, tb), StringType())
运行时:
df = df.withColumn("dtype", test(col("db"), col("tb")))
有以下错误:
pickle.PicklingError: Could not serialize object: Py4JError: An
error occurred while calling o58.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
我发现了一些关于 stackoverflow 的讨论:Spark __getnewargs__ 错误 ,但我不确定如何解决这个问题?错误是因为我在 UDF 中创建了另一个数据框吗?
类似于链接中的解决方案,我尝试了这个:
cols = copy.deepcopy(df.columns)
df = df.withColumn("dtype", scanning(cols[0], cols[1]))
但仍然出现错误
有什么解决办法吗?
解决方案
该错误意味着您不能在 UDF 中使用Spark 数据帧。但是由于您包含数据库和表名称的数据框很可能很小,因此只需使用 Pythonfor
循环就足够了,以下是一些可能有助于获取数据的方法:
from pyspark.sql import Row
# assume dfs is the df containing database names and table names
dfs.printSchema()
root
|-- database: string (nullable = true)
|-- tableName: string (nullable = true)
方法一:使用 df.dtypes
运行 sqlselect * from database.tableName limit 1
生成 df 并返回其 dtypes,将其转换为 StringType()。
data = []
DRow = Row('database', 'tableName', 'dtypes')
for row in dfs.collect():
try:
dtypes = spark.sql('select * from `{}`.`{}` limit 1'.format(row.database, row.tableName)).dtypes
data.append(DRow(row.database, row.tableName, str(dtypes)))
except Exception, e:
print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))
pass
df_dtypes = spark.createDataFrame(data)
# DataFrame[database: string, tableName: string, dtypes: string]
笔记:
使用
dtypes
而不是str(dtypes)
将分别获得以下模式 where_1
和_2
arecol_name
和col_dtype
:root |-- database: string (nullable = true) |-- tableName: string (nullable = true) |-- dtypes: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- _1: string (nullable = true) | | |-- _2: string (nullable = true)
使用这种方法,每个表将只有一行。对于接下来的两种方法,表的每个 col_type 都会有自己的行。
方法二:使用描述
您还可以通过运行spark.sql("describe tableName")
直接获取数据帧来检索此信息,然后使用 reduce 函数来合并所有表的结果。
from functools import reduce
def get_df_dtypes(db, tb):
try:
return spark.sql('desc `{}`.`{}`'.format(db, tb)) \
.selectExpr(
'"{}" as `database`'.format(db)
, '"{}" as `tableName`'.format(tb)
, 'col_name'
, 'data_type')
except Exception, e:
print("ERROR from {}.{}: [{}]".format(db, tb, e))
pass
# an example table:
get_df_dtypes('default', 'tbl_df1').show()
+--------+---------+--------+--------------------+
|database|tableName|col_name| data_type|
+--------+---------+--------+--------------------+
| default| tbl_df1| array_b|array<struct<a:st...|
| default| tbl_df1| array_d| array<string>|
| default| tbl_df1|struct_c|struct<a:double,b...|
+--------+---------+--------+--------------------+
# use reduce function to union all tables into one df
df_dtypes = reduce(lambda d1, d2: d1.union(d2), [ get_df_dtypes(row.database, row.tableName) for row in dfs.collect() ])
方法 3:使用 spark.catalog.listColumns()
使用 spark.catalog.listColumns() 创建collections.Column
对象列表,检索name
和dataType
合并数据。生成的数据框在它们自己的列上使用 col_name 和 col_dtype 进行标准化(与使用Method-2相同)。
data = []
DRow = Row('database', 'tableName', 'col_name', 'col_dtype')
for row in dfs.select('database', 'tableName').collect():
try:
for col in spark.catalog.listColumns(row.tableName, row.database):
data.append(DRow(row.database, row.tableName, col.name, col.dataType))
except Exception, e:
print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))
pass
df_dtypes = spark.createDataFrame(data)
# DataFrame[database: string, tableName: string, col_name: string, col_dtype: string]
注意:在检索元数据时,不同的 Spark 发行版/版本可能会产生不同的结果describe tbl_name
和其他命令,请确保在查询中使用正确的列名。
推荐阅读
- powerbi-desktop - 安装后无法打开 Microsoft Power BI Desktop
- java - 放弃将数据连接到交易视图
- python - Python列表处理
- reactjs - 按钮单击未在 React 上呈现输入字段
- javascript - 使用 Javascript / Node Js 拆分匹配数组和不匹配数组
- react-native - 如何渲染
- 使用 react-native-render-html 进行适当的缩进和标签?
- c++ - Arduino - GUISlice - gslc_ElemSetTxtStr 不更新文本
- geospatial - 如何在 python 中的 GeoTiff 图像 (epsg:32643) 上找到以 lat/long 指定的位置?
- java - Java中的数组声明和初始化
- image - 在 ImageJ 中编写宏以打开、更改颜色、调整亮度和重新保存显微镜图像