cassandra - 优化通过 Python 驱动向 Cassandra 数据库插入数据
问题描述
我尝试在 Python 驱动程序中使用 BATCH 将 150.000 个生成的数据插入 Cassandra。大约需要30 秒。我应该怎么做才能优化它并更快地插入数据?这是我的代码:
from cassandra.cluster import Cluster
from faker import Faker
import time
fake = Faker()
cluster = Cluster(['127.0.0.1'], port=9042)
session = cluster.connect()
session.default_timeout = 150
num = 0
def create_data():
global num
BATCH_SIZE = 1500
BATCH_STMT = 'BEGIN BATCH'
for i in range(BATCH_SIZE):
BATCH_STMT += f" INSERT INTO tt(id, title) VALUES ('{num}', '{fake.name()}')";
num += 1
BATCH_STMT += ' APPLY BATCH;'
prep_batch = session.prepare(BATCH_STMT)
return prep_batch
tt = []
session.execute('USE ttest_2')
prep_batch = []
print("Start create data function!")
start = time.time()
for i in range(100):
prep_batch.append(create_data())
end = time.time()
print("Time for create fake data: ", end - start)
start = time.time()
for i in range(100):
session.execute(prep_batch[i])
time.sleep(0.00000001)
end = time.time()
print("Time for execution insert into table: ", end - start)
解决方案
主要问题是您使用批处理来插入数据 - 在 Cassandra 中,这是一种不好的做法(请参阅文档以获取解释)。相反,您需要准备一个查询,并一一插入数据 - 这将允许驱动程序将数据路由到特定节点,减少该节点上的负载,并允许更快地执行数据插入。伪代码如下所示(有关确切语法,请参阅python 驱动程序代码):
prep_statement = session.prepare("INSERT INTO tt(id, title) VALUES (?, ?)")
for your_loop:
session.execute(prep_statement, [id, title])
另一个问题是您使用的是同步 API - 这意味着驱动程序会等到插入发生然后触发下一个。要加快速度,您需要改用异步 API(有关详细信息,请参阅相同的文档)。有关最佳实践列表等,请参阅使用 DataStax 驱动程序开发应用程序指南。
但实际上,如果你只是想用数据加载数据库,我建议不要重新发明轮子,但要么:
- 将数据生成为 CSV 文件并使用DSBulk加载到 Cassandra,该 DSBulk为数据加载进行了高度优化
- 使用NoSQLBench生成数据并填充 Cassandra - 它还针对数据生成和加载进行了高度优化(不仅限于 Cassandra)。
推荐阅读
- typescript - RxJS wait until desired value with timeout
- python - 尝试使用 pd.get_dummies 对仅返回 0 和 1 的数据进行热编码
- azure - Azure 表存储检索操作为非空值返回空值
- python - 如何将一个脚本中的可滚动框架导入到不同脚本 tkinter 中的框架中
- javascript - 防止滚动位置在 ajax 刷新时置顶
- python - 重新启动运行时后模型加载得到不同的结果
- json - pandas to_json , django 和 d3.js 用于可视化
- oracle - Oracle v19:正在进行的事务可以长时间阻止对相关表的并发删除吗?
- javascript - 如何通过formdata将值附加到模型中对象内的嵌套键
- c# - 如何在 Index.razor 页面上为 Blazor 使用谷歌字体