mysql - Scala 读取 Kafka 主题并写入 MySQL 表
问题描述
我写了一个 Scala 程序,创建了一个 Kafka 主题,在控制台中显示数据。现在,我正在尝试修改现有代码,以便它可以下沉到 MySQ 表。
db name: books
table name: authors
你能帮我修改下面的代码,以便可以将来自 Kafka topic_json 的数据发送到 MySQL 表吗?
df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("jdbc")
.outputMode("overwrite")
.option("url", "jdbc:mysql://127.0.0.1:3306/books")
.option("books", "authors")
.start()
.awaitTermination()
ReadStream 代码供参考:
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "json_topic")
.option("startingOffsets", "earliest") // From starting
.load()
感谢您的所有帮助。
解决方案
尝试使用“for each batch”怎么样,写的基本逻辑是写一个普通的DF到MySQL。请参阅下面的管道部分。 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch
如果您难以理解 的逻辑writing a normal data frame to MySQL
,我可以稍后将我的工作示例代码粘贴writes the stream to postgresql using for each batch
到此处。
写到 postgresql 的代码,匆忙的 POC,在 Java 上。注意方法writeToPostgresql
。希望你能得到这个想法并让 scala 版本工作。如果您需要有关 scala 版本的帮助,请告诉我。
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import java.util.HashMap;
import java.util.Map;
//todo: write a logging class and extends from it.
public class PostgresqlService {
// Transform and write batchDF.
static <T> void postgresSink(Dataset<T> dataset, Long batchId, String tableName) {
dataset.persist();
dataset.write().format("jdbc").option("dbtable", tableName).options(postgresqlOptions()).mode(SaveMode.Append).save();
dataset.write().format("console").mode(SaveMode.Overwrite).save(); //note: on prod do not do this type of stuff.
dataset.unpersist();
}
//TODO(optional): pass in as an option for things like checkpoint.
//Method to write the dataset into postgresql
public static <T> void writeToPostgresql(Dataset<T> dataset, OutputMode outputMode, String tableName) {
try {
dataset
.writeStream()
.option("checkpointLocation", "/tmp/spark-checkpoint/"+tableName) //path/to/HDFS/dir
.outputMode(outputMode)
.foreachBatch(
new VoidFunction2<Dataset<T>, Long>() {
public void call(Dataset<T> dataset, Long batchId) {
postgresSink(dataset, batchId, tableName);
}
}
)
// .trigger(Trigger.Once())
.start()
.awaitTermination();
} catch (Exception e) {
System.out.println(e.toString());
System.exit(1);
}
}
/**
* Spark-PostgreSQL connection properties.
*
* @return Map of String -> String
*/
static Map<String, String> postgresqlOptions() {
//TODO (optional): current is POC level. if i have time: read from config
Map<String, String> map = new HashMap<String, String>() {
{
put("user", "sparkstreaming"); // Database username
put("password", "password"); // Password
put("driver", "org.postgresql.Driver");
put("url", "jdbc:postgresql://localhost:5432/sparkstreaming");
}
};
return map;
}
}
`
when i called above method, i used `OutputMode.Update()`, like
` writeToPostgresql(transformedAggregatedPayload, OutputMode.Update(), "my-table-name");`
推荐阅读
- react-native - React Native DateTimePicker的日期格式?
- android - 在 Android Studio 上运行应用程序,模拟器打开,但不显示应用程序
- javascript - 如何在 Angular JS 中访问控制器函数中的内部脚本 $scope 值
- python - 如何在python中重新排列列表
- ios - 无法在 iOS 上运行 react-native-unimodules
- sparql - SPARQLWrapper 不能使 CONSTRUCT 查询返回 XML 以外的其他查询
- jquery - 单击子菜单时,我想将类添加到父菜单
- excel - VBA在运行脚本时不刷新数据,只有在
- python-3.x - 如何从 Python pod 中访问 kube-apiserver?
- c# - 将查询字符串传递给 HttpRequestMessage