asynchronous - Cassandra 异步读写,最佳实践
问题描述
为了设置上下文,我们在 cassandra 中有 4 个表,在这 4 个表中,一个是数据表,其余的是搜索表(假设 DATA、SEARCH1、SEARCH2 和 SEARCH3 是表)。
对于 DATA 表,我们有一个初始加载要求,一个请求中最多 15k 行,因此搜索表要保持同步。我们以批量插入的方式执行此操作,每个 bacth 作为 4 个查询(每个表一个)以保持一致性。
但是对于每个批次,我们都需要读取数据。如果存在,只更新 DATA 表的 lastUpdatedDate 列,否则插入所有 4 个表。
下面是我们正在做的代码片段:
public List<Items> loadData(List<Items> items) {
CountDownLatch latch = new CountDownLatch(items.size());
ForkJoinPool pool = new ForkJoinPool(6);
pool.submit(() -> items.parallelStream().forEach(item -> {
BatchStatement batch = prepareBatchForCreateOrUpdate(item);
batch.setConsistencyLevel(ConsistencyLevel.LOCAL_ONE);
ResultSetFuture future = getSession().executeAsync(batch);
Futures.addCallback(future, new AsyncCallBack(latch), pool);
}));
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
//TODO Consider what to do with the failed Items, Retry? or remove from the items in the return type
return items;
}
private BatchStatement prepareBatchForCreateOrUpdate(Item item) {
BatchStatement batch = new BatchStatement();
Item existingItem = getExisting(item) //synchronous read
if (null != data) {
existingItem.setLastUpdatedDateTime(new Timestamp(System.currentTimeMillis()));
batch.add(existingItem));
return batch;
}
batch.add(item);
batch.add(convertItemToSearch1(item));
batch.add(convertItemToSearch2(item));
batch.add(convertItemToSearch3(item));
return batch;
}
class AsyncCallBack implements FutureCallback<ResultSet> {
private CountDownLatch latch;
AsyncCallBack(CountDownLatch latch) {
this.latch = latch;
}
// Cooldown the latch for either success or failure so that the thread that is waiting on latch.await() will know when all the asyncs are completed.
@Override
public void onSuccess(ResultSet result) {
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
LOGGER.warn("Failed async query execution, Cause:{}:{}", t.getCause(), t.getMessage());
latch.countDown();
}
}
考虑到网络往返 b/w 应用程序和 cassandra 集群,15k 个项目的执行大约需要 1.5 到 2 分钟(两者都驻留在相同的 DNS 上,但 kubernetes 上的 pod 不同)
我们有想法使读取调用 getExisting(item) 也异步,但处理失败案例变得越来越复杂。是否有更好的方法来加载 cassandra 的数据(仅考虑通过 datastax 企业 java 驱动程序进行异步写入)。
解决方案
第一件事 - Cassandra 中的批次与关系数据库不同。通过使用它们,您会给集群增加更多负载。
关于使一切异步,我考虑了以下可能性:
- 对数据库进行查询,获取一个
Future
并向其添加侦听器 - 将在查询完成时执行(覆盖onSuccess
); - 通过该方法,您可以根据从 Cassandra 获得的结果安排下一个操作的执行。
您需要确保检查的一件事是,您不会同时发出太多的并发请求。在协议的第 3 版中,每个连接最多可以有 32k 个正在进行的请求,但在您的情况下,您最多可以发出 60k (4x15k) 个请求。我在 Session 类周围使用以下包装器来限制进行中请求的数量。
推荐阅读
- c# - 如何清除 CS0029 C# 无法将类型“void”隐式转换为“int”并且 CS1729 C#“Car”不包含采用 4 个参数的构造函数
- javascript - 尝试请求时 URL 更改
- mysql - 如何选择只有 n 值的 id
- python - 如何使用 feedparser 自动检查/重新加载 rss 提要?
- google-apps-script - Google Apps 脚本 URLFetchApp 权限
- python - 支持 Python 枚举中的未知值
- python - 函数使用 *args 在 lineplot 中显示多列 DataFrame
- ios - 无法打开适用于 iOS 的相机
- c# - 通过脚本传递数据
- node.js - Mongo聚合忽略$match中的日期范围过滤器