java - How to decode a byte[] of List to Dataset in spark?
Problem Description
I am using spark-sql-2.3.1v with Kafka and Java 8 in my project. I am trying to convert a byte[] received from a topic into a Dataset on the Kafka consumer side.
Here are the details
I have
class Company {
    String companyName;
    Integer companyId;
}
which I defined as:
public static final StructType companySchema = new StructType()
    .add("companyName", DataTypes.StringType)
    .add("companyId", DataTypes.IntegerType);
But the Message class is defined as:
class Message {
    private List<Company> companyList;
    private String messageId;
}
which I tried to define as:
StructType messageSchema = new StructType()
    .add("companyList", DataTypes.createArrayType(companySchema, false), false)
    .add("messageId", DataTypes.StringType);
I sent the Message to the Kafka topic as a byte[] using serialization.
I successfully received the message byte[] at the consumer, and I am trying to convert it into a Dataset. How do I do it?
Dataset<Row> messagesDs = kafkaReceivedStreamDs
    .select(from_json(col("value").cast("string"), messageSchema).as("messages"))
    .select("messages.*");
messagesDs.printSchema();
root
|-- companyList: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- companyName: string (nullable = true)
| | |-- companyId: integer (nullable = true)
|-- messageId: string (nullable = true)
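For reference, from_json with messageSchema would parse string values shaped like this (a hypothetical payload; the field values are invented for illustration):

```json
{
  "companyList": [
    {"companyName": "Acme", "companyId": 1},
    {"companyName": "Globex", "companyId": 2}
  ],
  "messageId": "msg-001"
}
```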
Dataset<Row> comapanyListDs = messagesDs.select(explode_outer(col("companyList")));
comapanyListDs.printSchema();
root
|-- col: struct (nullable = true)
| |-- companyName: string (nullable = true)
| |-- companyId: integer (nullable = true)
Dataset<Company> comapanyDs = comapanyListDs.as(Encoders.bean(Company.class));
I am getting this error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'companyName' given input columns: [col];
How do I get the Dataset<Company> records from this?
Solution
When you explode the array, the resulting struct column is named "col" by default. Since your bean class has no "col" attribute, the conversion fails with the error you mentioned:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'companyName' given input columns: [col];
You can select the nested struct fields as plain columns, something like this:
Dataset<Row> comapanyListDs = messagesDs
    .select(explode_outer(col("companyList")))
    .select(col("col.companyName").as("companyName"),
            col("col.companyId").as("companyId"));
I haven't tested the syntax, but once each row has plain columns extracted from the struct, your next step (comapanyListDs.as(Encoders.bean(Company.class))) should work. Equivalently, .select("col.*") flattens every field of the struct at once.
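One more thing to watch: Encoders.bean maps columns through JavaBean conventions, so Company needs a no-argument constructor and public getters/setters for every field. The field-only class in the question will not map. A minimal sketch of Company written as a proper bean (the constructor and accessors are my addition, not in the original code):

```java
// Should be declared public (in its own Company.java) so Spark's
// Encoders.bean can see it from outside the package.
class Company {
    private String companyName;
    private Integer companyId;

    // Encoders.bean needs a no-arg constructor to instantiate rows
    public Company() {}

    // ...and public getters/setters so it can discover the fields
    public String getCompanyName() { return companyName; }
    public void setCompanyName(String companyName) { this.companyName = companyName; }
    public Integer getCompanyId() { return companyId; }
    public void setCompanyId(Integer companyId) { this.companyId = companyId; }
}
```

Message would need the same treatment if you ever map it directly with Encoders.bean(Message.class).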