java - 应该如何处理线程锁以保持其他线程等待直到下载文件然后允许所有线程一次性读取文件
问题描述
我正在使用 ExecutorService fixedThreadPool() 来运行TASK。
此处的任务定义为从特定 URL 下载文件,如果文件不存在则将其保存到数据库中,否则仅从数据库中读取文件。所以它更像是一个读写器问题,执行器线程池的任何一个线程都可以充当一次写入器,而其他线程将成为后续请求的读取器。
我正在使用 Semaphore 来执行此操作,但这种方法的问题是后续读取请求是按顺序发生的。
如果 4 个TASK旨在访问相同的 URL,我需要同步,直到下载文件并释放信号量,即在 4 个线程中任何人都可以获得锁,其余 3 个正在等待。下载完成后,所有剩余的 3 个线程应同时读取下载的文件。但就我而言,最后一步是按顺序进行的,这也会对项目绩效产生影响。
说了上面的用例,下面是我的示例代码:
接下来的 Runnable 被传递给 ExecutorService 以在 SharedObject 类上执行任务。
class DownloadRunnable(SharedObjectMT sharedObject, String url) implement Runnable {
void run() {
sharedObject.loadFile(url);
}
}
class SharedObjectMT {
// This Hashmap acts ConcurrentHashMap with URL and semaphore mapping. So
// multiple threads requesting for the same URL will only be synchronized on their
// corresponding semaphore. And threads requesting for different URLs
// will run concurrently.
private static HashMap<String, Semaphore> syncMap = new HashMap<>();
.....
void loadFile(String url) {
// Let all threads enter sequentially and try to assign a Semaphore for their url in the
// hashmap. If the url has never been requested, then only a Semaphore(say S1) will be
// assigned to that url. And for all the other threads with *same request url*, this
// Semaphore(S1) will be used to handle concurrency.
synchronized(syncMap) {
if(syncMap[url] == null) {
syncMap[url] = new Semaphore(1);
}
}
Semaphore semaphore = syncMap[url];
synchronized(semaphore) {
............
............
semaphore.acquire();
String filePath = findInDatabase(url);
if(filePath != null) {
semaphore.release(); // no need to hold semaphore since file already downloaded.
printStatus("Already downloaded file from url = "+url);
} else {
// This DownloadThread is actually a mock of my real project where a third-party
// library uses a thread to download the file.
DownloadThread(() -> {
printStatus("Download completed for url= "+ url +". Releasing semaphore.");
semaphore.release();
}).start();
.............
.............
}
}
}
}
我知道单个信号量无法帮助我。也许我们可以再使用 1 个 Semaphore 来区分读写锁或任何其他锁定机制。所以需要一些关于如何使用这种一次性同步的帮助。
注意:如果您在上面的代码中发现任何语法错误,请忽略,因为实际项目是在 Kotlin 中,但这是一个基本的 Java 多线程问题,所以我将它作为 Java 代码发布。
解决方案
我不确定 Kotlin,但我可以用 Java 演示:
import java.io.IOException;
import java.util.HashMap;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class DownloadOrRead {
//Utility method, which just generates a random String instance...
private static String randomString(final int length) {
String alphabet = "abcdefghijklmnopqrstuvwxyz";
alphabet += alphabet.toUpperCase();
alphabet += "0123456789";
final int alphabetSize = alphabet.length();
final char[] chars = new char[length];
final Random rand = new Random();
for (int i = 0; i < chars.length; ++i)
chars[i] = alphabet.charAt(rand.nextInt(alphabetSize));
return String.valueOf(chars);
}
public static class DownLoadCallable implements Callable<String> {
private final String url;
public DownLoadCallable(final String url) {
this.url = Objects.requireNonNull(url);
}
@Override
public String call() throws IOException, InterruptedException {
/*Utilize url property here to download the file...
In our case, just simulate a download delay supposedly...*/
Thread.sleep(5000L + (long) (Math.random() * 10000L));
//Return the file's local path...
return randomString(20); //In our case, a random String of 20 characters.
}
}
//This is the method you are looking for:
public static String loadPath(final ExecutorService executorService, //Can be shared between calls of loadPath...
final HashMap<String, Future<String>> urlToFuture, //MUST be shared between calls of loadPath!
final String url) //The URL. Can be the same as a URL in a previous call of loadPath.
throws InterruptedException, ExecutionException {
final Future<String> future;
synchronized (urlToFuture) {
if (!urlToFuture.containsKey(url)) //If nowhere to be seen...
urlToFuture.put(url, executorService.submit(new DownLoadCallable(url))); //Create a Future...
future = urlToFuture.get(url); //Obtain the Future (new or old).
}
return future.get(); //Outside the synchronized block!
}
public static void main(final String[] args) {
System.out.println("Creating ExecutorService...");
final ExecutorService executorService = Executors.newFixedThreadPool(10);
System.out.println("Creating shared map...");
final HashMap<String, Future<String>> urlToFuture = new HashMap<>();
System.out.println("Creating random URLs...");
final String[] urls = new String[]{randomString(10), randomString(20), randomString(15)};
try {
System.out.println("Downloading files sequencially...");
final Random rand = new Random();
for (int i = 0; i < 100; ++i)
System.out.println(loadPath(executorService, urlToFuture, urls[rand.nextInt(urls.length)]));
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.MINUTES);
}
catch (final InterruptedException | ExecutionException x) {
System.err.println(x);
}
}
}
整个想法是将Callable
s 提交给ExecutorService
处理下载的。我们还利用该方法Future
返回的 ssubmit
来获取所需的结果路径/文件/任何内容。只需调用get
所需的Future
对象即可。您唯一需要同步的是sMap
的 URL Future
。
你会注意到,当运行这个测试程序时,第一个文件在下载之前被阻塞,然后对相同 URL 的后续调用立即完成(因为 URL 已经下载)并且我们只阻塞每个新 URL(未下载然而)。在这种情况下,我只使用了 3 个随机 URL,每个 URL 需要 5 到 15 秒才能完成,这给了我们大约 15 到 45 秒的正常运行时间,因为我们按顺序下载它们。
方法到此结束loadPath
。但是在上面的示例代码中,文件是按顺序下载的。如果您还需要多个Thread
s 来下载,您可以loadPath
从 many Thread
s 调用(不需要在 shared 之外的其他地方进一步同步Map
)。
正如可以在此答案中看到的那样,似乎在操作完成后调用get
相同的方法Future
,总是会产生相同的对象,或者Exception
如果失败则抛出相同的对象。这是我们在本文中提供的代码中使用的优势。
编辑1:
或者更好,正如@drekbour 在评论中指出的那样,使用computeIfAbsent
and aConcurrentHashMap
来完成这项工作,如下所示:
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class DownloadOrRead1 {
//Utility method, which just generates a random String instance...
private static String randomString(final int length) {
String alphabet = "abcdefghijklmnopqrstuvwxyz";
alphabet += alphabet.toUpperCase();
alphabet += "0123456789";
final int alphabetSize = alphabet.length();
final char[] chars = new char[length];
final Random rand = new Random();
for (int i = 0; i < chars.length; ++i)
chars[i] = alphabet.charAt(rand.nextInt(alphabetSize));
return String.valueOf(chars);
}
public static class DownLoadCallable implements Callable<String> {
private final String url;
public DownLoadCallable(final String url) {
this.url = Objects.requireNonNull(url);
}
@Override
public String call() throws InterruptedException {
System.out.println("Downloading " + url + "...");
/*Utilize url property here to download the file...
In our case, just simulate a download delay supposedly...*/
Thread.sleep(5000L + (long) (Math.random() * 10000L));
System.out.println("Downloaded " + url + '.');
//Return the file's local path...
return randomString(20); //In our case, a random String of 20 characters.
}
}
//This is the method you are looking for:
public static String loadPath(final ExecutorService executorService, //Can be shared between calls of loadPath...
final ConcurrentHashMap<String, Future<String>> urlToFuture, //MUST be shared between calls of loadPath!
final String url) //The URL. Can be the same as a URL in a previous call of loadPath.
throws InterruptedException, ExecutionException {
return urlToFuture.computeIfAbsent(url, url2 -> executorService.submit(new DownLoadCallable(url2))).get();
}
public static void main(final String[] args) {
System.out.println("Creating ExecutorService...");
final ExecutorService executorService = Executors.newFixedThreadPool(10);
System.out.println("Creating shared Map...");
final ConcurrentHashMap<String, Future<String>> urlToFuture = new ConcurrentHashMap<>();
System.out.println("Creating random URLs...");
final String[] urls = new String[]{randomString(10), randomString(10), randomString(10)};
try {
System.out.println("Downloading files sequencially...");
final Random rand = new Random();
for (int i = 0; i < 100; ++i) {
final String url = urls[rand.nextInt(urls.length)];
System.out.println("Path for " + url + ": " + loadPath(executorService, urlToFuture, url));
}
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.MINUTES);
}
catch (final InterruptedException | ExecutionException x) {
System.err.println(x);
}
}
}
推荐阅读
- ruby-on-rails - 批量分配 .update 不调用依赖: :destroy on children
- python - List 是否有协变可变版本?
- google-colaboratory - 在 colab 中重复从驱动器读取多个脚本
- arrays - Clickhouse - 矩阵逐项加法:如何对二维数组求和?
- google-apps-script - 有没有办法检查某个邮件是否使用应用程序脚本在 gmail 中发送?
- c - 在#pragma pack(push,ALIGNMENT) 中作为宏传递的对齐会导致代码生成硬故障
- swift - 列表中的 SwiftUI edgeIgnoringSafeArea
- python - 没有函数返回的python函数
- apache-kafka - ksqlDB 表中键的原子性
- node.js - 如何在 NEST JS 中为特定/不同的请求方法处理多个中间件?