首页 > 解决方案 > 如何在 Spark 代码中用 Java8 编写 mapFunction?错误:map(Function1, Encoder) 不适用于参数

问题描述

我有一个 mapFunction 如下

Function1<CompanyInfoRecordNew, CompanyInfoTransformedRecord> mapFunction = ( record ) ->{

        CompanyInfoTransformedRecord  transRec = new CompanyInfoTransformedRecord();
        //popluate 
    return transRec ;
    };

它采用 CompanyInfoRecordNew 对象并返回 CompanyInfoTransformedRecord。

但是在如下调用它时

JavaRDD companyInfoTransformedRecord = baseDs.map(mapFunction, comanyInfoTransEncoder);

给出错误:

Dataset 类型中的方法 map(Function1, Encoder) 不适用于参数 (Function1, Encoder)

这里有什么问题?

相同的代码链接:

https://gist.github.com/BdLearnerr/cbfea1c8471557bb33449f882cc1854a

如何从地图函数返回列表?

链接中更新的代码

https://gist.github.com/BdLearnerr/cbfea1c8471557bb33449f882cc1854a

第 2 节:

MapFunction<CompanyInfoRecordNew, List<CompanyInfoTransformedRecord>>  mapFunction = ( record ) ->{

            List<CompanyInfoTransformedRecord>  transRecList = new ArrayList<CompanyInfoTransformedRecord>();

return transRecList ;
}


   Dataset<List<CompanyInfoTransformedRecord>> companyInfoTransformedRecords = baseDs.map(mapFunction, comanyInfoTransEncoder);

//错误

Dataset 类型中的方法 map(Function1, Encoder) 不适用于参数 MapFunction>, Encoder)

companyInfoTransformedRecord.show();

由于错误无法显示,如何让 show() 工作。??

标签: apache-sparkjava-8apache-spark-sqlspark-streaming

解决方案


您的导入是错误的,而不是:

import org.apache.calcite.linq4j.function.Function1;

用这个

org.apache.spark.api.java.function.MapFunction;

baseDs.map(new MapFunction<CompanyInfoRecordNew, CompanyInfoTransformedRecord>() {...}, encoder);

推荐阅读