首页 > 解决方案 > 从 PySpark 返回复杂类型

问题描述

我正在尝试从 Grouped Map Pandas UDF 返回值矩阵。

在定义模式时,我给出了 Array(Array(DoubleType())),但这会导致只返回矩阵的第一行,而所有其他值都为 None。

这是我尝试过的:

myschema = StructType([
  StructField('my_id',  StringType()),
  StructField('matrix', ArrayType(ArrayType(DoubleType())) )
]);
cols = list(map(lambda s: s.__dict__['name'], myschema))

@F.pandas_udf(myschema, F.PandasUDFType.GROUPED_MAP)
def my_function(data):
  myID = 'Hello'

  matrix = [
    [1.1, 2.2, 3.3],
    [4.4, 5.5, 6.6],
    [7.7, 8.8, 9.9]
  ]

  return pd.DataFrame([[ myID, matrix ]], columns=cols)

df = spark.createDataFrame(pd.DataFrame(['id1', 'id2'], columns=['ID']))
df.groupBy('ID').apply(my_function).collect()

结果,我有:

行(my_id=u'Hello', 矩阵=[[1.1, 2.2, 3.3], 无, 无])

当然,返回一个非嵌套数组没有任何问题。

myschema = StructType([
  StructField('my_id',  StringType()),
  StructField('matrix', ArrayType(DoubleType()))
]);
cols = list(map(lambda s: s.__dict__['name'], myschema))

@F.pandas_udf(myschema, F.PandasUDFType.GROUPED_MAP)
def my_function(data):
  myID = 'Hello'

  matrix = [1.1, 2.2, 3.3]

  return pd.DataFrame([[ myID, matrix ]], columns=cols)

df = spark.createDataFrame(pd.DataFrame(['id1', 'id2'], columns=['ID']))
df.groupBy('ID').apply(my_function).collect()

结果:

行(my_id=u'Hello', 矩阵=[1.1, 2.2, 3.3])

架构有问题吗?

标签: pythonapache-sparkpysparkuser-defined-functionsdatabricks

解决方案


推荐阅读