scala - 使用 spark 中的数据帧以 writetime 写入 Cassandra
问题描述
我有以下代码:-
val kafkaStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER)
val collection = kafkaStream.map(_._2).map(parser)
collection.foreachRDD(rdd =>
{
if (!rdd.partitions.isEmpty) {
try {
val dfs = rdd.toDF()
dfs.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "tablename", "keyspace" -> "dbname"))
.mode(SaveMode.Append).save()
} catch {
case e: Exception => e.printStackTrace
}
} else {
println("blank rdd")
}
})
在上面的示例中,我使用数据帧将火花流保存到 cassandra。现在,我希望 df 的每一行都有其特定的写入时间,类似于此命令 -
insert into table (imei , date , gpsdt ) VALUES ( '1345','2010-10-12','2010-10-12 10:10:10') USING TIMESTAMP 1530313803922977;
所以基本上每一行的写入时间应该等于该行的gpsdt列。在搜索时,我找到了这个链接,但它显示了 RDD 的示例,我想要类似的数据框用例 - https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md任何建议,谢谢
解决方案
据我所知,DataFrame 版本中没有这样的功能(有相应的 JIRA:https ://datastax-oss.atlassian.net/browse/SPARKC-416 )。但是您无论如何都有 RDD,您可以将其转换为 DataFrame - 为什么不saveToCassandra
按照您引用的链接中的描述使用?
PS当你检查空虚时你可能会遇到性能问题(http://www.waitingforcode.com/apache-spark/isEmpty-trap-spark/read)
推荐阅读
- javascript - 使用 jquery 或纯 javascript 重新排列现有表
- nservicebus - 是否可以在不使用调用 MarkAsComplete 的长时间超时的情况下为 Saga 设置 TTL?
- html - 如何使用 Bootstrap 缩放(框)屏幕大小
- python - 绘制分段线时如何正确使用自定义颜色图?
- python - Travic CI 失败:名称解析暂时失败
- java - 检查用户名是否已在 ArrayList 中注册
- mysql - 我想使用 SQL Query 在 Toad 中获取日期时间戳
- php - vladimir-yuldashev/laravel-queue-rabbitmq 5.4 需要照明/支持 5.4.*
- python - 警告:张量流:模型是用输入张量()的形状构造的。但它是在形状不兼容的输入上调用的
- javascript - 如何将 1d 转换为 2d 并将其存储在反应原生状态?