首页 > 解决方案 > Spark UDF 使用 Cassandra 连接器查找密钥

问题描述

我根据下面这个问题的回答得到了一些信息。withSessionDo重用每个节点上可用的底层 JVM 级别会话 Spark Cassandra 连接器正确使用

val connector = CassandraConnector(sparkConf) // I Know this is serializable.

def lookupKey(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
    connector.withSessionDo(session => {
        val stmt = session.prepare(s"SELECT * FROM $keyspace.$table WHERE key = ?")
        val result = session.execute( stmt.bind(key) )
        MyCaseClass(
           fieldl1 = result.getString(0),
           fieldl2 = result.getInt(1)
           ...
        )
    }
})

Session 不可序列化,因此我们无法在 udf 之外创建一个并将其传入,因此我们可以使用映射管理器将行转换为案例类实例。使用映射管理器的替代方法,

def lookupKeyAlt(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
    connector.withSessionDo(session => {
        val manager = new MappingManager(session)   // session isn't serializable, so creating one outside and passing to udf is not an option if wf we were willing to do the session management.
        val mapperClass = manager.mapper(classOf[MyCaseClass], keyspace)
        mapperClass.get(key)
    }
})

我是 cassandra 的新手,所以请多多指教。

  1. 这些方法中是否有我不知道的陷阱?
  2. 在第二种方法中,我知道我们在每次调用 UDF 时都会创建一个新的 MappingManager(session)。这是否仍会使用 jvm 级会话并打开更多会话?每次调用都实例化 MappingManager 是否正确?该会话不可序列化,因此我无法在外部创建它并将其传递给 UDF。
  3. 将结果 Row 转换为 Case Class 的对象还有哪些其他方法?
  4. 有没有更好的选择来做这种查找?

标签: apache-sparkcassandraspark-cassandra-connector

解决方案


您正在尝试模拟 Spark Cassandra 连接器 (SCC) 在幕后所做的事情,但是您的实现会比 SCC 慢得多,因为您使用的是同步 API,并且一个接一个地获取所有数据,而 SCC 使用的是异步 API ,并并行拉取多行数据。

实现您想要的最佳方式是使用 Cassandra 优化连接(通常称为“直接连接”)。这种 join 一直可用于 RDD API,但很长一段时间以来,Dataframe API 仅在连接器的商业版本中可用。但是从 SCC 2.5.0(2020 年 5 月发布)开始,这个功能也可以在开源版本中使用,因此您可以使用它而不是构建它的仿真。仅当您启用特殊的 Catalyst 扩展时,才会执行直接连接,方法是将spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensions配置 SparkSession 时(例如通过命令行)。之后,您可以通过完整或部分主键与 Cassandra 表执行 join,SCC 会自动将 join 转换为对 Cassandra 执行非常有效的单个请求。您可以通过在连接的数据帧上执行来检查是否发生这种情况explain,因此您应该会看到类似这样的内容(查找字符串Cassandra Direct Join):

scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#30, c1 = cc1#32] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#28L as int) AS id#30, cast(id#28L as int) AS cc1#32]
   +- *(1) Range (1, 5, step=1, splits=8)

我最近写了一篇长篇博文,解释了如何使用 Dataframe 和 RDD API 在 Cassandra 中执行有效的数据连接——我不想在这里重复 :-)


推荐阅读