java - 如何使用 Spark 结构化流实现 Kafka 流的自定义反序列化器?
问题描述
我正在尝试迁移我当前的流式应用程序,该应用程序基于使用 RDD(来自他们的文档)到使用结构化流式传输的新 Datasets API,据我所知,这是目前使用 Spark 进行实时流式传输的首选方法。
目前,我有应用程序设置来使用 1 个名为“SATELLITE”的主题,该主题包含包含键时间戳和包含Satellite
POJO 的值的消息。但是我在弄清楚如何为此实现反序列化器时遇到问题。在我当前的应用程序中,这很容易,您只需在您喜欢的 kafka 属性映射中添加一行kafkaParams.put("value.deserializer", SatelliteMessageDeserializer.class);
我正在用 Java 执行此操作,这是最大的挑战,因为所有解决方案似乎都在 Scala 中,而我没有理解得很好,我不容易将 Scala 代码转换为 Java 代码。
我遵循了这个问题中概述的 JSON 示例,该示例目前有效,但对于我需要做的事情来说似乎过于复杂。鉴于我已经为此目的制作了自定义反序列化器,我不明白为什么我必须先将其转换为字符串,而只是将其转换为 JSON,然后再将其转换为我想要的类类型。我也一直在尝试使用我在这里找到的一些示例,但到目前为止我还没有运气。
目前我的应用程序看起来像这样(使用 json 方法):
import common.model.Satellite;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
public class SparkStructuredStreaming implements Runnable{
private String bootstrapServers;
private SparkSession session;
public SparkStructuredStreaming(final String bootstrapServers, final SparkSession session) {
this.bootstrapServers = bootstrapServers;
this.session = session;
}
@Override
public void run() {
Dataset<Row> df = session
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", "SATELLITE")
.load();
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("id", DataTypes.StringType, true),
DataTypes.createStructField("gms", DataTypes.StringType, true),
DataTypes.createStructField("satelliteId", DataTypes.StringType, true),
DataTypes.createStructField("signalId", DataTypes.StringType, true),
DataTypes.createStructField("cnr", DataTypes.DoubleType, true),
DataTypes.createStructField("constellation", DataTypes.StringType, true),
DataTypes.createStructField("timestamp", DataTypes.TimestampType, true),
DataTypes.createStructField("mountPoint", DataTypes.StringType, true),
DataTypes.createStructField("pseudorange", DataTypes.DoubleType, true),
DataTypes.createStructField("epochTime", DataTypes.IntegerType, true)
});
Dataset<Satellite> df1 = df.selectExpr("CAST(value AS STRING) as message")
.select(functions.from_json(functions.col("message"),schema).as("json"))
.select("json.*")
.as(Encoders.bean(Satellite.class));
try {
df1.writeStream()
.format("console")
.option("truncate", "false")
.start()
.awaitTermination();
} catch (StreamingQueryException e) {
e.printStackTrace();
}
}
}
我有一个看起来像这样的自定义反序列化器
import common.model.Satellite;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
public class SatelliteMessageDeserializer implements Deserializer<Satellite> {
private static Logger logger = LoggerFactory.getLogger(SatelliteMessageDeserializer.class);
private ObjectMapper objectMapper = new ObjectMapper();
@Override
public void configure(Map configs, boolean isKey) {
}
@Override
public void close() {
}
@Override
public Satellite deserialize(String topic, byte[] data) {
try {
return objectMapper.readValue(new String(data, "UTF-8"), getMessageClass());
} catch (Exception e) {
logger.error("Unable to deserialize message {}", data, e);
return null;
}
}
protected Class<Satellite> getMessageClass() {
return Satellite.class;
}
}
如何在SparkStructuredStreaming
课堂上使用我的自定义反序列化器?我正在使用 Spark 2.4、OpenJDK 10 和 Kafka 2.0
编辑:我尝试创建自己的 UDF,我认为这是应该如何完成的,但我不确定如何让它返回特定类型,因为它似乎只允许我使用那些上课Datatypes
!
UserDefinedFunction mode = udf(
(byte[] bytes) -> deserializer.deserialize("", bytes), DataTypes.BinaryType //Needs to be type Satellite, but only allows ones of type DataTypes
);
Dataset df1 = df.select(mode.apply(col("value")));
解决方案
from_json
只能在字符串类型的列上工作。
值始终使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化值
因此,您首先至少要反序列化为字符串,但我认为您并不真正需要它。
可能只是这样做
df.select(value).as(Encoders.bean(Satellite.class))
如果这不起作用,您可以尝试定义自己的 UDF/解码器,这样您就可以拥有类似的东西SATELLITE_DECODE(value)
在斯卡拉
object SatelliteDeserializerWrapper {
val deser = new SatelliteDeserializer
}
spark.udf.register("SATELLITE_DECODE", (topic: String, bytes: Array[Byte]) =>
SatelliteDeserializerWrapper.deser.deserialize(topic, bytes)
)
df.selectExpr("""SATELLITE_DECODE("topic1", value) AS message""")
请参阅这篇文章以获取灵感,并且在 Databricks 博客中也提到过
推荐阅读
- java - 如何在 Selenium Java Webdriver 中获取要在运行时验证的文本的类名
- oracle - 避免重复 oracle aapex
- java - Liferay 门户:使用 JSF 和 EJB-CDI 的 OSGI 模块化不使用 @Resource 注入/检索
- java - 使用 onPreviewFrame 运行 ML 模型
- javascript - 使用 Angular6 Service 构造函数初始化私有属性
- java - 在 Springboot 中修剪 @RequestParam
- python - 如何将指数平滑模型预测值获取到 POWER BI/POWER Query 数据集?
- python - 使用 Python 子进程调用带有空格分隔列表的 Python 命令
- arrays - 使用解码器解析嵌套的 json 文件
- spring - 创建 Maven 子模块时忽略 Application.properties