java - CassandraIO 无法保存时间戳
问题描述
这是我的简单代码,它从 pubsub 订阅中读取,并将消息正文保存到带有当前时间戳的 Cassandra 表。
该消息是从订阅中消费的,但没有记录插入到表中,也没有错误消息。
但是,如果我在类 TestTable 中将日期类型“时间戳”更改为 Long,则此代码正在工作并将记录插入表中。
这是创建表的脚本。
DROP TABLE IF EXISTS test_table;
CREATE TABLE IF NOT EXISTS test_table(
post_index int,
ingestion_time TIMESTAMP,
body text,
PRIMARY KEY ((post_index))
);
@Table(keyspace = "{keyspace_name}", name = "{table_name}",
readConsistency = "LOCAL_QUORUM",
writeConsistency = "LOCAL_QUORUM",
caseSensitiveKeyspace = false,
caseSensitiveTable = false)
class TestTable implements Serializable {
@PartitionKey
@Column(name="post_index")
Integer postIndex;
@Column(name="ingestion_time")
Timestamp ingestionTime;
@Column(name = "body")
String body;
public Integer getPostIndex() {
return postIndex;
}
public void setPostIndex(Integer postIndex) {
this.postIndex = postIndex;
}
public Timestamp getIngestionTime() {
return ingestionTime;
}
public void setIngestionTime(Timestamp ingestionTime) {
this.ingestionTime = ingestionTime;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
public TestTable(Integer postIndex, Timestamp ingestionTime, String body) {
this.body = body;
this.ingestionTime = ingestionTime;
this.postIndex = postIndex;
}
public TestTable() {
this.body = "";
this.ingestionTime = Timestamp.from(Instant.now());
this.postIndex = 0;
}
}
public class TestCassandraJobJava {
public static void main(String[] args) {
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
PCollection<String> data = pipeline.apply("ReadStrinsFromPubsub",
PubsubIO.readStrings().fromSubscription("projects/{project_id}/subscriptions/{subscription_name}"))
.apply("window", Window.into(FixedWindows.of(Duration.standardSeconds(5))))
.apply("CreateMutation", ParDo.of(new DoFn<String, TestTable>() {
@ProcessElement
public void processElement(@Element String word, OutputReceiver<TestTable> out) {
TestTable t = new TestTable(new Random().nextInt(), java.sql.Timestamp.from(Instant.now()), word);
out.output(t);
}
})).apply(CassandraIO.<TestTable>write()
.withHosts(Arrays.asList("127.0.0.1"))
.withPort(9042)
.withKeyspace("{keyspace}")
.withLocalDc("Cassandra")
.withEntity(TestTable.class)
);
pipeline.run().waitUntilFinish();
}
}