apache-spark - Cassandra 加入后解析 Spark RDD
问题描述
我想阅读一个镶木地板文件并加入 Cassandra 中的一些列。我能够加入并获得一个 RDD,但我无法解析生成的 RDD。这里有更多细节
case class IP (key: String, key2: String,key3: String,key4: String,key5: String,key6: String,key7: String,key8: String,key9: String,key10: String,key11: String,key12: String,key13: String,key14: String,key15: String,column1:String,column2:String,column3:String,column4:String,column5:String,value1:String)
val a = cs_cube2_2_6.rdd.map(p => IP(p(0).toString, p(1).toString, p(2).toString, p(3).toString, p(4).toString, p(5).toString, p(6).toString, p(7).toString, p(8).toString, p(9).toString, p(10).toString, p(11).toString, p(12).toString, p(13).toString, p(14).toString, p(15).toString, p(16).toString, p(17).toString, p(18).toString, p(19).toString, p(20).toString))
val joinWithRDD = a.joinWithCassandraTable("key","tbl").on(SomeColumns("key","key2","key3","key4","key5","key6","key7","key8","key9","key10","key11","key12","key13","key14","key15")).select("value1")
scala> joinWithRDD: com.datastax.spark.connector.rdd.CassandraJoinRDD[IP,com.datastax.spark.connector.CassandraRow] = CassandraJoinRDD[15] at RDD at CassandraRDD.scala:19
生成的 RDD 模式如上所示。RDD 的输出是这样的。
(IP(2_2_6,AA,FF,14-12-07 23,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,3580),CassandraRow{value1: 3580})
(IP(2_2_6,BB,GG,143,2019-12-07 00,false,true,-,-,-,-,-,-,-,-,-,-,-,-,-,154),CassandraRow{value1: 154})
我不确定如何解析这个 RDD。我想总结IP的最后一列和 Cassandra 行的value1列。
让我知道是否需要任何进一步的细节。并感谢帮助
这是 Cassandra 表模式
CREATE TABLE aa.tbl (
key text,
key2 text,
key3 text,
key4 text,
key5 text,
key6 text,
key7 text,
key8 text,
key9 text,
key10 text,
key11 text,
key12 text,
key13 text,
key14 text,
key15 text,
column1 text,
column2 text,
column3 text,
column4 text,
column5 text,
value1 text,
value2 blob,
value3 text,
PRIMARY KEY ((key, key2, key3, key4, key5, key6, key7, key8, key9, key10, key11, key12, key13, key14, key15), column1, column2, column3, column4, column5)
) WITH CLUSTERING ORDER BY (column1 ASC, column2 ASC, column3 ASC, column4 ASC, column5 ASC)
AND bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'}
AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99p';
解决方案
你需要做这样的事情(没有检查代码,但它应该进行小的调整 - 我假设value1
在 Cassandra 中具有整数类型):
joinWithRDD.map { case (ip, row) =>
val newVal = ip.value1.toInteger + row.getInt("value1")
IP(ip.key, key2, .... newVal.toString)
}
joinWithCassandraTable
返回数据的元组为,_1
在 Cassandra 中找到的数据为_2
。访问 Cassandra 数据时,您可以使用 getter 函数getInt
、getString
等,或者您可以映射Row
到案例类,如Spark Cassandra 连接器的文档中所述。
推荐阅读
- sql - MS-Access VBA DoCmd.RunSQL 问题
- c# - 如何在不扩展 MVVM 的情况下向模型类添加新属性
- python - 在 python 中自动生成应用函数
- windows - Windows Docker 桌面 Linux 模式 - docker 容器时间偏差
- c# - 如何在 System.Text.Json.Serialization 上强制使用 Newtonsoft.Json
- python - 询问地点趋势时的 Twitter API 错误
- typescript - 我在哪里可以看到 deno 下载的包?
- google-api - Google Drive API v3 出现 403 错误,请求未在 Google API Metrics 中更新
- aframe - aframe - 如何摆脱 VR 按钮
- python - 为什么 replace() 函数在未使用正确参数请求时返回经过语法编辑的结果?