python - How can I improve the performance of adding data to ScyllaDB?
Problem description
I tried using prepared statements as described in the official Cassandra and Scylla documentation, but performance is still around 30 seconds for 100,000 messages. Any ideas on how to improve this?
query = "INSERT INTO message (id, message) VALUES (?, ?)"
prepared = session.prepare(query)
for key in range(100000):
    try:
        session.execute_async(prepared, (0, "my example message"))
    except Exception as e:
        print("An error occurred : " + str(e))
Update
I found information strongly recommending batching to improve performance, so I combined prepared statements with batches, following the official documentation. My code now looks like this:
from datetime import datetime

from cassandra import ConsistencyLevel
from cassandra.query import BatchStatement

print("time 0: " + str(datetime.now()))
query = "INSERT INTO message (id, message) VALUES (uuid(), ?)"
prepared = session.prepare(query)
for key in range(100):
    print(key)
    try:
        batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
        for _ in range(100):
            batch.add(prepared, ("example message",))
        session.execute(batch)
    except Exception as e:
        print("An error occurred : " + str(e))
print("time 1: " + str(datetime.now()))
Do you have any idea why this is so slow, and why running the code produces output like the following?
time 0: 2018-06-19 11:10:13.990691
0
1
...
41
An error occurred : Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out for messages.message - received only 1 responses from 2 CL=QUORUM." info={'write_type': 'BATCH', 'required_responses': 2, 'consistency': 'QUORUM', 'received_responses': 1}
42
...
52 An error occurred : errors={'....0.3': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=.....0.3
53
An error occurred : Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out for messages.message - received only 1 responses from 2 CL=QUORUM." info={'write_type': 'BATCH', 'required_responses': 2, 'consistency': 'QUORUM', 'received_responses': 1}
54
...
59
An error occurred : Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out for messages.message - received only 1 responses from 2 CL=QUORUM." info={'write_type': 'BATCH', 'required_responses': 2, 'consistency': 'QUORUM', 'received_responses': 1}
60
61
62
...
69
70
71
An error occurred : errors={'.....0.2': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=.....0.2
72
An error occurred : errors={'....0.1': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=....0.1
73
74
...
98
99
time 1: 2018-06-19 11:11:03.494957
Solution
By heavily parallelizing the inserts, I got sub-second execution times for this kind of workload on my local machine.
➜ loadz ./loadz
execution time: 951.701622ms
I'm afraid I don't know how to do this in Python, but in Go it could look something like this:
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/gocql/gocql"
)

func main() {
	cluster := gocql.NewCluster("127.0.0.1")
	cluster.Keyspace = "mykeyspace"
	session, err := cluster.CreateSession()
	if err != nil {
		panic(err)
	}
	defer session.Close()

	// Fan the queries out over a pool of goroutines, each draining
	// the shared channel and executing queries concurrently.
	workers := 1000
	ch := make(chan *gocql.Query, 100001)
	wg := &sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()
			for q := range ch {
				if err := q.Exec(); err != nil {
					fmt.Println(err)
				}
			}
		}()
	}

	start := time.Now()
	for i := 0; i < 100000; i++ {
		ch <- session.Query("INSERT INTO message (id,message) VALUES (uuid(),?)", "the message")
	}
	close(ch)
	wg.Wait()
	dur := time.Since(start)
	fmt.Printf("execution time: %s\n", dur)
}
If you want to test it, adjust the connection parameters as needed.
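The same worker-pool idea can be sketched in Python with a thread pool draining a queue. This is a minimal, hedged sketch: `run_parallel_inserts` and its parameters are hypothetical names, and the ScyllaDB call is passed in as a callable so the pattern itself is self-contained; with a real session you would pass in a lambda around `session.execute`, as the trailing comment suggests.

```python
import queue
import threading

def run_parallel_inserts(execute, messages, workers=32):
    """Fan execute(message) calls out across a pool of worker threads."""
    q = queue.Queue()
    errors = []

    def worker():
        while True:
            msg = q.get()
            if msg is None:  # sentinel: no more work, shut this worker down
                q.task_done()
                return
            try:
                execute(msg)
            except Exception as e:
                errors.append(e)
            finally:
                q.task_done()

    threads = [threading.Thread(target=worker) for _ in range(workers)]
    for t in threads:
        t.start()
    for msg in messages:
        q.put(msg)
    for _ in threads:  # one sentinel per worker
        q.put(None)
    q.join()
    for t in threads:
        t.join()
    return errors

# With a real session, the callable would wrap the prepared statement, e.g.:
#   prepared = session.prepare("INSERT INTO message (id, message) VALUES (uuid(), ?)")
#   errors = run_parallel_inserts(lambda m: session.execute(prepared, (m,)), msgs)
```

Note that the Python driver also ships a built-in helper for exactly this, `cassandra.concurrent.execute_concurrent_with_args`, which drives many `execute_async` requests with a bounded concurrency level and is usually the simpler choice.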