apache-spark - Spark UDF 使用 Cassandra 连接器查找密钥
问题描述
我正在尝试在我们的一些 spark 作业中使用 cassandra 作为键值查找存储。
我们主要使用 Dataframes,并且已经远离了 RDD API。
我认为我可以编写一个连接到 cassandra 的 Spark UDF 查找一个键,而不是加入表,将它们加载到 spark 中或
将连接推送到 cassandra 并采取措施避免大表扫描我还想将结果行转换为案例类对象并返回该对象。
我根据下面这个问题的回答得到了一些信息。withSessionDo重用每个节点上可用的底层 JVM 级别会话 Spark Cassandra 连接器正确使用
val connector = CassandraConnector(sparkConf) // I Know this is serializable.
def lookupKey(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
connector.withSessionDo(session => {
val stmt = session.prepare(s"SELECT * FROM $keyspace.$table WHERE key = ?")
val result = session.execute( stmt.bind(key) )
MyCaseClass(
fieldl1 = result.getString(0),
fieldl2 = result.getInt(1)
...
)
}
})
Session 不可序列化,因此我们无法在 udf 之外创建一个并将其传入,因此我们可以使用映射管理器将行转换为案例类实例。使用映射管理器的替代方法,
def lookupKeyAlt(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
connector.withSessionDo(session => {
val manager = new MappingManager(session) // session isn't serializable, so creating one outside and passing to udf is not an option if wf we were willing to do the session management.
val mapperClass = manager.mapper(classOf[MyCaseClass], keyspace)
mapperClass.get(key)
}
})
我是 cassandra 的新手,所以请多多指教。
- 这些方法中是否有我不知道的陷阱?
- 在第二种方法中,我知道我们在每次调用 UDF 时都会创建一个新的 MappingManager(session)。这是否仍会使用 jvm 级会话并打开更多会话?每次调用都实例化 MappingManager 是否正确?该会话不可序列化,因此我无法在外部创建它并将其传递给 UDF。
- 将结果 Row 转换为 Case Class 的对象还有哪些其他方法?
- 有没有更好的选择来做这种查找?
解决方案
您正在尝试模拟 Spark Cassandra 连接器 (SCC) 在幕后所做的事情,但是您的实现会比 SCC 慢得多,因为您使用的是同步 API,并且一个接一个地获取所有数据,而 SCC 使用的是异步 API ,并并行拉取多行数据。
实现您想要的最佳方式是使用 Cassandra 优化连接(通常称为“直接连接”)。这种 join 一直可用于 RDD API,但很长一段时间以来,Dataframe API 仅在连接器的商业版本中可用。但是从 SCC 2.5.0(2020 年 5 月发布)开始,这个功能也可以在开源版本中使用,因此您可以使用它而不是构建它的仿真。仅当您启用特殊的 Catalyst 扩展时,才会执行直接连接,方法是将spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions
配置 SparkSession 时(例如通过命令行)。之后,您可以通过完整或部分主键与 Cassandra 表执行 join,SCC 会自动将 join 转换为对 Cassandra 执行非常有效的单个请求。您可以通过在连接的数据帧上执行来检查是否发生这种情况explain
,因此您应该会看到类似这样的内容(查找字符串Cassandra Direct Join):
scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#30, c1 = cc1#32] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#28L as int) AS id#30, cast(id#28L as int) AS cc1#32]
+- *(1) Range (1, 5, step=1, splits=8)
我最近写了一篇长篇博文,解释了如何使用 Dataframe 和 RDD API 在 Cassandra 中执行有效的数据连接——我不想在这里重复 :-)
推荐阅读
- matlab - Matlab 没有读取 .mat 文件
- sql - 有什么方法可以针对只读 SQL 数据库创建视图?
- database - 表情符号存储为?在 PhpMyAdmin 中
- javascript - Javascript 强制将字符串插入 JS 对象 (JSON) 传单向量图块
- c# - 如何使用 C# 代码在三星手机中找到移动代理?
- spring-boot - Maven 无法下载传递性招摇依赖
- python - 如何将表直接导入数据块中的 Python 数据框中?
- javascript - 如何延迟数据库查询并忽略 JS 之间发生的请求?
- julia - @view 当等式两边都是 Julia 中的数组切片时
- java - windowTranslucentNavigation 和颜色 java android studio