apache-spark - 我们可以将 Spark-SQL-Kafka 偏移量存储在 MySQL 表中而不是 HDFS 或 S3 中吗
问题描述
我有一个简单的 Spark-SQL-Kafka 程序,它从 Kafka 读取数据并写入 HDFS。
对于检查点,我过去使用过 HDFS 和 S3,它工作正常。
有没有办法可以使用 MySQL 进行检查点?
.option("checkpointLocation", ConfigLoader.getValue("checkpointPath"))
我们如何配置到 MySQL 表?
DataStreamReader kafkaDataStreamReader = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", ConfigLoader.getValue("brokers"))
.option("group.id", ConfigLoader.getValue("groupId"))
.option("subscribe", ConfigLoader.getValue("topics"))
.option("failOnDataLoss", false);
Dataset<Row> rawDataSet = kafkaDataStreamReader.load();
rawDataSet.createOrReplaceTempView("rawEventView");
sqlCtx.sql("select * from rawEventView")
.writeStream()
.partitionBy(JavaConversions.asScalaBuffer(Arrays.asList(("date_year,date_month,date_day,date_hour,date_minute").split(","))))
.format("csv")
.option("header", "true")
.option("compression","gzip")
.option("delimiter", "~")
.option("checkpointLocation", ConfigLoader.getValue("checkpointPath"))
.option("path", ConfigLoader.getValue("recordsPath"))
.outputMode(OutputMode.Append())
.trigger(ProcessingTime.create(Integer.parseInt(ConfigLoader.getValue("kafkaProcessingTimeInSeconds")), TimeUnit.SECONDS))
.start()
.awaitTermination();
解决方案
推荐阅读
- ios - 如何解决Alamofire显示数据
- amazon-web-services - AWS CloudFormation - 在 RDS 实例就绪后创建表?
- javascript - 如何在javascript对象中查找动态键的值
- javascript - d3饼图全黑填充,d3.schemeCategory20c未被调用
- php - 在 htacess 文件上重写 url 不起作用
- typescript - 从构造函数调用的方法中为“只读”属性赋值
- three.js - 三.js对象动态变化带LOD动画
- python - 获取网站中特定 url 属性的最大限制是多少
- redux-form - 制作反应选择 2.0.0
使用 redux-form - python - Tkinter 如何吹捧比较画布的标签