java - 我们如何在 Dgraph 中使用 lagom 的读取端处理器?
问题描述
我是 lagom 和 dgraph 的新手。而且我一直坚持如何将 lagom 的读取端处理器与Dgraph一起使用。只是为了给你一个想法,下面是使用 Cassandra 和 lagom 的代码。
import akka.NotUsed;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;
import akka.stream.javadsl.Source;
public class FriendServiceImpl implements FriendService {
private final CassandraSession cassandraSession;
@Inject
public FriendServiceImpl(CassandraSession cassandraSession) {
this.cassandraSession = cassandraSession;
}
//Implement your service method here
}
解决方案
Lagom 不为 Dgraph 提供开箱即用的支持。如果您必须将 Lagom 的 Read-Side 处理器与 Dgraph 一起使用,那么您必须使用 Lagom 的Generic Read Side support。像这样:
/**
* Read side processor for Dgraph.
*/
public class FriendEventProcessor extends ReadSideProcessor<FriendEvent> {
private static void createModel() {
//TODO: Initialize schema in Dgraph
}
@Override
public ReadSideProcessor.ReadSideHandler<FriendEvent> buildHandler() {
return new ReadSideHandler<FriendEvent>() {
private final Done doneInstance = Done.getInstance();
@Override
public CompletionStage<Done> globalPrepare() {
createModel();
return CompletableFuture.completedFuture(doneInstance);
}
@Override
public CompletionStage<Offset> prepare(final AggregateEventTag<FriendEvent> tag) {
return CompletableFuture.completedFuture(Offset.NONE);
}
@Override
public Flow<Pair<FriendEvent, Offset>, Done, ?> handle() {
return Flow.<Pair<FriendEvent, Offset>>create()
.mapAsync(1, eventAndOffset -> {
if (eventAndOffset.first() instanceof FriendCreated) {
//TODO: Add Friend in Dgraph;
}
return CompletableFuture.completedFuture(doneInstance);
}
);
}
};
}
@Override
public PSequence<AggregateEventTag<FriendEvent>> aggregateTags() {
return FriendEvent.TAG.allTags();
}
}
对于FriendEvent.TAG.allTags()
,您必须在FriendEvent
界面中添加以下代码:
int NUM_SHARDS = 20;
AggregateEventShards<FriendEvent> TAG =
AggregateEventTag.sharded(FriendEvent.class, NUM_SHARDS);
@Override
default AggregateEventShards<FriendEvent> aggregateTag() {
return TAG;
}
我希望这有帮助!
推荐阅读
- apache-spark - 如何将 ArrayType(StructType) 的 spark 数据框列拆分为 pyspark 中的多个列?
- flutter - 如何在 Dart/Flutter 中获取整个月的天数?
- java - 使用 if 语句为变量赋值
- ios - Twitter登录异常颤动
- c# - UWP RichEditBox RightClick->Paste 未触发 Paste 事件
- python - 为什么用于跨行比较值的向量化 python 代码不起作用?
- discord.js - Discord.js bot 向它所在的每个服务器中的特定频道发送消息
- version-control - hg diff 显示特定目录中文件的不相关更改
- ruby-on-rails - Grunt 还是 Sprockets 更适合应对铁轨
- c# - 有没有办法在 Windows 中使用 C# 以编程方式禁用磁盘驱动器上的写入缓存策略?