apache-spark - 使用 spark-streaming 将数据发布到 kafka 主题时重复
问题描述
我有 spark-streaming 应用程序,它使用来自 topic1 的数据并解析它,然后将相同的记录发布到 2 个进程中,一个是 topic2,另一个是 hive 表。在将数据发布到 kafka topic2 时,我看到重复项,但在配置单元表中看不到重复项
使用火花 2.2,卡夫卡 0.10.0
KafkaWriter.write(spark, storeSalesStreamingFinalDF, config)
writeToHIVE(spark, storeSalesStreamingFinalDF, config)
object KafkaWriter {
def write(spark: SparkSession, df: DataFrame, config: Config)
{
df.select(to_json(struct("*")) as 'value)
.write
.format("kafka")
.option("kafka.bootstrap.servers", config.getString("kafka.dev.bootstrap.servers"))
.option("topic",config.getString("kafka.topic"))
.option("kafka.compression.type",config.getString("kafka.compression.type"))
.option("kafka.session.timeout.ms",config.getString("kafka.session.timeout.ms"))
.option("kafka.request.timeout.ms",config.getString("kafka.request.timeout.ms"))
.save()
}
}
有人可以帮忙吗,
预计 kafka topic2 中没有重复项。
解决方案
要处理重复数据,我们应该设置.option("kafka.processing.guarantee","exactly_once"
)
推荐阅读
- sorting - 如何使用 Java 8 流按原始 int 成员对对象的 ArrayList 进行排序?
- r - 将两个值列表组合到一个数据框中:错误参数意味着不同的行数
- javascript - 在 Node.js 中使用 Express 渲染数组中的多个对象中的每一个
- python - 如何在networkx间接图中绘制循环边缘
- c# - xamarin:从数据库中获取多个纬度和经度并使用 pin 将其绘制到地图中?
- android - Flutter Execute Method 长按按钮
- r - R:使用图表包绘制马尔可夫模型(进行图表更改)
- java - 识别对象
- android - 即使项目被反编译,在android中加密字符串的安全方法?
- windows - PrintPreviewControl 和表单设计在另一个操作系统上的工作方式不同。VB.NET