apache-spark - 将数据从 spark 插入 cassandra:如何验证一切正常
问题描述
我正在尝试使用 pyspark 将 csv 文件中的数据插入 Cassandra。
这是代码:
我读了数据:
df =spark.read.format("csv") \
.option("header","true") \
.option("inferSchema","true") \
.option("nullValue","NA") \
.option("timestampFormat","ddMMMyyyy:HH:mm:ss") \
.option("quote", "\"") \
.option("delimiter", ";") \
.option("mode","failfast") \
.load("gs://tidy-centaur-b1/data/PRESCRIPTIONS_ANO.csv")
编辑:我把整个代码显示唯一键
dfi = df.withColumn("id", F.monotonically_increasing_id()) \
.withColumnRenamed("CHAIPRAT", "chaiprat") \
.withColumnRenamed("PRE_PRE_DTD", "pre_pre_dtd") \
.withColumnRenamed("NbMol", "nbmol") \
.withColumnRenamed("NumAno", "numano")
dfi.createOrReplaceTempView("prescription")
我计算行并将数据保存到 cassandra
dfi.count()
> 4169826
dfi.write.format("org.apache.spark.sql.cassandra") \
.mode("overwrite") \
.option("confirm.truncate","true") \
.option("spark.cassandra.connection.host","10.142.0.4") \
.option("spark.cassandra.connection.port","9042") \
.option("keyspace","uasb03") \
.option("table","prescription") \
.save()
现在我从 cassandra 读取数据并计算行数。
presc = sql.read \
.format("org.apache.spark.sql.cassandra") \
.option("spark.cassandra.connection.host","10.142.0.4") \
.option("spark.cassandra.connection.port","9042") \
.load(table="prescription", keyspace="uasb03")
presc.count()
> 2148762
只有从第一次计数的一半。
我在日志文件中没有发现任何表明出现问题的内容。有人有线索吗 ?
编辑:我尝试更改 cassandra.yaml 中的所有超时值,但 presc.count 保持不变
编辑这里是 cassandra 表描述
cqlsh:uasb03> desc prescription;
CREATE TABLE uasb03.prescription (
id int PRIMARY KEY,
chaiprat int,
nbmol int,
numano int,
pre_pre_dtd timestamp
) WITH bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
为了执行我的验证,我还在一个 csv 文件中写入了输出,我得到了
chaiprat;pre_pre_dtd;nbmol;numano;id
29100476;03Feb2017:00:00:00;5;378369;8589934592
29100476;24Feb2017:00:00:00;1;378369;8589934593
29100476;27Feb2017:00:00:00;2;378369;8589934594
id 大于 int。
解决方案
最可能的原因是您的数据没有可能成为分区键的真正唯一的行标识符,因此当您存储数据时,某些值会被覆盖。您可以通过在保存数据之前显式创建具有正确分区键和集群列的表来解决此问题。这可以通过createCassandraTable
调用您的数据框来完成(请参阅文档),如下所示:
createCassandraTable(
"uasb03", "prescription",
partitionKeyColumns = Some(Seq("columnA")),
clusteringKeyColumns = Some(Seq("columnB")))
推荐阅读
- reactjs - 多选下拉菜单中的 OnSelect 问题
- python - AttributeError:SMTP_SSL 实例没有属性“__exit__”
- python - 使用 Python 和 ElementTree 对 XML 文档进行排序
- python-3.x - 如何读取和绘制两列并使用 python 3 从数据文件中绘制它们?
- php - 如何在magento 2中添加没有缓存的自定义条件beforeLoad?
- r - R:获取零的确切值(公斤到 1000 等等)
- ios - 发出以编程方式实例化自定义 xib 视图的问题(在 Koloda 卡功能中)
- c - 如何将多个字符附加到C中的字符串?
- excel - 激活最后创建的书 Excel VBA
- python - 如何绕过机器人检测并使用 python 抓取网站