spring-boot - Cassandra Java API,Datastax
问题描述
是否有人使用 AsyncCassandraTemplate 进行具有自定义对象列表的批处理操作?
我需要使用相同的,但似乎不再支持传递迭代。
解决方案
您可以使用Datastax Driver轻松实现这一目标。如果您使用 Maven,只需将其添加到您的pom
文件中:
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-extras</artifactId>
<version>3.3.2</version>
</dependency>
然后创建一个实体类:
@Table(
name = "message",
keyspace = "test")
public class Message {
@PartitionKey
@Column(name = "message_id")
private String messageId;
@ClusteringColumn
private String date;
private String title;
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
}
然后使用以下代码,您可以构建一个集群,然后启动一些对象,然后为其保存查询创建语句,然后将它们添加到要异步执行的批处理语句中
public void executeBatchStatement() {
Cluster cluster = makeCluster();
Session session = cluster.connect();
MappingManager mappingManager = new MappingManager(session);
Mapper<Message> messageMapper = mappingManager.mapper(Message.class);
Message messageObj1 = new Message();
Message messageObj2 = new Message();
Message messageObj3 = new Message();
// populate these objects
Statement messageStatement1 = messageMapper.saveQuery(messageObj1, Mapper.Option.saveNullFields(false)); // now this Statement represents the query to save this object
Statement messageStatement2 = messageMapper.saveQuery(messageObj2, Mapper.Option.saveNullFields(false));
Statement messageStatement3 = messageMapper.saveQuery(messageObj3, Mapper.Option.saveNullFields(false));
BatchStatement messageBatchStatement = new BatchStatement();
messageBatchStatement.add(messageStatement1);
messageBatchStatement.add(messageStatement2);
messageBatchStatement.add(messageStatement3);
session.executeAsync(messageBatchStatement); // execute asynchronously
}
private Cluster makeCluster() {
return Cluster.builder()
.addContactPoint("localhost")
.withPort(9042)
.build();
}
如果您想处理执行结果或在成功或失败时做某事,您也可以做这样的事情
ResultSetFuture future = session.executeAsync(messageBatchStatement);
Futures.addCallback(future,
new FutureCallback<ResultSet>() {
@Override public void onSuccess(ResultSet result) {
// handle success
}
@Override public void onFailure(Throwable t) {
// handle error
}
}
);
推荐阅读
- angular - 类型 'void' 不可分配给类型 'ObservableInput
' - scala - 交叉构建依赖于特定库的不同版本的项目
- reactjs - 如何防止在反应导航中重新渲染嵌套导航器?
- puppet - 木偶执行/ wget
- python - 如何使用 pip install git +"git repo" 重命名站点包?
- powershell - 防止意外删除存储在 Active Directory 域服务 (ADDS) 中的 DNS 区域
- java - 我可以使用 Play Asset Delivery (Android Studio) 为每个构建变体添加不同的资产吗?
- regex - 如何在 VBScript 中将重音字符转换为十六进制 Unicode?
- amazon-web-services - AWS Elasticsearch 超出了索引中总字段的限制
- shopify - Shopify Liquid:向登录模板添加部分