Cassandra Java API, Datastax

Problem description

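Has anyone used AsyncCassandraTemplate for batch operations on a list of custom objects?

I need to do exactly that, but passing an Iterable no longer seems to be supported.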

Tags: spring-boot, spring-data, datastax, cassandra-3.0, datastax-java-driver

Solution


You can do this easily with the DataStax Java driver. If you use Maven, add the following dependencies to your pom file:

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>3.3.2</version>
</dependency>

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-mapping</artifactId>
    <version>3.3.2</version>
</dependency>

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-extras</artifactId>
    <version>3.3.2</version>
</dependency>

Then create an entity class:

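import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;

// Maps to the "message" table in the "test" keyspace
@Table(
    name = "message",
    keyspace = "test")
public class Message {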
    @PartitionKey
    @Column(name = "message_id")
    private String messageId;

    @ClusteringColumn
    private String date;

    private String title;

    public String getMessageId() {
        return messageId;
    }

    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }

    public String getDate() {
        return date;
    }

    public void setDate(String date) {
        this.date = date;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }
}

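With the following code you build a cluster, instantiate a few objects, create a save-query statement for each one, and add those statements to a batch statement that is executed asynchronously: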

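import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.Statement;
import com.datastax.driver.mapping.Mapper;
import com.datastax.driver.mapping.MappingManager;

public void executeBatchStatement() {
    // In a real application, create the Cluster and Session once and reuse them
    Cluster cluster = makeCluster();
    Session session = cluster.connect();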

    MappingManager mappingManager = new MappingManager(session);
    Mapper<Message> messageMapper = mappingManager.mapper(Message.class);

    Message messageObj1 = new Message();
    Message messageObj2 = new Message();
    Message messageObj3 = new Message();
    // populate these objects

    // Each Statement represents the query that saves one object
    Statement messageStatement1 = messageMapper.saveQuery(messageObj1, Mapper.Option.saveNullFields(false));
    Statement messageStatement2 = messageMapper.saveQuery(messageObj2, Mapper.Option.saveNullFields(false));
    Statement messageStatement3 = messageMapper.saveQuery(messageObj3, Mapper.Option.saveNullFields(false));

    BatchStatement messageBatchStatement = new BatchStatement();
    messageBatchStatement.add(messageStatement1);
    messageBatchStatement.add(messageStatement2);
    messageBatchStatement.add(messageStatement3);

    session.executeAsync(messageBatchStatement); // execute asynchronously
}

private Cluster makeCluster() {
    return Cluster.builder()
            .addContactPoint("localhost")
            .withPort(9042)
            .build();
} 
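
The example above hard-codes three objects, while the question asks about a whole list. Cassandra warns about, and can reject, batches that exceed its configured size thresholds, so for a long list it may help to split it into smaller groups first. Below is a minimal sketch of such a splitter using only the JDK. `BatchChunks`, `partition`, and the chunk size are hypothetical names and values chosen for illustration; they are not part of the DataStax driver.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the DataStax driver): splits a list into fixed-size chunks. */
final class BatchChunks {
    private BatchChunks() {}

    /**
     * Returns consecutive chunks of at most chunkSize elements, in the original order.
     * The last chunk may be smaller than chunkSize.
     */
    static <T> List<List<T>> partition(List<T> items, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += chunkSize) {
            int end = Math.min(start + chunkSize, items.size());
            // Copy the sub-list so each chunk is independent of the source list
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        return chunks;
    }
}
```

For each chunk returned by `BatchChunks.partition(messages, n)`, you would then create a new `BatchStatement`, loop over the chunk calling `messageMapper.saveQuery(...)` and `add(...)` as in the method above, and pass the batch to `session.executeAsync(...)`. A suitable `n` depends on your row size and cluster settings, so treat any specific value as an assumption to verify.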

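If you want to process the execution result, or do something on success or failure, you can attach a callback to the returned future: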

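import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;

ResultSetFuture future = session.executeAsync(messageBatchStatement);
// Passing an executor explicitly also compiles against newer Guava releases,
// which no longer have the two-argument addCallback overload
Futures.addCallback(future,
    new FutureCallback<ResultSet>() {
        @Override public void onSuccess(ResultSet result) {
            // handle success
        }

        @Override public void onFailure(Throwable t) {
            // handle error
        }
    },
    MoreExecutors.directExecutor() // runs the callback on the thread that completes the future
);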
