apache-kafka - Apache Heron 中的 Kafka 集成
问题描述
我正在尝试将 Kafka 与 Heron 拓扑集成。但是,我找不到最新版本的 Heron (0.17.5) 的任何示例。是否有任何可以共享的示例或有关如何实现自定义 Kafka Spout 和 Kafka Bolt 的任何建议?
编辑1:
我相信KafkaSpout和KafkaBolt在 Heron 中被有意弃用,以便为新的Streamlet API 让路。我目前正在查看是否可以使用 Streamlet API构建KafkaSource和KafkaSink 。但是,当我尝试在Source中创建KafkaConsumer时,出现以下异常。
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.consumer.KafkaConsumer
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.twitter.heron.api.utils.Utils.serialize(Utils.java:97)
编辑2:
修复了上述问题。我正在初始化KafkaConsumer
错误的构造函数。在方法中初始化相同的setup()
方法修复了它。
解决方案
我设法使用 Streamlet API for Heron 完成了这项工作。我在这里发布相同的内容。希望它可以帮助其他面临同样问题的人。
卡夫卡源
public class KafkaSource implements Source {
private String streamName;
private Consumer<String, String> kafkaConsumer;
private List<String> kafkaTopic;
private static final Logger LOGGER = Logger.getLogger("KafkaSource");
@Override
public void setup(Context context) {
this.streamName = context.getStreamName();
kafkaTopic = Arrays.asList(KafkaProperties.KAFKA_TOPIC);
Properties props = new Properties();
props.put("bootstrap.servers", KafkaProperties.BOOTSTRAP_SERVERS);
props.put("group.id", KafkaProperties.CONSUMER_GROUP_ID);
props.put("enable.auto.commit", KafkaProperties.ENABLE_AUTO_COMMIT);
props.put("auto.commit.interval.ms", KafkaProperties.AUTO_COMMIT_INTERVAL_MS);
props.put("session.timeout.ms", KafkaProperties.SESSION_TIMEOUT);
props.put("key.deserializer", KafkaProperties.KEY_DESERIALIZER);
props.put("value.deserializer", KafkaProperties.VALUE_DESERIALIZER);
props.put("auto.offset.reset", KafkaProperties.AUTO_OFFSET_RESET);
props.put("max.poll.records", KafkaProperties.MAX_POLL_RECORDS);
props.put("max.poll.interval.ms", KafkaProperties.MAX_POLL_INTERVAL_MS);
this.kafkaConsumer = new KafkaConsumer<>(props);
kafkaConsumer.subscribe(kafkaTopic);
}
@Override
public Collection get() {
List<String> kafkaRecords = new ArrayList<>();
ConsumerRecords<String, String> records = kafkaConsumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records) {
String rVal = record.value();
kafkaRecords.add(rVal);
}
return kafkaRecords;
}
@Override
public void cleanup() {
kafkaConsumer.wakeup();
}
}
推荐阅读
- php - 在 Woocommerce 产品中动态填充自定义表单
- hyperledger-fabric - 如何注册新用户并防止管理员知道用户的密钥 [Hyperledger Fabric]
- python - python如何通过右键单击Windows访问文件
- mysql - MySQL 多个 JOINS 在同一个单元格上
- css - 为什么使用 Bootstrap 4 在顶行和导航栏之间有一个空格?
- javascript - 如何将 React Hooks 与 video.js 一起使用?
- objective-c - Clang 将 #import 语句误解为文件,而不是框架
- python - 如何在文本字段中创建有效的 HTML 标签
- javafx - JavaFX中时间轴的场景更改问题
- javascript - 为什么我的 JavaScript 代码算术方程出现 NaN 错误