首页 > 解决方案 > Gremlin Driver 在使用多个端点初始化 ConnectionPool 时阻塞

问题描述

我们正在 AWS 中运行一个海王星数据库。我们有 1 个 writer 和 3 个 reader 实例。几周前,我们发现负载平衡没有按预期工作。我们发现,我们的软件实例只连接到一个阅读器,并保持此连接直到 EOL。因此,其他读者实例从未被采用。考虑以下链接https://docs.aws.amazon.com/neptune/latest/userguide/feature-overview-endpoints.html。有描述,对于海王星负载平衡,您必须在客户端进行,一个前提条件是,您必须禁用 DNS 缓存。客户端实现在此处分别描述https://docs.amazonaws.cn/en_us/neptune/latest/userguide/best-practices-gremlin-java-multiple.htmlhttps://docs.aws.amazon.com/neptune/latest/userguide/best-practices-gremlin-java-separate.html因为我们分别处理写入器和读取器集群。我们的软件是用java编写的。所以我们实现了如下描述的问题:

在 jvm 中禁用 DNS 缓存:

java.security.Security.setProperty("networkaddress.cache.ttl", "0");

pom.xml 看起来像:

<properties>
    <gremlin.version>3.4.10</gremlin.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.tinkerpop</groupId>
        <artifactId>gremlin-driver</artifactId>
        <version>${gremlin.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.tinkerpop</groupId>
        <artifactId>tinkergraph-gremlin</artifactId>
        <version>${gremlin.version}</version>
    </dependency>
    <dependency>
        <!-- aws neptune db -->
        <groupId>org.apache.tinkerpop</groupId>
        <artifactId>gremlin-core</artifactId>
        <version>${gremlin.version}</version>
    </dependency>
</dependencies>

通过 gremlin 驱动程序连接到数据库:

    Cluster.Builder writer = Cluster.build().port(8182)
            .maxInProcessPerConnection(32).maxSimultaneousUsagePerConnection(32).maxContentLength(4 * 1024 * 1024)
            .serializer(Serializers.GRAPHBINARY_V1D0)
            .addContactPoint("some aws instance enpoint -- 1 --");

    Cluster.Builder reader = Cluster.build().port(8182)
            .maxInProcessPerConnection(32).maxSimultaneousUsagePerConnection(32).maxContentLength(4 * 1024 * 1024)
            .serializer(Serializers.GRAPHBINARY_V1D0)
            .addContactPoint("some aws instance enpoint -- 2 --")
            .addContactPoint("some aws instance enpoint -- 3 --");

    final Cluster writerCluster = writer.create();
    final Cluster readerCluster = reader.create();

    DriverRemoteConnection writerConn = DriverRemoteConnection.using(writerCluster);
    DriverRemoteConnection readerConn = DriverRemoteConnection.using(readerCluster);

    gWriter = AnonymousTraversalSource.traversal().withRemote(writerConn);
    gReader = AnonymousTraversalSource.traversal().withRemote(readerConn);

    for(int i = 0; i < 10; i++){
        NeptuneAdapter.getInstance().setGraph(gWriter);
        System.out.println(gWriter.addV("TestVertex" + i + 1).iterate());
        System.out.println("Vertex added, now: " + gWriter.V().count().next().toString());
        NeptuneAdapter.getInstance().setGraph(gReader);
        System.out.println(gReader.V().count().next().toString());
        System.out.println(gReader.V().count().next().toString());
        System.out.println(gReader.V().count().next().toString());
        System.out.println(gReader.V().count().next().toString());
        System.out.println(gReader.V().count().next().toString());
        System.out.println(gReader.V().count().next().toString());
        Thread.sleep(1000);
    }

问题是,在运行此代码时,第一次获取图表时没有任何反应。经过一番调试,我们发现ConnectionPool的构造函数中有阻塞代码。其中,根据 minPoolSize,为每个 Connection 创建了一个 CompletableFuture。在其中,连接是通过主机建立的。通过 Clusters Manager ScheduledExecutor 执行时,ConnectionPool 构造函数正在加入所有期货。如此处所述,我想在 CompletableFuture List 中做一些未来完成的订单实现似乎是正确的。但是一定有什么东西阻塞了。在检查 gremlin-driver 并注释掉加入代码行并设置一个简单的 Thread.sleep() 之后,代码确实按预期工作。而现在,负载平衡的东西也在起作用。添加一些输出后,上面执行代码的输出如下所示:

CONNECTION_POOL --- constructor --- poolLabel: {address=endpoint -- 1 -- /IP:PORT}
Opening connection pool
LoadBalancingStrategy adding host: Host{address=endpoint -- 1 -- /IP:PORT} host size is now 1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 1 -- /IP:PORT} for next Query
[RemoteStep(DriverServerConnection-address=endpoint -- 1 -- /IP:PORT [graph=g])]
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 1 -- /IP:PORT} for next Query
Vertex added, now: 1
CONNECTION_POOL --- constructor --- poolLabel: {address=endpoint -- 2 -- /IP:PORT}
CONNECTION_POOL --- constructor --- poolLabel: {address=endpoint -- 3 -- /IP:PORT}
Opening connection pool
LoadBalancingStrategy adding host: Host{address=endpoint -- 2 -- /IP:PORT} host size is now 1
Opening connection pool
LoadBalancingStrategy adding host: Host{address=endpoint -- 3 -- /IP:PORT} host size is now 2
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 1 -- /IP:PORT} for next Query
[RemoteStep(DriverServerConnection-address=endpoint -- 1 -- /IP:PORT [graph=g])]
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 1 -- /IP:PORT} for next Query
Vertex added, now: 2
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
2
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
2
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
2

现在的问题是,我们是否以错误的方式使用了 gremlin 驱动程序,或者这是一个错误,我们应该向 tinkerpop-master 存储库添加一个问题?还是有其他我们不明白的魔法?

标签: javaamazon-web-servicesgremlintinkerpop3

解决方案


过去,我们在读取器节点的 Neptune 负载平衡中遇到了这个问题。我们通过利用

https://github.com/awslabs/amazon-neptune-tools/tree/master/neptune-gremlin-client/gremlin-client

为了在客户端处理负载平衡,我们不得不稍微调整一下我们的阅读器客户端。

创建阅读器客户端的更新方式如下所示:

GremlinClient client;
GremlinCluster cluster;
ClusterEndpointsRefreshAgent clusterEndpointRefreshAgent;
String clusterId = "<your_cluster_id>";

     private void createReaderClient(boolean isIAMAuthEnabled) {
            EndpointsSelector endpointsSelector = EndpointsType.ReadReplicas;
            clusterEndpointRefreshAgent = new ClusterEndpointsRefreshAgent(clusterId, endpointsSelector);
            Collection<String> addresses = clusterEndpointRefreshAgent.getAddresses().get(endpointsSelector);
            if (isIAMAuthEnabled) {
                cluster = createNeptuneGremlinClusterBuilder(addresses);
            } else {
                cluster = createGremlinClusterBuilder(addresses);
            }
    
            client = cluster.connect();
            clusterEndpointRefreshAgent.startPollingNeptuneAPI(
                addrs -> client.refreshEndpoints(addrs.get(endpointsSelector)), 300,
                TimeUnit.SECONDS);
        }
    
     private GremlinCluster createGremlinClusterBuilder(Collection<String> addresses) {
            GremlinClusterBuilder builder = GremlinClusterBuilder.build().port(8182)
                .addContactPoints(addresses).enableSsl(true);
            //set other required properties of GremlinCluster
            return builder.create();
        }
    
     private GremlinCluster createNeptuneGremlinClusterBuilder(Collection<String> addresses) {
            NeptuneGremlinClusterBuilder builder = NeptuneGremlinClusterBuilder.build()
                .port(8182).addContactPoints(addresses)
                .enableSsl(true).enableIamAuth(true);
            // set other required properties of NeptuneGremlinClusterBuilder
            return builder.create();
        }

可以在创建如下内容之前创建此阅读器客户端GraphTraversalSource

    GraphTraversalSource g;
    GraphTraversalSource getGraphTraversalSource(boolean isIAMAuthEnabled) {
        if (g == null) {
            createReaderClient(isIAMAuthEnabled);
            g = AnonymousTraversalSource.traversal().withRemote(DriverRemoteConnection.using(client));
        }
        return g;
    }

推荐阅读