java - 如何在生产者中发送 java 对象作为值?
问题描述
是否可以将 Java 对象作为 Kafka 主题中的值发送,以及如何在 spark 中使用它?
我目前正在做 apache-spark 教程,想知道是否可以发送除字符串之外的其他内容。本教程有这个例子
producer.send(new ProducerRecord<String, String>(topic, something_string));
有可能做这样的事情吗?
Car car = new Car(brand, year, color);
producer.send(new ProducerRecord<String, Car>(topic, car));
我以后如何在 Spark 中使用它?
目前我正在这样做:
String car = brand + "," + year + "," + color;
producer.send(new ProducerRecord<String, String>(topic, car));
我把所有东西都放在一个用逗号分隔的字符串中。
问题2:目前我是这样消费的。
Dataset<String> words = df
.selectExpr("CAST (value AS STRING)")
.as(Encoders.STRING());
我在哪里得到字符串:
"brand,year,color"
如何将其拆分并放在单独的列中?
解决方案
您的帖子实际上有两个问题,您可以将它们分成单独的帖子。对于第一个问题,请参阅此帖子;中心概念是您必须编写自定义序列化程序。
对于第二个,这个概念在原理上仍然是相同的,但是这次你必须在 Spark 端编写一个自定义的解串器(解码器)。请参阅此 Spark 文档,它演示了如何从 Kafka 创建流。但是,请不要使用 'KafkaUtil' 类,请参阅javadoc。它具有使用 Kafka 解码器类创建流的方法。
推荐阅读
- c# - 如何获取 rasdial API 的 PPP cookie?
- php - Stripe:如何在 php 中从 Charge API 迁移到 PaymentIntent API
- excel - 在不清除过滤器的情况下设置 Excel 范围的值
- python - 在远程桌面上使用 PyAutoGUI 的鼠标单击问题
- java - 空手道 0.9.5.RC5 -“重试直到”功能不知何故不尊重我在 karate-config.js 中的“重试”配置
- r - 清理具有重复但不同因子级别的起点和终点数据
- ruby - 如何使用代理运行 ruby 脚本?
- r - 将一个集合拆分为 n 个不相等的子集,关键决定因素是子集中的元素聚合并等于预定数量?
- excel - 将特定单词表的内容复制到excel
- python - 如何用 Pandas 总结某些行并将结果添加到 defaultdict(大型数据集)