scala - 使用 Spark 和 Kafka 进行流式处理的空值问题
问题描述
我创建了 SparkConsumer,这样我就可以通过 Spark 结构化流向 Kafka 发送一个 csv 文件。我启动 sparkConsumer,然后他等待 Producer。我启动 Producer 并发送文件。问题是我成为数据框中的“空”值而不是内容。我的输出如下所示:
-------------------------------------------
Batch: 1
-------------------------------------------
+---------+---------+-----------+--------+-----------------------+
|InvoiceNo|StockCode|Description|Quantity|timestamp |
+---------+---------+-----------+--------+-----------------------+
|null |null |null |null |2019-01-08 15:46:29.156|
|null |null |null |null |2019-01-08 15:46:29.224|
|null |null |null |null |2019-01-08 15:46:29.224|
|null |null |null |null |2019-01-08 15:46:29.225|
|null |null |null |null |2019-01-08 15:46:29.225|
|null |null |null |null |2019-01-08 15:46:29.225|
|null |null |null |null |2019-01-08 15:46:29.225|
|null |null |null |null |2019-01-08 15:46:29.225|
|null |null |null |null |2019-01-08 15:46:29.225|
|null |null |null |null |2019-01-08 15:46:29.225|
|null |null |null |null |2019-01-08 15:46:29.225|
|null |null |null |null |2019-01-08 15:46:29.241|
|null |null |null |null |2019-01-08 15:46:29.241|
|null |null |null |null |2019-01-08 15:46:29.241|
|null |null |null |null |2019-01-08 15:46:29.241|
|null |null |null |null |2019-01-08 15:46:29.241|
|null |null |null |null |2019-01-08 15:46:29.241|
|null |null |null |null |2019-01-08 15:46:29.241|
|null |null |null |null |2019-01-08 15:46:29.241|
|null |null |null |null |2019-01-08 15:46:29.241|
+---------+---------+-----------+--------+-----------------------+
sparkConsumer 的代码是:
object sparkConsumer extends App {
val rootLogger = Logger.getRootLogger()
rootLogger.setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.appName("Spark-Kafka-Integration")
.master("local")
.getOrCreate()
val schema = StructType(Array(
StructField("InvoiceNo", StringType, nullable = true),
StructField("StockCode", StringType, nullable = true),
StructField("Description", StringType, nullable = true),
StructField("Quantity", StringType, nullable = true)
))
import spark.implicits._
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.option("delimiter", ";")
.option("header","true")
.option("inferSchema","true")
.load()
val df1 = df.selectExpr("CAST(value as STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
.select(from_json($"value", schema).as("data"), $"timestamp")
.select("data.*", "timestamp")
df1.writeStream
.format("console")
.option("truncate","false")
.start()
.awaitTermination()
}
生产者.scala:
object Producer extends App {
import java.util.Properties
import org.apache.kafka.clients.producer._
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val TOPIC="test"
val fileName = "path/to/test.csv"
val lines = Source.fromFile(fileName).getLines()
for(i <- lines){
val record = new ProducerRecord(TOPIC, "key", s"$i")
producer.send(record)
}
val record = new ProducerRecord(TOPIC, "key", "the end "+new java.util.Date)
producer.send(record)
producer.close()
}
谁能帮我成为我文件的内容?
解决方案
我认为这个问题与序列化和反序列化有关。您value
写给该主题的 , 是 csv 格式,例如:
111,someCode,someDescription,11
您的Spark 消费者认为该消息是 json 格式(from_json
带有一些模式)。如果消息如下所示,则解析将起作用。
{
"InvoiceNo": "111",
"StockCode": "someCode",
"Description": "someDescription",
"Quantity": "11"
}
您必须更改序列化或反序列化以相互匹配。
以下选项之一应该有效
- 生产者必须以 json 格式将消息写入主题
- Spark 消费者应该使用来解析行
comma
来分割字段
推荐阅读
- typescript - io-ts - 验证数组包含具有特定属性和值的每个对象之一
- c - 基数排序浮点数
- python - 类型错误:insert_expenses() 缺少 1 个必需的位置参数:'amt'
- javascript - getElementById 和 getElementsByClassName 不起作用
- forms - SqPaymentForm 不安全
- sql - 使用 Blob 存储作为数据源对 SQL On-Demand 中的数据进行分区
- sql - Google Bigquery - 创建活动记录数的时间序列
- assembly - 分析 AVX2 中的比较结果
- sql-server - Docker (Linux) 和 Windows 主机中的 SQL Server 的 MSDTC 配置问题
- sql - 如何使用联接编写此代码,Oracle