首页 > 解决方案 > 数据集上的过滤操作失败,出现熟悉的错误“AnalysisException:无法解析给定的输入列”

问题描述

我正在使用 Kafka 2.3.0 和 Spark 2.3.4。我正在尝试通过在其上运行过滤器来操作数据集。但是我得到分析异常错误并且无法找出解决方案。请注意,上面的 POJO 数据集本身很好,并且可以在控制台上很好地打印。代码是这个的延续。请注意,POJO 数据集是由通过 Kafka 传入的流数据制成的。

查看列名以查找不匹配(如果有),尝试使用 lambdas 和 sql 过滤器语句的变体。我认为我在理解方面遗漏了一些东西以使其发挥作用。

这是 POJO 类:

public class Pojoclass2 implements Serializable {

    private java.sql.Date dt;
    private String ct;
    private String r;
    private String b;
    private String s;
    private Integer iid;
    private String iname;
    private Integer icatid;
    private String cat;
    private Integer rvee;
    private Integer icee;
    private Integer opcode;
    private String optype;
    private String opname;

    public Pojoclass2 (){}
...
//getters and setters
}

//What works (dataAsSchema2 is a Dataset<Row> formed out of incoming streaming data of a kafka topic):

Encoder<Pojoclass2> encoder = Encoders.bean(Pojoclass2.class);
Dataset<Pojoclass2> se= new Dataset<Pojoclass2>(sparkSession,
                    dataAsSchema2.logicalPlan(), encoder);
//I can print se on a console sink and it is all good. I can do all filtering on se but can only receive the return value as Dataset<Row>.

//What doesnt work(it compiles but throws the analysis exception at runtime:

Dataset<Pojoclass2> h = se
                    .filter((FilterFunction<Pojoclass2>) s -> s.getBuyerName() == "ASD");

//or
Dataset<Pojoclass2> h = se
                    .filter((FilterFunction<Pojoclass2>) s -> s.getBuyerName() == "ASD").as(Encoders.bean(Pojoclass2.class));

还有错误跟踪(注意这是实际的。在 Pojoclass2 中,我更改了属性名称以保护机密性。您可能会看到名称不同,类型匹配):

"
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`contactRole`' given input columns: [iname, ct, icatid, s, r, b, opname, cat, opcode, dt, iid, icee, optype, rvee];;
'TypedFilter ibs.someengine.spark.somecore.SomeMain$$Lambda$17/902556500@23f8036d, 
...
...
"

我希望过滤器应该正常运行,并且 h 应该包含过滤后的强类型行。目前,我正在通过将其转换为 DataFrame ( Dataset<Row>) 来工作,但这有点违背了目的(我猜)。我还注意到,似乎支持通过 bean 类进行操作的强类型数据集上的操作很少。这是一个有效的理解吗?

谢谢!

标签: apache-sparkapache-spark-sqlapache-spark-dataset

解决方案


推荐阅读