首页 > 解决方案 > 我们可以将 Spark-SQL-Kafka 偏移量存储在 MySQL 表中而不是 HDFS 或 S3 中吗

问题描述

我有一个简单的 Spark-SQL-Kafka 程序,它从 Kafka 读取数据并写入 HDFS。

对于检查点,我过去使用过 HDFS 和 S3,它工作正常。

有没有办法可以使用 MySQL 进行检查点?

.option("checkpointLocation", ConfigLoader.getValue("checkpointPath"))

我们如何配置到 MySQL 表?

DataStreamReader kafkaDataStreamReader = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", ConfigLoader.getValue("brokers"))
        .option("group.id", ConfigLoader.getValue("groupId"))
        .option("subscribe", ConfigLoader.getValue("topics"))
        .option("failOnDataLoss", false);
Dataset<Row> rawDataSet = kafkaDataStreamReader.load();                         
rawDataSet.createOrReplaceTempView("rawEventView");

sqlCtx.sql("select * from rawEventView")
        .writeStream()
        .partitionBy(JavaConversions.asScalaBuffer(Arrays.asList(("date_year,date_month,date_day,date_hour,date_minute").split(","))))
        .format("csv")
        .option("header", "true")
        .option("compression","gzip")
        .option("delimiter", "~")
        .option("checkpointLocation", ConfigLoader.getValue("checkpointPath"))
        .option("path", ConfigLoader.getValue("recordsPath"))
        .outputMode(OutputMode.Append())
        .trigger(ProcessingTime.create(Integer.parseInt(ConfigLoader.getValue("kafkaProcessingTimeInSeconds")), TimeUnit.SECONDS))
        .start()
        .awaitTermination();

标签: apache-sparkapache-spark-sqlspark-structured-streaming

解决方案


推荐阅读