Synchronized file-writing mechanism in Java for writing to many files simultaneously

Problem description

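I want to create a synchronized file-writing mechanism for a Spring application. I have about 10,000,000 JSON documents, each of which should be saved in a separate file, e.g.: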

Other requirements:

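I created FileWriterProxy (a singleton bean), which is the main component for saving files. It lazily loads FileWriter components (prototype beans) that are responsible for writing to a file; each one has a synchronized write method. Each FileWriter object represents a single file. I suspect my solution is not thread-safe. Consider the following scenario: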

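  1. Three threads (Thread1, Thread2, Thread3) want to write to the same file (1.json); all of them call the write method of the FileWriterProxy component
  2. Thread1 obtains the correct FileWriter
  3. Thread1 locks the FileWriter for 1.json
  4. Thread1 writes to 1.json
  5. Thread1 finishes writing to the file and is about to remove the FileWriter from the ConcurrentHashMap
  6. Meanwhile, Thread2 obtains the FileWriter for 1.json and waits for Thread1 to release the lock
  7. Thread1 releases the lock and removes the FileWriter from the ConcurrentHashMap
  8. Thread2 can now write to 1.json (it holds a FileWriter that has already been removed from the ConcurrentHashMap)
  9. Thread3 obtains a FileWriter for 1.json (a new one! The old FileWriter was removed by Thread1)
  10. Thread2 and Thread3 write to the same file at the same time, because they lock different FileWriter objects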

Please correct me if I'm wrong. How can I fix my implementation?
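
The interleaving in steps 1 to 10 can be replayed deterministically on a single thread, which may help confirm the suspicion. The sketch below is only an illustration: the class name RemovedWriterReplay and the method writersDiffer are hypothetical, and a plain Object stands in for the FileWriter bean so that the example runs without Spring.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper, not part of the original code: replays the scenario
// with plain Objects standing in for FileWriter instances.
final class RemovedWriterReplay {

    /** Returns true when Thread2 and Thread3 would end up holding different lock objects. */
    static boolean writersDiffer() {
        Map<String, Object> writers = new ConcurrentHashMap<>();

        // Steps 2 and 6: Thread1 and Thread2 both look up the writer for 1.json
        // and receive the same instance.
        Object heldByThread1 = writers.computeIfAbsent("1.json", k -> new Object());
        Object heldByThread2 = writers.computeIfAbsent("1.json", k -> new Object());

        // Step 7: Thread1 finishes and removes the mapping while Thread2
        // still holds a reference to the old instance.
        writers.remove("1.json");

        // Step 9: Thread3 looks up the writer and gets a freshly created instance.
        Object heldByThread3 = writers.computeIfAbsent("1.json", k -> new Object());

        // Thread2 and Thread3 would synchronize on different monitors.
        return heldByThread1 == heldByThread2 && heldByThread2 != heldByThread3;
    }

    public static void main(String[] args) {
        System.out.println(writersDiffer()); // prints "true"
    }
}
```

Because synchronized on an instance method locks that instance's monitor, two distinct instances for the same path provide no mutual exclusion between Thread2 and Thread3.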

FileWriterProxy:

@Component
public class FileWriterProxy {
    private final BeanFactory beanFactory;
    private final Map<String, FileWriter> filePathsMappedToFileWriters = new ConcurrentHashMap<>();

    public FileWriterProxy(BeanFactory beanFactory) {
        this.beanFactory = beanFactory;
    }

    public void write(Path path, String data) {
        FileWriter fileWriter = getFileWriter(path);
        fileWriter.write(data);
        removeFileWrite(path);
    }

    private FileWriter getFileWriter(Path path) {
        return filePathsMappedToFileWriters.computeIfAbsent(path.toString(), e -> beanFactory.getBean(FileWriter.class, path));
    }

    private void removeFileWrite(Path path) {
        filePathsMappedToFileWriters.remove(path.toString());
    }

}

FileWriterProxyTest:

@RunWith(SpringRunner.class)
@SpringBootTest
public class FileWriterProxyTest {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final String FILE_NAME = "filename.txt";
    private File baseDirectory;
    private Path path;

    @Autowired
    private FileWriterProxy fileWriterProxy;

    @Before
    public void setUp() {
        baseDirectory = temporaryFolder.getRoot();
        path = Paths.get(baseDirectory.getAbsolutePath(), FILE_NAME);
    }

    @Test
    public void writeToFile() throws IOException {
        String data = "test";
        fileWriterProxy.write(path, data);
        String fileContent = new String(Files.readAllBytes(path));
        assertEquals(data, fileContent);
    }

    @Test
    public void concurrentWritesToFile() throws InterruptedException {
        Path path = Paths.get(baseDirectory.getAbsolutePath(), FILE_NAME);
        List<Task> tasks = Arrays.asList(
                new Task(path, "test1"),
                new Task(path, "test2"),
                new Task(path, "test3"),
                new Task(path, "test4"),
                new Task(path, "test5"));
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<Future<Boolean>> futures = executorService.invokeAll(tasks);

        wait(futures);
    }

    @Test
    public void manyRandomWritesToFiles() throws InterruptedException {
        List<Task> tasks = createTasks(1000);
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        List<Future<Boolean>> futures = executorService.invokeAll(tasks);
        wait(futures);
    }

    private void wait(List<Future<Boolean>> tasksFutures) {
        tasksFutures.forEach(e -> {
            try {
                e.get(10, TimeUnit.SECONDS);
            } catch (Exception e1) {
                e1.printStackTrace();
            }
        });
    }

    private List<Task> createTasks(int number) {
        List<Task> tasks = new ArrayList<>();

        IntStream.range(0, number).forEach(e -> {
            String fileName = generateFileName();
            Path path = Paths.get(baseDirectory.getAbsolutePath(), fileName);
            tasks.add(new Task(path, "test"));
        });

        return tasks;
    }

    private String generateFileName() {
        int length = 10;
        boolean useLetters = true;
        boolean useNumbers = false;
        return RandomStringUtils.random(length, useLetters, useNumbers) + ".txt";
    }

    private class Task implements Callable<Boolean> {
        private final Path path;
        private final String data;

        Task(Path path, String data) {
            this.path = path;
            this.data = data;
        }

        @Override
        public Boolean call() {
            fileWriterProxy.write(path, data);
            return true;
        }
    }
}

Config:

@Configuration
public class Config {

    @Bean
    @Lazy
    @Scope("prototype")
    public FileWriter fileWriter(Path path) {
        return new FileWriter(path);
    }

}

FileWriter:

public class FileWriter {
    private static final Logger logger = LoggerFactory.getLogger(FileWriter.class);

    private final Path path;

    public FileWriter(Path path) {
        this.path = path;
    }

    public synchronized void write(String data) {
        String filePath = path.toString();
        try {
            Files.write(path, data.getBytes());
            logger.info("File has been saved: {}", filePath);
        } catch (IOException e) {
            logger.error("Error occurred while writing to file: {}", filePath, e);
        }
    }

}

Tags: java, spring, file, concurrency, concurrenthashmap

Solution
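
One possible way to close the gap described in the question, offered as a sketch under stated assumptions rather than a definitive implementation: keep one lock per path in the map, and count how many threads are currently using it, so that the entry is removed only when the last user is done. The counter is changed only inside ConcurrentHashMap.compute, which runs atomically for a given key. The names PerPathFileWriter, CountedLock, and trackedPaths are hypothetical; the sketch leaves out the Spring wiring, assumes UTF-8 output, and only protects writers inside a single JVM.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical replacement for FileWriterProxy (Spring annotations omitted).
final class PerPathFileWriter {

    // A lock plus the number of threads that currently hold a reference to it.
    private static final class CountedLock {
        final ReentrantLock lock = new ReentrantLock();
        int users; // read and written only inside compute() for this key
    }

    private final ConcurrentMap<String, CountedLock> locks = new ConcurrentHashMap<>();

    public void write(Path path, String data) {
        String key = path.toAbsolutePath().normalize().toString();

        // Atomically create the entry if needed and register this thread as a user.
        CountedLock entry = locks.compute(key, (k, v) -> {
            CountedLock e = (v == null) ? new CountedLock() : v;
            e.users++;
            return e;
        });

        entry.lock.lock();
        try {
            Files.write(path, data.getBytes(StandardCharsets.UTF_8));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        } finally {
            entry.lock.unlock();
            // Atomically deregister; returning null removes the mapping,
            // which happens only when no other thread still uses this lock.
            locks.compute(key, (k, v) -> (--v.users == 0) ? null : v);
        }
    }

    // Number of paths that currently have a lock entry (useful in tests).
    int trackedPaths() {
        return locks.size();
    }
}
```

With this arrangement a thread that is waiting for the lock (Thread2 in the scenario) keeps the entry alive, so a later thread (Thread3) receives the same lock instead of a new one. The map stays small because entries disappear as soon as nobody is using them, which matters with around 10,000,000 distinct files.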

