mongodb - 无法使用 Spark 结构化流向 MongoDB 发送数据
问题描述
我按照这个Unable to send data to MongoDB using Kafka-Spark Structured Streaming将数据从 Spark Structured Streaming 发送到 mongoDB 并且我成功实现了它,但是有一个问题。就像当函数
override def process(record: Row): Unit = {
val doc: Document = Document(record.prettyJson.trim)
// lazy opening of MongoDB connection
ensureMongoDBConnection()
val result = collection.insertOne(doc)
if (messageCountAccum != null)
messageCountAccum.add(1)
}
代码执行没有任何问题,但没有数据发送到 MongoDB
但是如果我添加这样的打印语句
override def process(record: Row): Unit = {
val doc: Document = Document(record.prettyJson.trim)
// lazy opening of MongoDB connection
ensureMongoDBConnection()
val result = collection.insertOne(doc)
result.foreach(println) //print statement
if (messageCountAccum != null)
messageCountAccum.add(1)
}
数据被插入 MongoDB
不知道为什么????
解决方案
foreach
初始化写入器接收器。如果没有 foreach,您的数据框永远不会被计算。
Try this :
val df = // your df here
df.map(r => process(r))
df.count()
推荐阅读
- c# - 在 Windows 窗体应用程序中刷新画布的问题
- angularjs - Express - 为具有多个路由器文件的 Angular 1.x html5Mode 启用全部捕获功能不起作用
- c# - 带有代理丢失消息的 ZeroMQ 广播
- google-analytics - 使用 gtag.js 时,如何编辑 URL 中的个人身份信息 (PII)?
- r - R dplyr 查找所有变异的行
- python - 如何“窥视”读取协程是否会立即完成?
- database - 无法在 Netbeans IDE 8.2 中创建 javaDB
- javascript - 显示 div 按钮后进入新页面
- java - Java中的非阻塞异步IO
- python - 如何将这个时间转换为 utc 以供 google api 使用?