python - 尝试应用 lambda 来创建新列时,“'DataFrame' 对象没有属性 'apply'”
问题描述
我的目标是在 Pandas DataFrame 中添加一个新列,但我遇到了一个奇怪的错误。
新列预计将是现有列的转换,可以通过在字典/哈希图中进行查找来完成。
# Loading data
df = sqlContext.read.format(...).load(train_df_path)
# Instanciating the map
some_map = {
'a': 0,
'b': 1,
'c': 1,
}
# Creating a new column using the map
df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)
这导致以下错误:
AttributeErrorTraceback (most recent call last)
<ipython-input-12-aeee412b10bf> in <module>()
25 df= train_df
26
---> 27 df['new_column'] = df.apply(lambda row: some_map(row.some_column_name), axis=1)
/usr/lib/spark/python/pyspark/sql/dataframe.py in __getattr__(self, name)
962 if name not in self.columns:
963 raise AttributeError(
--> 964 "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
965 jc = self._jdf.apply(name)
966 return Column(jc)
AttributeError: 'DataFrame' object has no attribute 'apply'
其他可能有用的信息: * 我正在使用 Spark 和 Python 2。
解决方案
您使用的语法适用于pandas
DataFrame。要为spark
DataFrame 实现这一点,您应该使用该withColumn()
方法。这适用于各种定义良好的DataFrame 函数,但对于用户定义的映射函数来说有点复杂。
一般情况
为了定义 a udf
,您需要指定输出数据类型。例如,如果您想应用一个my_func
返回 a的函数,您可以按如下string
方式创建 a :udf
import pyspark.sql.functions as f
my_udf = f.udf(my_func, StringType())
然后您可以使用my_udf
创建一个新列,例如:
df = df.withColumn('new_column', my_udf(f.col("some_column_name")))
另一种选择是使用select
:
df = df.select("*", my_udf(f.col("some_column_name")).alias("new_column"))
具体问题
用一个udf
在您的特定情况下,您希望使用字典来翻译 DataFrame 的值。
这是一种udf
为此目的定义 a 的方法:
some_map_udf = f.udf(lambda x: some_map.get(x, None), IntegerType())
请注意,我使用它dict.get()
是因为您希望您udf
对不良输入具有鲁棒性。
df = df.withColumn('new_column', some_map_udf(f.col("some_column_name")))
使用 DataFrame 函数
有时使用 audf
是不可避免的,但只要有可能,通常首选使用 DataFrame 函数。
这是一种不使用udf
.
诀窍是迭代其中的项目some_map
以创建pyspark.sql.functions.when()
函数列表。
some_map_func = [f.when(f.col("some_column_name") == k, v) for k, v in some_map.items()]
print(some_map_func)
#[Column<CASE WHEN (some_column_name = a) THEN 0 END>,
# Column<CASE WHEN (some_column_name = c) THEN 1 END>,
# Column<CASE WHEN (some_column_name = b) THEN 1 END>]
现在您可以pyspark.sql.functions.coalesce()
在选择内部使用:
df = df.select("*", f.coalesce(*some_map_func).alias("some_column_name"))
这是有效的,因为如果不满足条件,则默认when()
返回,并将选择它遇到的第一个非空值。由于地图的键是唯一的,因此最多有一列是非空的。null
coalesce()
推荐阅读
- python - 无法在 anaconda 中安装 prettytable
- android - 无法通过 ADB 连接到设备的端口
- angularjs - 如何使用 angularjs 制作动态 ng-repeat?
- javascript - GTM 自定义 Javascript 变量中的解析错误
- php - PHP 使用外部磁盘驱动器下载文件
- xml - Office 2016 静默卸载提示:“准备好卸载了吗?”
- java - Java 中的 ExecuteQuery() 与 getResultSet()
- apache-kafka - 如何在芭蕾舞女演员服务 api 中执行一些 kafka 命令
- jsf - JSF 自定义组件
- c# - 如何执行具有接口的 C# 文件