apache-spark - Spark 结构化流式处理 Kafka 偏移管理
问题描述
我正在研究将 kafka 偏移量存储在 kafka 内部以用于 Spark Structured Streaming,就像它适用于 DStreamsstream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
一样,我正在寻找相同的东西,但适用于 Structured Streaming。是否支持结构化流式传输?如果是,我该如何实现?
我知道使用 的 hdfs 检查点.option("checkpointLocation", checkpointLocation)
,但我对内置偏移管理完全感兴趣。
我期望 kafka 仅在没有 spark hdfs 检查点的情况下将偏移量存储在内部。
解决方案
我正在使用在某处找到的这段代码。
public class OffsetManager {
private String storagePrefix;
public OffsetManager(String storagePrefix) {
this.storagePrefix = storagePrefix;
}
/**
* Overwrite the offset for the topic in an external storage.
*
* @param topic - Topic name.
* @param partition - Partition of the topic.
* @param offset - offset to be stored.
*/
void saveOffsetInExternalStore(String topic, int partition, long offset) {
try {
FileWriter writer = new FileWriter(storageName(topic, partition), false);
BufferedWriter bufferedWriter = new BufferedWriter(writer);
bufferedWriter.write(offset + "");
bufferedWriter.flush();
bufferedWriter.close();
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
/**
* @return he last offset + 1 for the provided topic and partition.
*/
long readOffsetFromExternalStore(String topic, int partition) {
try {
Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));
return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;
} catch (Exception e) {
e.printStackTrace();
}
return 0;
}
private String storageName(String topic, int partition) {
return "Offsets\\" + storagePrefix + "-" + topic + "-" + partition;
}
}
SaveOffset...在记录处理成功后调用,否则不存储偏移量。并且我使用 Kafka 主题作为源,因此我将起始偏移量指定为从 ReadOffsets 检索到的偏移量......
推荐阅读
- java - slack-api:如何格式化包含 url 的文本?
- r - r ggplot geom_point 改变颜色
- reactjs - React 无法从子组件中的 redux 获取状态
- php - 如何使用 curl 发布请求在 php 中运行 graphql api
- javascript - 精细上传器 s3 中 OPTIONS 的自定义标头
- android - 如何在android中的按钮内放置一个可点击的图标
- ios - UITableViewCell 对齐列以适应内容
- javascript - heroku 上的 node.js 应用程序出现问题
- java - 带有一个 return 语句的 Java 递归
- python - 如何迭代地读取网站和保存 har 数据(我有 1000 个 URL 的 CSV 文件 - 但无法保存具有唯一名称的文件