首页 > 解决方案 > 将数据从 spark 插入 cassandra:如何验证一切正常

问题描述

我正在尝试使用 pyspark 将 csv 文件中的数据插入 Cassandra。

这是代码:

我读了数据:

    df =spark.read.format("csv") \
        .option("header","true") \
        .option("inferSchema","true") \
        .option("nullValue","NA") \
        .option("timestampFormat","ddMMMyyyy:HH:mm:ss") \
        .option("quote", "\"") \
        .option("delimiter", ";") \
        .option("mode","failfast") \
        .load("gs://tidy-centaur-b1/data/PRESCRIPTIONS_ANO.csv")

编辑:我把整个代码显示唯一键

    dfi = df.withColumn("id", F.monotonically_increasing_id()) \
        .withColumnRenamed("CHAIPRAT", "chaiprat") \
        .withColumnRenamed("PRE_PRE_DTD", "pre_pre_dtd") \
        .withColumnRenamed("NbMol", "nbmol") \
        .withColumnRenamed("NumAno", "numano")


    dfi.createOrReplaceTempView("prescription")

我计算行并将数据保存到 cassandra

    dfi.count()
    > 4169826

    dfi.write.format("org.apache.spark.sql.cassandra") \
        .mode("overwrite") \
        .option("confirm.truncate","true") \
        .option("spark.cassandra.connection.host","10.142.0.4") \
        .option("spark.cassandra.connection.port","9042") \
        .option("keyspace","uasb03") \
        .option("table","prescription") \
        .save()

现在我从 cassandra 读取数据并计算行数。

    presc = sql.read \
        .format("org.apache.spark.sql.cassandra") \
        .option("spark.cassandra.connection.host","10.142.0.4") \
        .option("spark.cassandra.connection.port","9042") \
        .load(table="prescription", keyspace="uasb03")

    presc.count()
    > 2148762

只有从第一次计数的一半。

我在日志文件中没有发现任何表明出现问题的内容。有人有线索吗 ?

编辑:我尝试更改 cassandra.yaml 中的所有超时值,但 presc.count 保持不变

编辑这里是 cassandra 表描述

    cqlsh:uasb03> desc prescription;

    CREATE TABLE uasb03.prescription (
        id int PRIMARY KEY,
        chaiprat int,
        nbmol int,
        numano int,
        pre_pre_dtd timestamp
    ) WITH bloom_filter_fp_chance = 0.01
        AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
        AND comment = ''
        AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
        AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND crc_check_chance = 1.0
        AND dclocal_read_repair_chance = 0.1
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND min_index_interval = 128
        AND read_repair_chance = 0.0
        AND speculative_retry = '99PERCENTILE';

为了执行我的验证,我还在一个 csv 文件中写入了输出,我得到了

    chaiprat;pre_pre_dtd;nbmol;numano;id
    29100476;03Feb2017:00:00:00;5;378369;8589934592
    29100476;24Feb2017:00:00:00;1;378369;8589934593
    29100476;27Feb2017:00:00:00;2;378369;8589934594

id 大于 int。

标签: apache-sparkcassandrapysparkspark-cassandra-connector

解决方案


最可能的原因是您的数据没有可能成为分区键的真正唯一的行标识符,因此当您存储数据时,某些值会被覆盖。您可以通过在保存数据之前显式创建具有正确分区键和集群列的表来解决此问题。这可以通过createCassandraTable调用您的数据框来完成(请参阅文档),如下所示:

createCassandraTable(
  "uasb03", "prescription", 
  partitionKeyColumns = Some(Seq("columnA")), 
  clusteringKeyColumns = Some(Seq("columnB")))

推荐阅读