How can we use Lagom's read-side processor with Dgraph?

Question

I am new to both Lagom and Dgraph, and I am stuck on how to use Lagom's read-side processor with Dgraph. To give you an idea, below is the code that uses Cassandra with Lagom.

import akka.NotUsed;
import akka.stream.javadsl.Source;
import com.lightbend.lagom.javadsl.api.ServiceCall;
import com.lightbend.lagom.javadsl.persistence.cassandra.CassandraSession;
import java.util.concurrent.CompletableFuture;
import javax.inject.Inject;

public class FriendServiceImpl implements FriendService {

    private final CassandraSession cassandraSession;

    @Inject
    public FriendServiceImpl(CassandraSession cassandraSession) {
        this.cassandraSession = cassandraSession;
    }

    // Implement your service method here

}

Tags: java, microservices, lagom, dgraph

Answer

Lagom does not support Dgraph out of the box. To use Lagom's read-side processor with Dgraph, you have to build on Lagom's generic read-side support by extending ReadSideProcessor directly, like this:

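import akka.Done;
import akka.japi.Pair;
import akka.stream.javadsl.Flow;
import com.lightbend.lagom.javadsl.persistence.AggregateEventTag;
import com.lightbend.lagom.javadsl.persistence.Offset;
import com.lightbend.lagom.javadsl.persistence.ReadSideProcessor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.pcollections.PSequence;

/**
 * Read side processor for Dgraph.
 */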
public class FriendEventProcessor extends ReadSideProcessor<FriendEvent> {
    private static void createModel() {
        //TODO: Initialize schema in Dgraph
    }

    @Override
    public ReadSideProcessor.ReadSideHandler<FriendEvent> buildHandler() {
        return new ReadSideHandler<FriendEvent>() {
            private final Done doneInstance = Done.getInstance();

            @Override
            public CompletionStage<Done> globalPrepare() {
                createModel();
                return CompletableFuture.completedFuture(doneInstance);
            }

            @Override
            public CompletionStage<Offset> prepare(final AggregateEventTag<FriendEvent> tag) {
                return CompletableFuture.completedFuture(Offset.NONE);
            }

            @Override
            public Flow<Pair<FriendEvent, Offset>, Done, ?> handle() {
                return Flow.<Pair<FriendEvent, Offset>>create()
                        .mapAsync(1, eventAndOffset -> {
                                    if (eventAndOffset.first() instanceof FriendCreated) {
                                        //TODO: Add Friend in Dgraph;
                                    }

                                    return CompletableFuture.completedFuture(doneInstance);
                                }
                        );
            }
        };
    }

    @Override
    public PSequence<AggregateEventTag<FriendEvent>> aggregateTags() {
        return FriendEvent.TAG.allTags();
    }
}
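One thing to keep in mind with the handler above: prepare always returns Offset.NONE, so, as far as I understand, Lagom replays the event stream from the beginning every time the processor starts. Either the writes to Dgraph have to be idempotent, or the last processed offset has to be stored and returned from prepare. Below is a minimal, library-free sketch of the second idea. OffsetTrackingSketch, InMemoryStore, and the plain long offsets are hypothetical stand-ins for illustration, not Lagom or Dgraph APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class OffsetTrackingSketch {

    /** Hypothetical stand-in for a store (such as Dgraph) that holds both data and offsets. */
    public static final class InMemoryStore {
        public final List<String> friends = new ArrayList<>();
        public final Map<String, Long> offsets = new ConcurrentHashMap<>();
    }

    private final InMemoryStore store;
    private final String tag;

    public OffsetTrackingSketch(InMemoryStore store, String tag) {
        this.store = store;
        this.tag = tag;
    }

    /** Plays the role of prepare(tag): the last stored offset, or -1 if nothing was processed. */
    public long prepare() {
        return store.offsets.getOrDefault(tag, -1L);
    }

    /**
     * Plays the role of handle(): applies the event and records its offset.
     * Events at or below the stored offset are skipped, so a replay is harmless.
     */
    public boolean handle(String friendName, long offset) {
        if (offset <= prepare()) {
            return false; // already processed before the restart
        }
        store.friends.add(friendName);
        store.offsets.put(tag, offset);
        return true;
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        OffsetTrackingSketch processor = new OffsetTrackingSketch(store, "FriendEvent0");
        processor.handle("alice", 0);
        processor.handle("bob", 1);

        // Simulate a restart: a new processor instance over the same store.
        OffsetTrackingSketch restarted = new OffsetTrackingSketch(store, "FriendEvent0");
        restarted.handle("alice", 0); // replayed event, skipped
        restarted.handle("carol", 2);

        System.out.println(store.friends); // prints [alice, bob, carol]
    }
}
```

In a real processor the offset would be a Lagom Offset persisted alongside the Dgraph data, ideally in the same transaction as the mutation, so the two cannot drift apart.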

For FriendEvent.TAG.allTags() to work, you must add the following code to the FriendEvent interface:

  int NUM_SHARDS = 20;

  AggregateEventShards<FriendEvent> TAG =
          AggregateEventTag.sharded(FriendEvent.class, NUM_SHARDS);

  @Override
  default AggregateEventShards<FriendEvent> aggregateTag() {
    return TAG;
  }
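To give an intuition for what NUM_SHARDS does: a sharded tag spreads events over several tag names so that read-side processing can be distributed across the cluster, while all events of one entity stay on the same shard and therefore keep their order. The sketch below imitates this in plain Java, assuming the shard is picked by hashing the entity ID modulo the shard count. ShardingSketch, shardFor, tagFor, and the sample entity ID are hypothetical names for illustration; this is not Lagom's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardingSketch {

    public static final int NUM_SHARDS = 20;

    /** Maps an entity ID to a shard number in the range [0, numShards). */
    public static int shardFor(String entityId, int numShards) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(entityId.hashCode(), numShards);
    }

    /** Builds the tag name for an entity: base name plus shard number. */
    public static String tagFor(String baseName, String entityId, int numShards) {
        return baseName + shardFor(entityId, numShards);
    }

    /** Lists every tag a processor would subscribe to, one per shard. */
    public static List<String> allTags(String baseName, int numShards) {
        List<String> tags = new ArrayList<>();
        for (int shard = 0; shard < numShards; shard++) {
            tags.add(baseName + shard);
        }
        return tags;
    }

    public static void main(String[] args) {
        String tag = tagFor("FriendEvent", "friend-42", NUM_SHARDS);
        List<String> tags = allTags("FriendEvent", NUM_SHARDS);

        // The same entity ID always maps to the same tag.
        System.out.println(tag.equals(tagFor("FriendEvent", "friend-42", NUM_SHARDS))); // prints true
        // Every entity's tag is one of the subscribed tags.
        System.out.println(tags.contains(tag)); // prints true
        System.out.println(tags.size()); // prints 20
    }
}
```

This is why aggregateTags() in the processor returns all of the tags: each shard gets its own event stream, and together they cover every entity.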

I hope this helps!

