首页 > 解决方案 > 有什么方法可以找出 Cassandra 中的 SELECT 语句使用了哪个节点?

问题描述

我已经编写了一个自定义的 LoadBalancerPolicy spark-cassandra-connector,现在我想确保它真的有效!

我有一个 Cassandra 集群,它有 3 个节点和一个复制因子为 2 的键空间,所以当我们要检索记录时,cassandra 上只有两个节点来保存数据。

问题是我想确保spark-cassandra-connector(使用我的负载平衡器策略)仍然是令牌感知的,并将选择正确的节点作为每个“SELECT”语句的协调器。

现在,我在想我们是否可以在每个节点的 SELECT 语句上写一个触发器,以防节点不保存数据,触发器将创建一个日志,我意识到负载均衡器策略无法正常工作. 我们如何在 Cassandra 中编写 On SELECT 触发器?有没有更好的方法来做到这一点?

我已经检查了创建触发器的文档,但这些文档太有限了:

官方文档

DataStax 的文档

官方仓库中的示例实现

标签: cassandradatabase-triggerspark-cassandra-connector

解决方案


根据亚历克斯所说,我们可以这样做:

创建 SparkSession 后,我们应该创建一个连接器:

import com.datastax.spark.connector.cql.CassandraConnector
val connector = CassandraConnector.apply(sparkSession.sparkContext.getConf)

现在我们可以定义一个preparedStatement并完成剩下的工作:

connector.withSessionDo(session => {

    val selectQuery = "select * from test where id=?"
    val prepareStatement = session.prepare(selectQuery)
    val protocolVersion = session.getCluster.getConfiguration.getProtocolOptions.getProtocolVersion
    // We have to explicitly bind the all of parameters that partition key is based on them, otherwise the routingKey will be null.
    val boundStatement = prepareStatement.bind(s"$id")
    val routingKey = boundStatement.getRoutingKey(protocolVersion, null)
    // We can get tha all of nodes that contains the row
    val replicas = session.getCluster.getMetadata.getReplicas("test", routingKey)
    val resultSet = session.execute(boundStatement)

    // We can get the node which gave us the row
    val host = resultSet.getExecutionInfo.getQueriedHost

    // Final step is to check whether the replicas contains the host or not!!!
    if (replicas.contains(host)) println("It works!")
  })

重要的是我们必须显式绑定分区键所基于的所有参数(即我们不能在 SELECT 语句中将它们设置为硬编码),否则 routingKey 将为空。


推荐阅读