首页 > 解决方案 > java, OrbitzWorld consul client - 如何管理会话和锁?

问题描述


使用 java OrbitzWorld consul 客户端,我试图通过acquireLock方法同步我的 java 应用程序的多个实例。

到目前为止我的代码:
将应用注册为领事服务:

private void registerService(Config config) {
        String serviceId = config.getService().getId();
        String serviceName = config.getService().getName();
        long ttl = config.getService().getTtl();
        AgentClient agentClient = client.agentClient();
        Registration service = ImmutableRegistration.builder()
                .id(serviceId)
                .name(serviceName)
                .check(Registration.RegCheck.ttl(ttl))
                .build();
        agentClient.register(service);
        new HeartBeater(agentClient, serviceId, ttl).start();
    }

心跳:

@Override
    public void run() {
        while(true) {
            try {
                client.pass(serviceId);
                Thread.sleep((Math.max(ttl / 2, 1)));
            } catch (NotRegisteredException | InterruptedException e) {}
        }
    }

上面的代码有效并且服务在领事中成功刷新。
现在我想知道锁定的实现。

到目前为止我所写的:

public boolean amILeader() {
    // return if current java app is leader
}

private String createSession() {
    final Session session = ImmutableSession.builder().name(config.getService().getName()).build();
    return client.sessionClient().createSession(session).getId();
}

private void watchLeaderLockStateChange() {
    KeyValueClient keyValueClient = client.keyValueClient();
    KVCache kvCache = KVCache.newCache(keyValueClient, Constants.LEADER_LOCK_KEY, config.getService().getWatchKey());
    kvCache.addListener(map -> {
        Value value = map.get(Constants.LEADER_LOCK_KEY);
        if(!value.getSession().isPresent()) {
            keyValueClient.acquireLock(Constants.LEADER_LOCK_KEY, ???); //create new session here ???
        }
    });
    kvCache.start();
}

我被困在这里,因为我不了解理论并且在文档中没有发现任何有用的东西。

我的问题:

你能提供一些代码示例或填写我的实现吗?感谢您的任何回复:]

标签: javaspring-bootconsul

解决方案


我想我现在明白了。

理论是这样的:

  • Consul 会话表示从单个服务到 Consul 的连接。就我而言,它代表我的一个 java 应用程序实例和 Consul 之间的连接
  • 会话用于获取锁。当客户来到 Consul 并想要获取锁时,Consul 会检查是否有任何 sessionId 与之关联。如果没有,Consul 给客户端一个锁,并将客户端 sessionId 与锁相关联。
  • 锁没什么花哨的。它只是保存在 Consul 节点上的 KV 映射中的一个键。
  • 您可以检查锁以及是否有任何 sessionId 与它相关联,如下所示:
    public class SessionFacade {
        private String leaderLock;
        private String sessionId;
        private Consul client;
        private Config config;

        public SessionFacade(Consul client, Config config) {
            this.client = client;
            this.config = config;
            this.leaderLock = "service/" + config.getService().getName() + "/leader";
            this.sessionId = createSession();
            new SessionHeartBeater(client, sessionId, config.getService().getSessionTtl()).start();
            watchLeaderLockStateChange(sessionId);
            client.keyValueClient().acquireLock(leaderLock, sessionId);
        }

        public boolean doIPossesLeaderLock() {
            Optional<Value> leaderValue = client.keyValueClient().getValue(leaderLock);
            if(leaderValue.isPresent()) {
                Optional<String> session = leaderValue.get().getSession();
                return session.isPresent() && session.get().equals(sessionId);
            }
            return false;
        }

        private String createSession() {
            int sessionTtl = config.getService().getSessionTtl();
            final Session session = ImmutableSession.builder()
                    .name(config.getService().getName())
                    .ttl(sessionTtl + "s")
                    .build();
            return client.sessionClient().createSession(session).getId();
        }

        private void watchLeaderLockStateChange(String sessionId) {
            KeyValueClient keyValueClient = client.keyValueClient();
            KVCache kvCache = KVCache.newCache(keyValueClient, leaderLock, config.getService().getWatchLockEach());
            kvCache.addListener(map -> {
                Value value = map.get(leaderLock);
                if(!value.getSession().isPresent()) {
                    keyValueClient.acquireLock(leaderLock, sessionId);
                }
            });
            kvCache.start();
        }
    }

请注意,代码可能有问题,因为我还没有完全测试它。


推荐阅读