首页 > 解决方案 > 如何在生产者中发送 java 对象作为值?

问题描述

是否可以将 Java 对象作为 Kafka 主题中的值发送,以及如何在 spark 中使用它?

我目前正在做 apache-spark 教程,想知道是否可以发送除字符串之外的其他内容。本教程有这个例子

producer.send(new ProducerRecord<String, String>(topic, something_string));

有可能做这样的事情吗?

Car car = new Car(brand, year, color);
producer.send(new ProducerRecord<String, Car>(topic, car));

我以后如何在 Spark 中使用它?

目前我正在这样做:

String car = brand + "," + year + "," + color;
producer.send(new ProducerRecord<String, String>(topic, car));

我把所有东西都放在一个用逗号分隔的字符串中。

问题2:目前我是这样消费的。

Dataset<String> words = df
.selectExpr("CAST (value AS STRING)")
.as(Encoders.STRING());

我在哪里得到字符串: "brand,year,color"

如何将其拆分并放在单独的列中?

标签: javaapache-sparkapache-kafkaproducer-consumerspark-structured-streaming

解决方案


您的帖子实际上有两个问题,您可以将它们分成单独的帖子。对于第一个问题,请参阅此帖子;中心概念是您必须编写自定义序列化程序。

对于第二个,这个概念在原理上仍然是相同的,但是这次你必须在 Spark 端编写一个自定义的解串器(解码器)。请参阅此 Spark 文档,它演示了如何从 Kafka 创建流。但是,请不要使用 'KafkaUtil' 类,请参阅javadoc。它具有使用 Kafka 解码器类创建流的方法。


推荐阅读