apache-spark - Error reading Cassandra TTL and WRITETIME with Spark 3.0
Question
Although the latest spark-cassandra-connector from DataStax states that it supports reading/writing TTL and WRITETIME, I am still getting a SQL undefined-function error.
Using Databricks on a 9.1 LTS ML cluster (includes Apache Spark 3.1.2, Scala 2.12) with the library com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.1.0 and the CassandraSparkExtensions Spark config below. CQL version 3.4.5.
spark.sql.extensions com.datastax.spark.connector.CassandraSparkExtensions
Confirmed the configuration with notebook code:
spark.conf.get("spark.sql.extensions")
Out[7]: 'com.datastax.spark.connector.CassandraSparkExtensions'
# Cassandra connection configs using Data Source API V2
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.host", "10.1.4.4")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.port", "9042")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.auth.username", dbutils.secrets.get(scope = "myScope", key = "CassUsername"))
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.auth.password", dbutils.secrets.get(scope = "myScope", key = "CassPassword"))
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.enabled", True)
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.trustStore.path", "/dbfs/user/client-truststore.jks")
spark.conf.set("spark.sql.catalog.cassandrauat.spark.cassandra.connection.ssl.trustStore.password", dbutils.secrets.get("key-vault-secrets", "cassTrustPassword"))
spark.conf.set("spark.sql.catalog.cassandrauat.spark.dse.continuous_paging_enabled", False)
# catalog name will be "cassandrauat" for Cassandra
spark.conf.set("spark.sql.catalog.cassandrauat", "com.datastax.spark.connector.datasource.CassandraCatalog")
spark.conf.set("spark.sql.catalog.cassandrauat.prop", "key")
spark.conf.set("spark.sql.defaultCatalog", "cassandrauat") # will override Spark to use Cassandra for all databases
%sql
select id, did, ts, val, ttl(val)
from cassandrauat.myKeyspace.myTable
Error in SQL statement: AnalysisException: Undefined function: 'ttl'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 25
When the same CQL query is run directly on the Cassandra cluster, it returns a result.
Any help on why CassandraSparkExtensions is not being loaded would be appreciated.
Adding the full stack trace for the NoSuchMethodError that occurs after preloading the library:
com.databricks.backend.common.rpc.DatabricksExceptions$SQLExecutionException: java.lang.NoSuchMethodError: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(Lscala/PartialFunction;)Lorg/apache/spark/sql/catalyst/plans/logical/LogicalPlan;
at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.replaceMetadata(CassandraMetadataFunctions.scala:152)
at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply$1.$anonfun$applyOrElse$2(CassandraMetadataFunctions.scala:187)
at scala.collection.immutable.Stream.foldLeft(Stream.scala:549)
at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply$1.applyOrElse(CassandraMetadataFunctions.scala:186)
at org.apache.spark.sql.cassandra.CassandraMetaDataRule$$anonfun$apply$1.applyOrElse(CassandraMetadataFunctions.scala:183)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:484)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:484)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:262)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:258)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:460)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:428)
at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.apply(CassandraMetadataFunctions.scala:183)
at org.apache.spark.sql.cassandra.CassandraMetaDataRule$.apply(CassandraMetadataFunctions.scala:90)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$3(RuleExecutor.scala:221)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:221)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:89)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:218)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:210)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:210)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:271)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:264)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:191)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:188)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:109)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:188)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:246)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:347)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:245)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:96)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:134)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:180)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:180)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:97)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:86)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:101)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:689)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:684)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:694)
at com.databricks.backend.daemon.driver.SQLDriverLocal.$anonfun$executeSql$1(SQLDriverLocal.scala:91)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at com.databricks.backend.daemon.driver.SQLDriverLocal.executeSql(SQLDriverLocal.scala:37)
at com.databricks.backend.daemon.driver.SQLDriverLocal.repl(SQLDriverLocal.scala:144)
at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$13(DriverLocal.scala:541)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:50)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:50)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:518)
at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:689)
at scala.util.Try$.apply(Try.scala:213)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
at java.lang.Thread.run(Thread.java:748)
at com.databricks.backend.daemon.driver.SQLDriverLocal.executeSql(SQLDriverLocal.scala:129)
at com.databricks.backend.daemon.driver.SQLDriverLocal.repl(SQLDriverLocal.scala:144)
at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$13(DriverLocal.scala:541)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:50)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:50)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:518)
at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:689)
at scala.util.Try$.apply(Try.scala:213)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
at java.lang.Thread.run(Thread.java:748)
Solution
If you just added the Spark Cassandra Connector via the cluster UI, it will not work. The reason is that the library is installed onto the cluster after Spark has already started, so the class specified in spark.sql.extensions cannot be found.
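One way to confirm that this is what happened is a hedged diagnostic sketch (not connector API): ask the driver JVM to load the extensions class directly. If this throws ClassNotFoundException, the jar was attached after startup and spark.sql.extensions could never pick it up.
# Hedged diagnostic: resolve the extensions class in the driver JVM via py4j.
# A ClassNotFoundException here means the jar was not on the startup classpath.
spark._jvm.java.lang.Class.forName(
    "com.datastax.spark.connector.CassandraSparkExtensions")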
To fix this, you need to put the jar file onto the cluster nodes before Spark starts. You can do that with a cluster init script that downloads the jar directly, something like this (note that every node downloads its own copy):
#!/bin/bash
wget -q -O /databricks/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar \
https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector-assembly_2.12/3.1.0/spark-cassandra-connector-assembly_2.12-3.1.0.jar
Or, better, download the assembly jar, put it on DBFS, and then copy it from DBFS into the destination directory (for example, if it is uploaded to /FileStore/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar):
#!/bin/bash
cp /dbfs/FileStore/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar \
/databricks/jars/
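If you prefer to manage the init script from a notebook, dbutils can write it to DBFS; the script path below is an illustrative assumption, and the script still has to be attached under the cluster's Init Scripts settings afterwards:
# Write the copy script to DBFS so it can be attached as a cluster init script.
# The destination path is an assumption; pick any DBFS location you like.
dbutils.fs.put(
    "dbfs:/databricks/scripts/copy-scc-assembly.sh",
    """#!/bin/bash
cp /dbfs/FileStore/jars/spark-cassandra-connector-assembly_2.12-3.1.0.jar /databricks/jars/
""",
    True,
)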
Update (13.11.2021): SCC 3.1.0 is not fully compatible with Spark 3.2.0 (parts of which are already in DBR 9.1). See SPARKC-670 for details.
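Assuming a connector build whose Catalyst APIs match your DBR version (see the compatibility note above), a quick post-restart check from a notebook could look like this, using the same catalog, keyspace, and table as in the question:
# Post-restart check: with the assembly preloaded before Spark starts, the
# extensions should register ttl()/writetime() and the query should analyze.
spark.sql(
    "SELECT id, ttl(val) AS val_ttl, writetime(val) AS val_wt "
    "FROM cassandrauat.myKeyspace.myTable"
).show(5)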