首页 > 解决方案 > 如何在 Spark 中处理 Cassandra“持续时间”数据类型?

问题描述

我在使用 datastacks/spark-cassandra-connector 将 Cassandra (Scylla) 数据加载到 Apache Spark 时遇到问题:

scala> val rdd = sc.cassandraTable[(String)](keyspace, table).select("url").limit(10).collect()
java.util.NoSuchElementException: key not found: duration
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at com.datastax.spark.connector.types.ColumnType$$anonfun$1.applyOrElse(ColumnType.scala:117)

我用 Scala 和 Python 尝试了不同的 Spark 和 spark-cassandra-connector 版本,但没有成功。我猜是因为 datastacks/spark-cassandra-connector 中缺少持续时间数据类型支持,我检查了 ColumnType.scala,它们只有:

  private[connector] val primitiveTypeMap = Map[DataType, ColumnType[_]](
    DataType.text() -> TextType,
    DataType.ascii() -> AsciiType,
    DataType.varchar() -> VarCharType,
    DataType.cint() -> IntType,
    DataType.bigint() -> BigIntType,
    DataType.smallint() -> SmallIntType,
    DataType.tinyint() -> TinyIntType,
    DataType.cfloat() -> FloatType,
    DataType.cdouble() -> DoubleType,
    DataType.cboolean() -> BooleanType,
    DataType.varint() -> VarIntType,
    DataType.decimal() -> DecimalType,
    DataType.timestamp() -> TimestampType,
    DataType.inet() -> InetType,
    DataType.uuid() -> UUIDType,
    DataType.timeuuid() -> TimeUUIDType,
    DataType.blob() -> BlobType,
    DataType.counter() -> CounterType,
    DataType.date() -> DateType,
    DataType.time() -> TimeType
  )

在这种情况下,有人知道如何处理 Cassandra 的持续时间数据类型吗?

标签: scalaapache-sparkcassandra

解决方案


Spark 支持的类型定义不包含 Duration,但它们声明

您可以将符合 CQL 标准的字符串转换为数字、日期、地址或 UUID。

此外,Java 驱动程序已经有了Duration 类,它的构造函数将字符串值转换为duration. 考虑到这一点,您应该能够创建类似于此处示例的自定义 Spark 连接器。

  • 我不是 Spark 用户,建议仅基于文档。

推荐阅读