首页 > 解决方案 > ZipOutputStream 阻塞 Vertx 事件循环

问题描述

我正在处理一个场景,我正在读取目录中的文件,然后创建一个 zip 文件。但是这个操作有时会阻塞 Vertx 线程,我在 Vertx 跟踪中收到以下异常:

021-06-01 14:46:22.533 io.vertx.core.impl.BlockedThreadChecker [WARNING] Thread Thread[vert.x-eventloop-thread-4,5,main]=Thread[vert.x-eventloop-thread-4,5,main] has been blocked for 65088 ms, time limit is 2000 ms
io.vertx.core.VertxException: Thread blocked
    at java.util.zip.Deflater.deflateBytes(Native Method)
    at java.util.zip.Deflater.deflate(Deflater.java:444)
    at java.util.zip.Deflater.deflate(Deflater.java:366)
    at java.util.zip.DeflaterOutputStream.deflate(DeflaterOutputStream.java:251)
    at java.util.zip.DeflaterOutputStream.write(DeflaterOutputStream.java:211)
    at java.util.zip.ZipOutputStream.write(ZipOutputStream.java:331)

以下是我创建 zip 文件的方法

 private String zipDirectory(File dir, String zipDirName) {
     _log.info("Entered zip file utility2");
     String zipFilePath;

        try(FileOutputStream fos = new FileOutputStream(zipDirName);
            ZipOutputStream zos = new ZipOutputStream(fos);) {
            populateFilesList(dir);
           
            for(String filePath : _filesListInDir){
                _log.info("FILES: "+filePath);
                File file = new File(filePath);
                
                if(!"zip".equals(Files.getFileExtension(file.getName()))) {
                    ZipEntry ze = new ZipEntry(filePath.substring(dir.getAbsolutePath().length()+1, filePath.length()));
                    zos.putNextEntry(ze);
                    FileInputStream fis = new FileInputStream(filePath);
                    byte[] buffer = new byte[1024];
                    int len;
                    while ((len = fis.read(buffer)) > 0) {
                        zos.write(buffer, 0, len);
                    }
                    zos.flush();
                    zos.closeEntry();
                    fis.close();
            
                }else {
                    _log.info("Ignore zip for writing");
                }
                
            }
            Path zipFilePathDir = Paths.get(zipDirName);
            zipFilePath = zipFilePathDir.getFileName().toString();
            _log.info("Zip file name: "+zipFilePath);
            zos.close();
            fos.close();
        } catch (IOException e) {
            zipFilePath = "FAILURE";
            _log.error("Error creating zip file: "+e.getMessage());
        }
        
        return zipFilePath;
    }

谁能提供任何建议,以确保我不会阻止 Vertx 上的主事件循环

标签: javavert.xvertx-eventbus

解决方案


您可以使用Vertx.executeBlocking在 Vert.x 管理的工作池中运行该方法:

Future<String> fut = vertx.executeBlocking(promise -> promise.complete(zipDirectory(dir, zipDirName));

如果您的方法需要阻塞超过 5 或 10 秒,您可能还想创建自己的专用 ThreadPool 仅用于执行 Vert.x 调用的该方法 a WorkerExecutor,并WorkerExecutor.executeBlocking改为使用。

// Create a WorkerExecutor with 1 thread, where each method call
// can run for 2 minutes before Vertx logs blocked thread warnings
WorkerExecutor we = vertx.createSharedExecutor("zip", 1, 2, TimeUnit.MINUTES)
Future<String> fut = we.executeBlocking(promise -> promise.complete(zipDirectory(dir, zipDirName));

推荐阅读