cassandra - 有什么方法可以找出 Cassandra 中的 SELECT 语句使用了哪个节点?
问题描述
我已经编写了一个自定义的 LoadBalancerPolicy spark-cassandra-connector
,现在我想确保它真的有效!
我有一个 Cassandra 集群,它有 3 个节点和一个复制因子为 2 的键空间,所以当我们要检索记录时,cassandra 上只有两个节点来保存数据。
问题是我想确保spark-cassandra-connector
(使用我的负载平衡器策略)仍然是令牌感知的,并将选择正确的节点作为每个“SELECT”语句的协调器。
现在,我在想我们是否可以在每个节点的 SELECT 语句上写一个触发器,以防节点不保存数据,触发器将创建一个日志,我意识到负载均衡器策略无法正常工作. 我们如何在 Cassandra 中编写 On SELECT 触发器?有没有更好的方法来做到这一点?
我已经检查了创建触发器的文档,但这些文档太有限了:
解决方案
根据亚历克斯所说,我们可以这样做:
创建 SparkSession 后,我们应该创建一个连接器:
import com.datastax.spark.connector.cql.CassandraConnector
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)
现在我们可以定义一个preparedStatement并完成剩下的工作:
connector.withSessionDo(session => {
val selectQuery = "select * from test where id=?"
val prepareStatement = session.prepare(selectQuery)
val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersion
// We have to explicitly bind the all of parameters that partition key is based on them, otherwise the routingKey will be null.
val boundStatement = prepareStatement.bind(s"$id")
val routingKey = boundStatement.getRoutingKey(protocolVersion, null)
// We can get tha all of nodes that contains the row
val replicas = session.getCluster.getMetadata.getReplicas("test", routingKey)
val resultSet = session.execute(boundStatement)
// We can get the node which gave us the row
val host = resultSet.getExecutionInfo.getQueriedHost
// Final step is to check whether the replicas contains the host or not!!!
if (replicas.contains(host)) println("It works!")
})
重要的是我们必须显式绑定分区键所基于的所有参数(即我们不能在 SELECT 语句中将它们设置为硬编码),否则 routingKey 将为空。