apache-spark - 将 kafka 主题中的数据读入 spark 数据框
问题描述
private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(sparkSqlMysql.class);
private static final SparkSession sparkSession = SparkSession.builder().master("local[*]").appName("Spark2JdbcDs")
.getOrCreate();
public static void main(String[] args) {
// JDBC connection properties
// Load MySQL query result as Dataset
Dataset<Row> df = sparkSession.readStream().format("kafka").option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "SqlMessages").load();
我想做一些事情,我可以从我的 kafka 主题中读取我的 spark SQL 中的数据,但不能这样做。
有人可以指导我可以将我的数据从 kafka 主题转换为 spark SQL 吗?
我可以做到这一点的东西
Dataset<Row> schoolData = sparkSession.sql("select * from Schools");
解决方案
今天也在做类似的事情。从一开始就消耗了整个主题,转换为 DataFrame 并保存为 Parquet 表。您可以从 Scala 改编我的代码,想法应该很清楚。
val topic = "topic_bla_bla"
val brokers = "some_kafka_broker:9092"
val kafkaDF = spark.read.format("kafka").option("kafkaConsumer.pollTimeoutMs", "20000").option("startingOffsets", "earliest").option("kafka.bootstrap.servers", brokers).option("subscribe", topic).load()
val jsonDF = kafkaDF.selectExpr("CAST(value AS STRING)")
val finalDF = spark.read.option("mode", "PERMISSIVE").json(jsonDF.as[String])
finalDF.registerTempTable("wow_table")
//OR
finalDF.write.format("parquet").saveAsTable("default.wow_table")
spark.sql("select * from wow_table")
推荐阅读
- r - 通过另一列中设置的参数计算两列数据的比率
- google-visualization - 根据mysql值改变谷歌饼图切片的颜色
- aframe - 更改 A 资产上的“src”不会更新材料
- r - 从 r shiny 的传单中控制 popupImage 的大小
- marklogic - 无法删除多个集合。
- google-chrome - 过滤 REST api 调用
- cmd - 从 Windows cmder 获取带有“调用”的 cmd 文件
- php - Preg Match Subpatterns - 如何匹配字符串后面的所有 TD
- r - R如何同时将我的结果导出到多个栅格图层中?
- javascript - Java Weblauncher - JNLP 问题