java - How do I deserialize Java objects from Kafka with the Spark Structured Streaming API, without Avro or JSON?
Problem description
I created a topic named "test" in Kafka, with a single partition and no replication.
I created a Kafka producer that writes to the "test" topic, in a loop of 100000 iterations, a key, which is the index of the loop iteration, and a value, which is an object of type Word. Each Word's "word" attribute is set to the iteration index and its "timestamp" attribute is set to the current timestamp, with a 1000 ms sleep (Thread.sleep) between iterations. Word is defined as follows:
public class Word implements Serializable {
    private static final long serialVersionUID = 1L;
    private String word;
    private Long timestamp;

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public Long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(Long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Word [word=" + word + ", timestamp=" + timestamp + "]";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((timestamp == null) ? 0 : timestamp.hashCode());
        result = prime * result + ((word == null) ? 0 : word.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Word other = (Word) obj;
        if (timestamp == null) {
            if (other.timestamp != null)
                return false;
        } else if (!timestamp.equals(other.timestamp))
            return false;
        if (word == null) {
            if (other.word != null)
                return false;
        } else if (!word.equals(other.word))
            return false;
        return true;
    }
}
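The question does not show how the producer turns a Word into the record's byte[] value. Since Word implements Serializable, a plausible assumption is plain Java serialization; the sketch below (names and the use of ObjectOutputStream are my assumptions, not code from the question) shows what the producer-side serializer and the matching deserializer could look like:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class WordSerde {
    // Minimal stand-in for the question's Word bean, so this sketch is self-contained.
    public static class Word implements Serializable {
        private static final long serialVersionUID = 1L;
        public String word;
        public Long timestamp;
    }

    // What a Kafka Serializer<Word> could return as the record value (assuming Java serialization).
    public static byte[] serialize(Word w) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(w);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // The inverse, mirroring what the question's WordDeserializer evidently does successfully.
    public static Word deserialize(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Word) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```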
Below is the class that uses the Structured Streaming API. As you can see, I register a deserializer class under the name "deserialize" and then use it to transform the value column of the DataFrame df built from Kafka. I'll omit the details of the Deserializer class because, as you can see in the stack trace, it deserializes correctly. The point is that after deserialization, the conversion to the Word schema fails.
public class StreamObjsFromKafka {
    public static void es() throws StreamingQueryException {
        SparkSession session = SparkSession.builder().appName("streamFromKafka").master("local[*]").getOrCreate();
        Dataset<Row> df = session.readStream().format("kafka")
                .option("group.id", "test-consumer-group")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "test").load();
        df.printSchema();
        StructField word = DataTypes.createStructField("word", DataTypes.StringType, true);
        StructField timestamp = DataTypes.createStructField("timestamp", DataTypes.LongType, true);
        StructType schema = DataTypes.createStructType(Arrays.asList(word, timestamp));
        session.udf().register("deserialize", new WordDeserializer(), schema);
        Dataset<Row> df1 = df.selectExpr("CAST(key AS STRING) as KEY", "deserialize(value) AS value");
        StreamingQuery query0 = df1.writeStream().outputMode("update").format("console").start();
        query0.awaitTermination();
    }
}
The following lines are the beginning of the stack trace (I'll spare you the useless details):
18/12/06 15:21:03 ERROR Utils: Aborting task
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$259: (binary) => struct<word:string,timestamp:bigint>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The value (Word [word=15, timestamp=1544106059933]) of the type (kafka.model.Word) cannot be converted to struct<word:string,timestamp:bigint>
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:262)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:238)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
... 17 more
18/12/06 15:21:03 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 0, attempt 0, stage 0.0)
18/12/06 15:21:03 ERROR DataWritingSparkTask: Aborted commit for partition 0 (task 0, attempt 0, stage 0.0)
18/12/06 15:21:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$259: (binary) => struct<word:string,timestamp:bigint>)
	... (same stack trace and cause as above)
18/12/06 15:21:03 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$259: (binary) => struct<word:string,timestamp:bigint>)
	... (same stack trace and cause as above)
I think the most important line of the stack trace is the following:
The value (Word [word=15, timestamp=1544106059933]) of the type (kafka.model.Word) cannot be converted to struct<word:string,timestamp:bigint>
As you can see in the line above, deserialization works: "(Word [word=15, timestamp=1544106059933])" is exactly what the Kafka producer wrote to the "test" topic. However, its attributes are not converted to the declared StructType schema, "struct<word:string,timestamp:bigint>". I don't understand why.
Solution
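The original answer is not preserved on this page, but the error message points at a well-known pitfall: a UDF registered with a StructType return type must return an org.apache.spark.sql.Row whose fields match that schema in declaration order, not the deserialized Java bean itself. Spark's CatalystTypeConverters has no way to map an arbitrary POJO (kafka.model.Word) to struct<word:string,timestamp:bigint>. A minimal, stdlib-only sketch of the missing conversion step follows (the class and method names are hypothetical, and the Spark-specific wrapping is shown in comments since the real WordDeserializer isn't included in the question):

```java
public class WordRowFix {
    // Hypothetical minimal stand-in for the question's Word bean.
    public static class Word {
        public String word;
        public Long timestamp;
    }

    // Field values in the exact order of the registered StructType: (word: string, timestamp: bigint).
    // Inside the real UDF1<byte[], Row>.call(bytes), after deserializing bytes into a Word w,
    // you would return org.apache.spark.sql.RowFactory.create(rowValues(w))
    // instead of returning the Word object itself.
    public static Object[] rowValues(Word w) {
        return new Object[] { w.word, w.timestamp };
    }
}
```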