java - Java [Peer-To-Peer]:Runnable 意外停止/阻塞
问题描述
我正在开发一个简单的分布式账本。我希望能够在不同的端口上启动节点,这些节点可以相互通信。然后每个程序都有一个文件,它将在其中写入新发现的节点。
起初,只有最可靠的节点被硬编码到该文件中。这是程序上发生的事情:
1)我启动一个新节点,它启动一个HTTP服务器(我使用com.sun.HttpServer)。服务器有一个 GetAddress 处理程序,它侦听转到指定 URI 的请求。然后它获取 IP 和 PORT(在 URI 查询参数中指定),获取known_nodes.txt文件的信号量,并将新收到的对等地址写入该文件(如果它不存在),并发送新的内容更新文件作为 json 列表返回给请求者。
2) 在我的 Node 类中(如前所述,它在一个单独的线程上启动一个 HTTPServer),我创建了一个 ScheduledExecutorService 并给它一个 runnable 以每隔几秒运行一次,它的工作是连接到存在于known_nodes.txt文件,并询问他们的 known_nodes。如果我们收到之前在 known_nodes 文件中不存在的节点,我们会覆盖我们的文件。
现在! 如果我启动一个节点,并尝试从浏览器请求它,一切都会按计划进行 - 我们收到一个请求,将其写入我们的文件,然后我们的可运行对象将尝试连接到请求中指定的地址。如果我们捕获到SocketTimeoutException,我们会从 known_nodes.txt 文件中删除地址。
问题出现了,当我启动两个节点时,假设在端口 8001 和 8002 上运行。请注意,每个节点都有自己的 known_nodes 文件。发生的情况是,其中一个节点将停止运行 DiscoverAddresses 任务,而另一个则不会。如此有效,一个节点停止接收请求。
注意!将停止其计划任务的节点仍将发送至少一个发现请求,然后将死亡/阻塞(?)。
这是可运行任务的代码:
@Override
public void run() {
log.info("still running ");
PeerAddressesHolder inactiveNodes = new PeerAddressesHolder();
ApplicationConfiguration appConf = ApplicationConfiguration.getInstance();
for (PeerAddress peerAddress : knownNodes.getAddresses()) {
if (isSameNode(peerAddress)) {
continue;
}
String urlString = String.format("http://%s:%s%s?myport=%d", peerAddress.getIP(), peerAddress.getPort(), Constants.GET_ADDRESS, myPort);
try {
StringBuilder result = new StringBuilder();
URL url = new URL(urlString);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setConnectTimeout(5000);
conn.setRequestMethod("GET");
try (InputStream connInputStream = conn.getInputStream();
InputStreamReader ir = new InputStreamReader(connInputStream);
BufferedReader br = new BufferedReader(ir)){
String line;
while ((line = br.readLine()) != null) {
result.append(line).append('\n');
}
} catch (Exception e) {
log.warn("Couldn't read from connection input stream",e);
}
PeerAddressesHolder peerAddressesHolder = gson.fromJson(result.toString(), PeerAddressesHolder.class);
boolean fetchedNew = false;
for (PeerAddress fetchedAddress : peerAddressesHolder.getAddresses()) {
if (!isValidAddress(peerAddress)) {
log.warn("Peer has sent us a null-address. It will be ignored.");
return;
}
if (!knownNodes.contains(fetchedAddress)) {
knownNodes.addAddress(fetchedAddress);
fetchedNew = true;
}
}
if (fetchedNew) {
FileUtils.writeToFile(appConf.getKnownNodesFilePath(), gson.toJson(knownNodes), false);
}
} catch (SocketTimeoutException e) {
if (appConf.getMostReliableNodes().contains(peerAddress)) {
log.warn("Most reliable node not available: " + peerAddress);
} else {
inactiveNodes.addAddress(peerAddress);
log.warn("Connection timeout from " + peerAddress + ". It will be removed.");
}
} catch (Exception e) {
log.warn("Couldn't discover new addresses." + e);
}
}
try {
knownNodes.removeAll(inactiveNodes.getAddresses());
FileUtils.writeToFile(appConf.getKnownNodesFilePath(), gson.toJson(knownNodes), false);
} catch (IOException ioe) {
log.warn("Couldn't write to file after deleting dead node", ioe);
}
}
这是我在创建节点时启动它的方式。
public NetworkNode(int port) {
this.appConf = ApplicationConfiguration.getInstance();
this.port = port;
log.info("Starting a new node on port " + port);
try {
this.knownNodes = FileUtils.createPeerAddressesList(appConf.getKnownNodesFilePath());
} catch (Exception e) {
log.error("Error while trying to construct a list of peer addresses from file content on path: " + appConf.getKnownNodesFilePath());
}
scheduledExecutorService = Executors.newScheduledThreadPool(4);
scheduledExecutorService.scheduleAtFixedRate(new DiscoverAddressesTask(knownNodes, this.port), 3, 4, TimeUnit.SECONDS);
处理文件读/写的方法都是使用 try-with-resources 构造完成的,所以我最初的想法是,由于一些未关闭的流而导致可运行的停止可能是无效的。
解决方案
推荐阅读
- python - 重新评估 Pandas 列中的数据类型
- python - Selenium Python does not send.keys full text
- rest - 使用多部分文件处理 Json 请求
- c# - .net core db 连接池监控到 sql-server
- database - HTTP PUT 请求更新数据库中的弧,塑造身体,“中心”点
- eigen - 在行和列上重复特征矩阵的每个元素
- ios - 在iphone中做广告时,ble数据结构显示mac地址有什么变化?
- swift - 只有实例方法可以声明@IBAction?
- prometheus - Prometheus:并发请求数量下降时发出警报
- rubiks-cube - 如何创建用于解决魔方的模式数据库?