Java: Out of memory error when my application runs for a long time

Problem Description

I have a Java application that receives very small files (about 1 KB each) but in very large numbers: roughly 20,000 files per minute. I pick the files up and upload them to S3.

I run this across 10 parallel threads, and the application has to run continuously.

After the application has been running for a few days, I get an out of memory error.

This is the exact error I get:

#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 347376 bytes for Chunk::new
# Possible reasons:
#   The system is out of physical RAM or swap space
#   In 32 bit mode, the process size limit was hit
# Possible solutions:
#   Reduce memory load on the system
#   Increase physical memory or swap space
#   Check if swap backing store is full
#   Use 64 bit Java on a 64 bit OS
#   Decrease Java heap size (-Xmx/-Xms)
#   Decrease number of Java threads
#   Decrease Java thread stack sizes (-Xss)
#   Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
#  Out of Memory Error (allocation.cpp:390), pid=6912, tid=0x000000000003ec8c
#
# JRE version: Java(TM) SE Runtime Environment (8.0_181-b13) (build 1.8.0_181-b13)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.181-b13 mixed mode windows-amd64 compressed oops)
# Core dump written. Default location: d:\S3FileUploaderApp\hs_err_pid6912.mdmp
#

Here are my Java classes. I am copying all of the classes to make the investigation easier.

Here is my Java VisualVM report (image).

Adding my sample output (image).

Updated Metaspace graph (image).

Here is my main class:

  public class UploadExecutor {
    private static Logger _logger = Logger.getLogger(UploadExecutor.class);

    public static void main(String[] args) {
        _logger.info("----------STARTING JAVA MAIN METHOD----------------- ");
        /*
         * 3 C:\\Users\\u6034690\\Desktop\\TWOFILE\\xml
         * a205381-tr-fr-production-us-east-1-trf-auditabilty
         */
        final int batchSize = 100;
        while (true) {
            String strNoOfThreads = args[0];
            String strFileLocation = args[1];
            String strBucketName = args[2];
            int iNoOfThreads = Integer.parseInt(strNoOfThreads);
            S3ClientManager s3ClientObj = new S3ClientManager();
            AmazonS3Client s3Client = s3ClientObj.buildS3Client();
            try {
                FileProcessThreads fp = new FileProcessThreads();
                File[] files = fp.getFiles(strFileLocation);
                try {
                    _logger.info("No records found will wait for 10 Seconds");
                    TimeUnit.SECONDS.sleep(10);
                    files = fp.getFiles(strFileLocation);
                    ArrayList<File> batchFiles = new ArrayList<File>(batchSize);
                    if (null != files) {
                        for (File path : files) {
                            String fileType = FilenameUtils.getExtension(path.getName());
                            long fileSize = path.length();
                            if (fileType.equals("gz") && fileSize > 0) {
                                batchFiles.add(path);
                            }
                            if (batchFiles.size() == batchSize) {
                                BuildThread BuildThreadObj = new BuildThread();
                                BuildThreadObj.buildThreadLogic(iNoOfThreads, s3Client, batchFiles, strFileLocation,
                                        strBucketName);
                                _logger.info("---Batch One got completed---");
                                batchFiles.clear();
                            }
                        }
                    }
                    // to consider remaining or files with count<batch size
                    if (!batchFiles.isEmpty()) {
                        BuildThread BuildThreadObj = new BuildThread();
                        BuildThreadObj.buildThreadLogic(iNoOfThreads, s3Client, batchFiles, strFileLocation,
                                strBucketName);
                        batchFiles.clear();
                    }
                } catch (InterruptedException e) {
                    _logger.error("InterruptedException: " + e.toString());
                }
            } catch (Throwable t) {
                _logger.error("InterruptedException: " + t.toString());
            }
        }
    }
}

Here is the class where I build the threads and shut down the executor service, so for every run I create a new ExecutorService:

   public class BuildThread {

    private static Logger _logger = Logger.getLogger(BuildThread.class);

    public  void buildThreadLogic(int iNoOfThreads,AmazonS3Client s3Client, List<File> records,String strFileLocation,String strBucketName) {


        _logger.info("Calling buildThreadLogic method of BuildThread class");

        final ExecutorService executor = Executors.newFixedThreadPool(iNoOfThreads);
        int recordsInEachThraed = (int) (records.size() / iNoOfThreads);
        int threadIncr=2;
        int recordsInEachThreadStart=0;
        int recordsInEachThreadEnd=0;

        for (int i = 0; i < iNoOfThreads; i++) {
            if (i==0){
                recordsInEachThreadEnd=recordsInEachThraed;
            }
            if (i==iNoOfThreads-1){
                recordsInEachThreadEnd=records.size();
            }

            Runnable worker = new UploadObject(records.subList(recordsInEachThreadStart, recordsInEachThreadEnd), s3Client,strFileLocation,strBucketName);
            executor.execute(worker);
            recordsInEachThreadStart=recordsInEachThreadEnd;
            recordsInEachThreadEnd=recordsInEachThraed*(threadIncr);
            threadIncr++;
        }

        executor.shutdown();
        while (!executor.isTerminated()) {
        }

        _logger.info("Existing buildThreadLogic method");
    }

}

Here is the class that uploads the files to S3, with its run method:

   public class UploadObject implements Runnable {

    private static Logger _logger;
    List<File> records;
    AmazonS3Client s3Client;
    String fileLocation;
    String strBucketName;

    UploadObject(List<File> list, AmazonS3Client s3Client, String  fileLocation, String strBucketName) {
        this.records = list;
        this.s3Client = s3Client;
        this.fileLocation=fileLocation;
        this.strBucketName=strBucketName;

        _logger = Logger.getLogger(UploadObject.class);
    }

    public void run() {
        uploadToToS3();
    }

    public void uploadToToS3() {
        _logger.info("Number of record to be uploaded  in current thread: : " + records.size());
        TransferManager tm = new TransferManager(s3Client);
        final MultipleFileUpload upload = tm.uploadFileList(strBucketName, "", new File(fileLocation), records);
        try {
            upload.waitForCompletion();
        } catch (AmazonServiceException e1) {
            _logger.error("AmazonServiceException " + e1.getErrorMessage());
            System.exit(1);
        } catch (AmazonClientException e1) {
            _logger.error("AmazonClientException " +  e1.getMessage());
            System.exit(1);
        } catch (InterruptedException e1) {
            _logger.error("InterruptedException " +  e1.getMessage());
            System.exit(1);
        } finally {
            _logger.info("--Calling TransferManager ShutDown--");
            tm.shutdownNow(false);

        }

        CleanUp CleanUpObj=new CleanUp();
        CleanUpObj.deleteUploadedFile(upload,records);
    }
}

This class is used to create the S3 client manager:

public class S3ClientManager {

    private static Logger _logger = Logger.getLogger(S3ClientManager.class);

    public  AmazonS3Client buildS3Client() {

        _logger.info("Calling buildS3Client method of S3ClientManager class");
        AWSCredentials credential = new ProfileCredentialsProvider("TRFAuditability-Prod-ServiceUser").getCredentials();
        AmazonS3Client s3Client = (AmazonS3Client) AmazonS3ClientBuilder.standard().withRegion("us-east-1")
                .withCredentials(new AWSStaticCredentialsProvider(credential)).withForceGlobalBucketAccessEnabled(true)
                .build();
        s3Client.getClientConfiguration().setMaxConnections(5000);
        s3Client.getClientConfiguration().setConnectionTimeout(6000);
        s3Client.getClientConfiguration().setSocketTimeout(30000);
        _logger.info("Exiting  buildS3Client method of S3ClientManager class");
        return s3Client;
    }
}

This is where I fetch the files:

public class FileProcessThreads {

    public  File[] getFiles(String fileLocation)  {

        File dir = new File(fileLocation);
        File[] directoryListing = dir.listFiles();
        if (directoryListing.length > 0)
            return directoryListing;
       return null;

    }
} 

Tags: java, multithreading, amazon-s3

Solution


Sorry for not addressing your original question about the memory leak, but your whole approach looks flawed to me. The System.exit() calls in UploadObject may be the cause of the resource leak, but that is only the beginning. The Amazon S3 TransferManager already has an internal executor service, so you do not need your own multithreading controller on top of it. I also cannot see how you ensure that each file is uploaded only once: you make several upload calls and then delete all the files, regardless of whether some of them failed during the upload and therefore never reached S3. And the way you split the files among executors is unnecessary: adding more threads on top of the TransferManager's ExecutorService will not improve performance, it will only cause thrashing.
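For reference, the TransferManager's internal pool can be sized directly when it is built, which removes the need for any extra executor around it. A minimal sketch, assuming the AWS SDK for Java 1.x; the factory class and the thread count below are illustrative, not part of the original code:

import java.util.concurrent.Executors;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;

public class TransferManagerFactory {

    // Builds a TransferManager whose internal thread pool is sized explicitly,
    // so no additional ExecutorService needs to be layered on top of it.
    public static TransferManager create(final AmazonS3 s3Client, final int uploadThreads) {
        return TransferManagerBuilder.standard()
                .withS3Client(s3Client)
                .withExecutorFactory(() -> Executors.newFixedThreadPool(uploadThreads))
                .build();
    }
}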

I would take a different approach.

First, a very simple main class that just starts a worker thread and waits for it to finish.

public class S3Uploader {

    public static void main(String[] args) throws Exception {
        final String strNoOfThreads = args[0];
        final String strFileLocation = args[1];
        final String strBucketName = args[2];

        // Maximum number of file names that are read into memory
        final int maxFileQueueSize = 5000;

        S3UploadWorkerThread worker = new S3UploadWorkerThread(strFileLocation, strBucketName, Integer.parseInt(strNoOfThreads), maxFileQueueSize);

        // Use start(), not run(): calling run() directly would execute the upload
        // loop inline on the main thread and never reach the prompt below
        worker.start();

        System.out.println("Uploading files, press any key to stop.");
        System.in.read();

        // Gracefully halt the worker thread waiting for any ongoing uploads to finish
        worker.finish();

        // Exit the main thread only after the worker thread has terminated
        worker.join();

    }

}
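A hypothetical launch, matching the argument order the main method expects (number of concurrent uploads, source directory, target bucket); the directory and bucket names are placeholders:

java S3Uploader 10 d:\S3FileUploaderApp\files my-example-bucket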

The worker thread uses a Semaphore to limit the number of uploads handed to the TransferManager, a custom file-name queue FileEnqueue to keep reading file names from the source directory, and a ProgressListener to track the progress of each upload. If the loop runs out of files to read from the source directory, it waits ten seconds and tries again. Even the file queue may be unnecessary; simply listing the files inside the worker thread's while loop would be enough.

import java.io.File;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

import com.amazonaws.AmazonClientException;

import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;

public class S3UploadWorkerThread extends Thread {

    private final String sourceDir;
    private final String targetBucket;
    private final int maxQueueSize;
    private final AmazonS3Client s3Client;

    private Semaphore uploadLimiter;
    private volatile boolean running; // volatile: finish() flips this flag from another thread

    public final long SLEEP_WHEN_NO_FILES_AVAILABLE_MS = 10000L; // 10 seconds

    public S3UploadWorkerThread(final String sourceDir, final String targetBucket, final int maxConcurrentUploads, final int maxQueueSize) {
        this.running = false;
        this.sourceDir = sourceDir.endsWith(File.separator) ? sourceDir: sourceDir + File.separator;
        this.targetBucket = targetBucket;
        this.maxQueueSize = maxQueueSize;
        this.s3Client = S3ClientManager.buildS3Client();
        this.uploadLimiter = new Semaphore(maxConcurrentUploads);
    }

    public void finish() {
        running = false;
    }

    @Override
    public void run() {
        running = true;

        final Map<String, Upload> ongoingUploads = new ConcurrentHashMap<>();
        final FileEnqueue queue = new FileEnqueue(sourceDir, maxQueueSize);
        final TransferManager tm = TransferManagerBuilder.standard().withS3Client(s3Client).build();

        while (running) {
            // Get a file name from the in memory queue
            final String fileName = queue.poll();
            if (fileName!=null) {
                try {
                    // Limit the number of concurrent uploads
                    uploadLimiter.acquire();
                    File fileObj = new File(sourceDir + fileName);
                    // Create an upload listener
                    UploadListener onComplete = new UploadListener(fileObj, queue, ongoingUploads, uploadLimiter);
                    try {
                        Upload up = tm.upload(targetBucket, fileName, fileObj);
                        up.addProgressListener(onComplete);
                        // ongoingUploads is  used later to wait for ongoing uploads in case a finish() is requested
                        ongoingUploads.put(fileName, up);
                    } catch (AmazonClientException e) {
                        System.err.println("AmazonClientException " +  e.getMessage());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // poll() returns null when the source directory is empty then wait for a number of seconds
                try {
                    Thread.sleep(SLEEP_WHEN_NO_FILES_AVAILABLE_MS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } // fi
        } // wend

        // Wait for ongoing uploads to finish before ending the worker thread
        for (Map.Entry<String,Upload> e : ongoingUploads.entrySet()) {
            try {
                e.getValue().waitForCompletion();
            } catch (AmazonClientException | InterruptedException x) {
                System.err.println(x.getClass().getName() + " at " + e.getKey());
            }
        } // next

        tm.shutdownNow();
    }
}

The UploadListener releases a Semaphore permit and notifies the file-name queue when an upload completes, and it maintains the map of ongoing uploads that must be waited on if the user requests an orderly stop. With a ProgressListener you can track each individual upload as it succeeds or fails.

import java.io.File;

import java.util.Map;
import java.util.concurrent.Semaphore;

import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.transfer.Upload;

public class UploadListener implements ProgressListener {

    private final File fileObj;
    private final FileEnqueue queue;
    private final Map<String, Upload> ongoingUploads;
    private final Semaphore uploadLimiter;

    public UploadListener(File fileObj, FileEnqueue queue, Map<String, Upload> ongoingUploads, Semaphore uploadLimiter) {
        this.fileObj = fileObj;
        this.queue = queue;
        this.ongoingUploads = ongoingUploads;
        this.uploadLimiter = uploadLimiter;
    }

    @Override
    public void progressChanged(ProgressEvent event) {

        switch(event.getEventType()) {

        case TRANSFER_STARTED_EVENT :
            System.out.println("Started upload of file " + fileObj.getName());
            break;

        case TRANSFER_COMPLETED_EVENT:
            /* Upon a successful upload:
             * 1. Delete the file from disk
             * 2. Notify the file name queue that the file is done
             * 3. Remove it from the map of ongoing uploads
             * 4. Release the semaphore permit
             */
            fileObj.delete();
            queue.done(fileObj.getName());
            ongoingUploads.remove(fileObj.getName());
            uploadLimiter.release();
            System.out.println("Successfully finished upload of file " + fileObj.getName());
            break;

        case TRANSFER_FAILED_EVENT:
            queue.done(fileObj.getName());
            ongoingUploads.remove(fileObj.getName());
            uploadLimiter.release();
            System.err.println("Failed upload of file " + fileObj.getName());
            break;

        default:
            // do nothing
        }       
    }
}
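Keep in mind that progressChanged() is not called on the worker thread but from the SDK's transfer machinery, which is why everything shared with the listener (the map of ongoing uploads and the Semaphore) uses thread-safe types.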

Here is a boilerplate sample of the file queue:

import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.ConcurrentSkipListSet;

public class FileEnqueue {

    private final String sourceDir;
    private final ConcurrentSkipListSet<FileItem> seen;
    private final ConcurrentSkipListSet<String> processing;
    private final int maxSeenSize;

    public FileEnqueue(final String sourceDirectory, int maxQueueSize) {
        sourceDir = sourceDirectory;
        maxSeenSize = maxQueueSize;
        seen = new ConcurrentSkipListSet<FileItem>();
        processing = new ConcurrentSkipListSet<>();
    }

    public synchronized String poll() {
        if (seen.size()==0)
            enqueueFiles();
        FileItem fi = seen.pollFirst();
        if (fi==null) {
            return null;
        } else {
            processing.add(fi.getName());
            return fi.getName();
        }
    }

    public void done(final String fileName) {
        processing.remove(fileName);
    }

    private void enqueueFiles() {
        final FileFilter gzFilter = new GZFileFilter();
        final File dir = new File(sourceDir);

        if (!dir.exists() ) {
            System.err.println("Directory " +  sourceDir + " not found");
        } else if (!dir.isDirectory() ) {
            System.err.println(sourceDir + " is not a directory");
        } else {

            final File [] files = dir.listFiles(gzFilter);

            if (files!=null) {

                // How many more file names can we read in memory
                final int spaceLeft = maxSeenSize - seen.size();

                // How many new files will be read into memory, never more than the space left
                final int maxNewFiles = Math.min(files.length, spaceLeft);

                for (int f=0, enqueued=0; f<files.length && enqueued<maxNewFiles; f++) {
                    File fl = files[f];
                    FileItem fi = new FileItem(fl);
                    // Do not put into the queue any file which has been already seen or is processing
                    if (!seen.contains(fi) && !processing.contains(fi.getName())) {
                        seen.add(fi);
                        enqueued++;
                    }
                } // next
            }
        } // fi
    }

    private class GZFileFilter implements FileFilter {

        @Override
        public boolean accept(File f) {
            final String fname = f.getName().toLowerCase();         
            return f.isFile() && fname.endsWith(".gz") && f.length()>0L;
        }   
    }

}
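To illustrate the queue's contract, a hypothetical poll/done cycle (the directory and queue size are placeholders):

FileEnqueue queue = new FileEnqueue("d:\\incoming\\", 5000);

String fileName = queue.poll(); // null when no eligible .gz files are waiting
if (fileName != null) {
    // ... hand the file to the TransferManager ...
    queue.done(fileName);       // the name stays in the processing set, and is skipped
                                // by enqueueFiles(), until done() is called
}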

Finally, your S3ClientManager:

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class S3ClientManager {

    public static AmazonS3Client buildS3Client() {

        AWSCredentials credential = new ProfileCredentialsProvider("TRFAuditability-Prod-ServiceUser").getCredentials();
        AmazonS3Client s3Client = (AmazonS3Client) AmazonS3ClientBuilder.standard().withRegion("us-east-1")
                .withCredentials(new AWSStaticCredentialsProvider(credential)).withForceGlobalBucketAccessEnabled(true)
                .build();
        s3Client.getClientConfiguration().setMaxConnections(5000);
        s3Client.getClientConfiguration().setConnectionTimeout(6000);
        s3Client.getClientConfiguration().setSocketTimeout(30000);

        return s3Client;
    }
}

Update April 30, 2019: adding the FileItem class.

import java.io.File;
import java.util.Comparator;

public class FileItem implements Comparable<FileItem> {

    private final String name;
    private final long dateSeen;

    public FileItem(final File file) {
        this.name = file.getName();
        this.dateSeen = System.currentTimeMillis();
    }

    public String getName() {
        return name;
    }

    public long getDateSeen() {
        return dateSeen;
    }

    @Override
    public int compareTo(FileItem other) {
        if (getDateSeen() == other.getDateSeen())
            return getName().compareTo(other.getName());
        else if (getDateSeen() < other.getDateSeen())
            return -1;
        else
            return 1;
    }

    @Override
    public boolean equals(Object otherFile) {
        // guard against non-FileItem arguments instead of risking a ClassCastException
        if (!(otherFile instanceof FileItem))
            return false;
        return getName().equals(((FileItem) otherFile).getName());
    }

    @Override
    public int hashCode() {
        return getName().hashCode();
    }

    public static final class CompareFileItems implements Comparator<FileItem> {

        @Override
        public int compare(FileItem fileItem1, FileItem fileItem2) {
            return fileItem1.compareTo(fileItem2);
        }
    }

}
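Note that equals() and compareTo() are intentionally different here: equality is by file name only, while ordering also uses dateSeen so that the queue drains roughly in the order files were discovered. This is safe in the design above because enqueueFiles() only runs when the seen set is empty, so two items with the same name never coexist in it.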
