apache-spark - 数据集上的过滤操作失败,出现熟悉的错误“AnalysisException:无法解析给定的输入列”
问题描述
我正在使用 Kafka 2.3.0 和 Spark 2.3.4。我正在尝试通过在其上运行过滤器来操作数据集。但是我得到分析异常错误并且无法找出解决方案。请注意,上面的 POJO 数据集本身很好,并且可以在控制台上很好地打印。代码是这个的延续。请注意,POJO 数据集是由通过 Kafka 传入的流数据制成的。
查看列名以查找不匹配(如果有),尝试使用 lambdas 和 sql 过滤器语句的变体。我认为我在理解方面遗漏了一些东西以使其发挥作用。
这是 POJO 类:
public class Pojoclass2 implements Serializable {
private java.sql.Date dt;
private String ct;
private String r;
private String b;
private String s;
private Integer iid;
private String iname;
private Integer icatid;
private String cat;
private Integer rvee;
private Integer icee;
private Integer opcode;
private String optype;
private String opname;
public Pojoclass2 (){}
...
//getters and setters
}
//What works (dataAsSchema2 is a Dataset<Row> formed out of incoming streaming data of a kafka topic):
Encoder<Pojoclass2> encoder = Encoders.bean(Pojoclass2.class);
Dataset<Pojoclass2> se= new Dataset<Pojoclass2>(sparkSession,
dataAsSchema2.logicalPlan(), encoder);
//I can print se on a console sink and it is all good. I can do all filtering on se but can only receive the return value as Dataset<Row>.
//What doesnt work(it compiles but throws the analysis exception at runtime:
Dataset<Pojoclass2> h = se
.filter((FilterFunction<Pojoclass2>) s -> s.getBuyerName() == "ASD");
//or
Dataset<Pojoclass2> h = se
.filter((FilterFunction<Pojoclass2>) s -> s.getBuyerName() == "ASD").as(Encoders.bean(Pojoclass2.class));
还有错误跟踪(注意这是实际的。在 Pojoclass2 中,我更改了属性名称以保护机密性。您可能会看到名称不同,类型匹配):
"
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`contactRole`' given input columns: [iname, ct, icatid, s, r, b, opname, cat, opcode, dt, iid, icee, optype, rvee];;
'TypedFilter ibs.someengine.spark.somecore.SomeMain$$Lambda$17/902556500@23f8036d,
...
...
"
我希望过滤器应该正常运行,并且 h 应该包含过滤后的强类型行。目前,我正在通过将其转换为 DataFrame ( Dataset<Row>
) 来工作,但这有点违背了目的(我猜)。我还注意到,似乎支持通过 bean 类进行操作的强类型数据集上的操作很少。这是一个有效的理解吗?
谢谢!
解决方案
推荐阅读
- javascript - 如何在 ejs 中有条件地包含视图?
- jquery - Bootstrap datetimepicker - 无法触发事件
- python - 在 PyCharm 中运行时没有名为“googleapiclient”的模块
- amazon-web-services - 在 AWS CodeBuild 中运行容器的权限错误
- sql - 带有“间隔”的 JPA Hibernate 查询的问题
- java - 为什么 size 和 preferredSize 不让这个标签变大?
- python - 在 Google 表格中创建空数据框
- passport.js - 在 IOS 中从 Facebook 应用程序使用 Passportjs Facebook 时会话过期
- python-3.x - 将多个时间序列从单个列中分离出来以分离每个系列的列?
- java - 我可以为通用函数提供正确的类型信息吗?