Scala: Read from a Kafka topic and write to a MySQL table

Problem description

I have written a Scala program, created a Kafka topic, and am displaying the data in the console. Now I am trying to modify the existing code so that it sinks the data to a MySQL table.

db name: books

table name: authors

Can you help me modify the code below so that the data coming from the Kafka topic_json can be sent to the MySQL table?

df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("jdbc")
  .outputMode("overwrite")
  .option("url", "jdbc:mysql://127.0.0.1:3306/books")
  .option("books", "authors")
  .start()
  .awaitTermination()

ReadStream code for reference:

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "json_topic")
  .option("startingOffsets", "earliest") // From starting
  .load()

Thanks for all your help.

Tags: mysql, scala, apache-spark, apache-kafka, spark-structured-streaming

Solution


How about trying foreachBatch? The basic logic is to write each micro-batch as a normal DataFrame to MySQL. See the section linked below: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch

If you have trouble with the logic of writing a normal DataFrame to MySQL, I can paste my working sample code, which writes the stream to PostgreSQL using foreachBatch, here later.
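
For reference, a minimal sketch of that "write a normal DataFrame to MySQL" step, aimed at the books/authors table from the question. The user, password, and driver values are placeholders, not something from the original post:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: batch-write a plain DataFrame into the MySQL authors table.
// Credentials and driver class are assumptions -- adjust to your environment.
def writeAuthorsBatch(batchDF: DataFrame): Unit = {
  batchDF.write
    .format("jdbc")
    .option("url", "jdbc:mysql://127.0.0.1:3306/books")
    .option("dbtable", "authors")
    .option("user", "root")                       // assumption
    .option("password", "password")               // assumption
    .option("driver", "com.mysql.cj.jdbc.Driver") // assumption: MySQL Connector/J 8.x
    .mode(SaveMode.Append)
    .save()
}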

Here is the code that writes to PostgreSQL, a hasty POC, in Java. Note the method writeToPostgresql. Hopefully you get the idea and can get the Scala version working. If you need help with the Scala version, let me know (a rough Scala sketch is also included at the end of this answer).

import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;

import java.util.HashMap;
import java.util.Map;

//todo: write a logging class and extends from it.

public class PostgresqlService {

    // Transform and write batchDF.
    static <T> void postgresSink(Dataset<T> dataset, Long batchId, String tableName) {
        dataset.persist();
        dataset.write().format("jdbc").option("dbtable", tableName).options(postgresqlOptions()).mode(SaveMode.Append).save();
        dataset.write().format("console").mode(SaveMode.Overwrite).save();   //note: on prod do not do this type of stuff.
        dataset.unpersist();
    }

    //TODO(optional): pass in as an option for things like checkpoint.
    //Method to write the dataset into postgresql
    public static <T> void writeToPostgresql(Dataset<T> dataset, OutputMode outputMode, String tableName) {
        try {
            dataset
                    .writeStream()
                    .option("checkpointLocation", "/tmp/spark-checkpoint/"+tableName) //path/to/HDFS/dir
                    .outputMode(outputMode)
                    .foreachBatch(
                            new VoidFunction2<Dataset<T>, Long>() {
                                public void call(Dataset<T> dataset, Long batchId) {
                                    postgresSink(dataset, batchId, tableName);
                                }
                            }
                    )
//                    .trigger(Trigger.Once())
                    .start()
                    .awaitTermination();
        } catch (Exception e) {
            System.out.println(e.toString());
            System.exit(1);
        }
    }


    /**
     * Spark-PostgreSQL connection properties.
     *
     * @return Map of String -> String
     */
    static Map<String, String> postgresqlOptions() {
        //TODO (optional): current is POC level. if i have time: read from config
        Map<String, String> map = new HashMap<String, String>() {
            {
                put("user", "sparkstreaming"); // Database username
                put("password", "password"); // Password
                put("driver", "org.postgresql.Driver");
                put("url", "jdbc:postgresql://localhost:5432/sparkstreaming");
            }
        };
        return map;
    }
}

When I called the above method, I used `OutputMode.Update()`, like:
`writeToPostgresql(transformedAggregatedPayload, OutputMode.Update(), "my-table-name");`
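
If it helps, here is a rough Scala sketch of the same foreachBatch idea, pointed at the MySQL database from the question (books / authors). The connection options, checkpoint path, and the parsedAuthorsDF name in the usage comment are assumptions, not tested code:

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.streaming.OutputMode

object MysqlService {

  // Spark-MySQL connection properties. All values below are placeholders.
  def mysqlOptions(): Map[String, String] = Map(
    "url"      -> "jdbc:mysql://127.0.0.1:3306/books",
    "user"     -> "root",                    // assumption
    "password" -> "password",                // assumption
    "driver"   -> "com.mysql.cj.jdbc.Driver" // assumption
  )

  // Write one micro-batch as a normal DataFrame (this is where the
  // "normal DataFrame to MySQL" logic from above runs).
  def mysqlSink(batchDF: DataFrame, batchId: Long, tableName: String): Unit = {
    batchDF.write
      .format("jdbc")
      .option("dbtable", tableName)
      .options(mysqlOptions())
      .mode(SaveMode.Append)
      .save()
  }

  // Scala counterpart of writeToPostgresql, targeting MySQL instead.
  def writeToMysql(df: DataFrame, outputMode: OutputMode, tableName: String): Unit = {
    df.writeStream
      .option("checkpointLocation", s"/tmp/spark-checkpoint/$tableName")
      .outputMode(outputMode)
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        mysqlSink(batchDF, batchId, tableName)
      }
      .start()
      .awaitTermination()
  }
}

// Usage (parsedAuthorsDF is a hypothetical DataFrame parsed from the Kafka value):
// MysqlService.writeToMysql(parsedAuthorsDF, OutputMode.Append(), "authors")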
