首页 > 解决方案 > 无法使用 Spark 结构化流向 MongoDB 发送数据

问题描述

我按照这个Unable to send data to MongoDB using Kafka-Spark Structured Streaming将数据从 Spark Structured Streaming 发送到 mongoDB 并且我成功实现了它,但是有一个问题。就像当函数

override def process(record: Row): Unit = {

    val doc: Document = Document(record.prettyJson.trim)
    // lazy opening of MongoDB connection


    ensureMongoDBConnection()
    val result = collection.insertOne(doc)
    if (messageCountAccum != null)
      messageCountAccum.add(1)
  }

代码执行没有任何问题,但没有数据发送到 MongoDB

但是如果我添加这样的打印语句

override def process(record: Row): Unit = {
    val doc: Document = Document(record.prettyJson.trim)

    // lazy opening of MongoDB connection


    ensureMongoDBConnection()
    val result = collection.insertOne(doc)
    result.foreach(println) //print statement
    if (messageCountAccum != null)
      messageCountAccum.add(1)
  }

数据被插入 MongoDB

不知道为什么????

标签: mongodbscalaspark-streamingspark-structured-streaming

解决方案


foreach初始化写入器接收器。如果没有 foreach,您的数据框永远不会被计算。

Try this :

val df = // your df here
df.map(r => process(r))
df.count()

推荐阅读