java - Java: Out of memory error when my application runs for a long time
Problem description
I have a Java application in which I fetch very small files (1 KB each) but in very large numbers: about 20,000 files per minute. I pick up the files and upload them to S3.
I run this in 10 parallel threads, and the application has to run continuously.
When it has been running for a few days, I get an out-of-memory error.
Here is the exact error I get:
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (malloc) failed to allocate 347376 bytes for Chunk::new
# Possible reasons:
# The system is out of physical RAM or swap space
# In 32 bit mode, the process size limit was hit
# Possible solutions:
# Reduce memory load on the system
# Increase physical memory or swap space
# Check if swap backing store is full
# Use 64 bit Java on a 64 bit OS
# Decrease Java heap size (-Xmx/-Xms)
# Decrease number of Java threads
# Decrease Java thread stack sizes (-Xss)
# Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
# Out of Memory Error (allocation.cpp:390), pid=6912, tid=0x000000000003ec8c
#
# JRE version: Java(TM) SE Runtime Environment (8.0_181-b13) (build 1.8.0_181-b13)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.181-b13 mixed mode windows-amd64 compressed oops)
# Core dump written. Default location: d:\S3FileUploaderApp\hs_err_pid6912.mdmp
#
Here are my Java classes. I am copying all of them to make the investigation easier.
Here is my Java VisualVM report (image omitted).
Adding my sample output (image omitted).
Updated metaspace image (image omitted).
Here is my main class:
public class UploadExecutor {
    private static Logger _logger = Logger.getLogger(UploadExecutor.class);

    public static void main(String[] args) {
        _logger.info("----------STARTING JAVA MAIN METHOD----------------- ");
        /*
         * 3 C:\\Users\\u6034690\\Desktop\\TWOFILE\\xml
         * a205381-tr-fr-production-us-east-1-trf-auditabilty
         */
        final int batchSize = 100;
        while (true) {
            String strNoOfThreads = args[0];
            String strFileLocation = args[1];
            String strBucketName = args[2];
            int iNoOfThreads = Integer.parseInt(strNoOfThreads);
            S3ClientManager s3ClientObj = new S3ClientManager();
            AmazonS3Client s3Client = s3ClientObj.buildS3Client();
            try {
                FileProcessThreads fp = new FileProcessThreads();
                File[] files = fp.getFiles(strFileLocation);
                try {
                    _logger.info("No records found will wait for 10 Seconds");
                    TimeUnit.SECONDS.sleep(10);
                    files = fp.getFiles(strFileLocation);
                    ArrayList<File> batchFiles = new ArrayList<File>(batchSize);
                    if (null != files) {
                        for (File path : files) {
                            String fileType = FilenameUtils.getExtension(path.getName());
                            long fileSize = path.length();
                            if (fileType.equals("gz") && fileSize > 0) {
                                batchFiles.add(path);
                            }
                            if (batchFiles.size() == batchSize) {
                                BuildThread BuildThreadObj = new BuildThread();
                                BuildThreadObj.buildThreadLogic(iNoOfThreads, s3Client, batchFiles, strFileLocation,
                                        strBucketName);
                                _logger.info("---Batch One got completed---");
                                batchFiles.clear();
                            }
                        }
                    }
                    // to consider remaining files, or files with count < batch size
                    if (!batchFiles.isEmpty()) {
                        BuildThread BuildThreadObj = new BuildThread();
                        BuildThreadObj.buildThreadLogic(iNoOfThreads, s3Client, batchFiles, strFileLocation,
                                strBucketName);
                        batchFiles.clear();
                    }
                } catch (InterruptedException e) {
                    _logger.error("InterruptedException: " + e.toString());
                }
            } catch (Throwable t) {
                _logger.error("InterruptedException: " + t.toString());
            }
        }
    }
}
Here is the class where I build the threads and shut down the executor service. For each run, I create a new executor service.
public class BuildThread {
    private static Logger _logger = Logger.getLogger(BuildThread.class);

    public void buildThreadLogic(int iNoOfThreads, AmazonS3Client s3Client, List<File> records, String strFileLocation, String strBucketName) {
        _logger.info("Calling buildThreadLogic method of BuildThread class");
        final ExecutorService executor = Executors.newFixedThreadPool(iNoOfThreads);
        int recordsInEachThraed = (int) (records.size() / iNoOfThreads);
        int threadIncr = 2;
        int recordsInEachThreadStart = 0;
        int recordsInEachThreadEnd = 0;
        for (int i = 0; i < iNoOfThreads; i++) {
            if (i == 0) {
                recordsInEachThreadEnd = recordsInEachThraed;
            }
            if (i == iNoOfThreads - 1) {
                recordsInEachThreadEnd = records.size();
            }
            Runnable worker = new UploadObject(records.subList(recordsInEachThreadStart, recordsInEachThreadEnd), s3Client, strFileLocation, strBucketName);
            executor.execute(worker);
            recordsInEachThreadStart = recordsInEachThreadEnd;
            recordsInEachThreadEnd = recordsInEachThraed * (threadIncr);
            threadIncr++;
        }
        executor.shutdown();
        while (!executor.isTerminated()) {
        }
        _logger.info("Existing buildThreadLogic method");
    }
}
Here is the class that uploads the files to S3, with its run method:
public class UploadObject implements Runnable {
    private static Logger _logger;
    List<File> records;
    AmazonS3Client s3Client;
    String fileLocation;
    String strBucketName;

    UploadObject(List<File> list, AmazonS3Client s3Client, String fileLocation, String strBucketName) {
        this.records = list;
        this.s3Client = s3Client;
        this.fileLocation = fileLocation;
        this.strBucketName = strBucketName;
        _logger = Logger.getLogger(UploadObject.class);
    }

    public void run() {
        uploadToToS3();
    }

    public void uploadToToS3() {
        _logger.info("Number of record to be uploaded in current thread: : " + records.size());
        TransferManager tm = new TransferManager(s3Client);
        final MultipleFileUpload upload = tm.uploadFileList(strBucketName, "", new File(fileLocation), records);
        try {
            upload.waitForCompletion();
        } catch (AmazonServiceException e1) {
            _logger.error("AmazonServiceException " + e1.getErrorMessage());
            System.exit(1);
        } catch (AmazonClientException e1) {
            _logger.error("AmazonClientException " + e1.getMessage());
            System.exit(1);
        } catch (InterruptedException e1) {
            _logger.error("InterruptedException " + e1.getMessage());
            System.exit(1);
        } finally {
            _logger.info("--Calling TransferManager ShutDown--");
            tm.shutdownNow(false);
        }
        CleanUp CleanUpObj = new CleanUp();
        CleanUpObj.deleteUploadedFile(upload, records);
    }
}
This class creates the S3 client:
public class S3ClientManager {
    private static Logger _logger = Logger.getLogger(S3ClientManager.class);

    public AmazonS3Client buildS3Client() {
        _logger.info("Calling buildS3Client method of S3ClientManager class");
        AWSCredentials credential = new ProfileCredentialsProvider("TRFAuditability-Prod-ServiceUser").getCredentials();
        AmazonS3Client s3Client = (AmazonS3Client) AmazonS3ClientBuilder.standard().withRegion("us-east-1")
                .withCredentials(new AWSStaticCredentialsProvider(credential)).withForceGlobalBucketAccessEnabled(true)
                .build();
        s3Client.getClientConfiguration().setMaxConnections(5000);
        s3Client.getClientConfiguration().setConnectionTimeout(6000);
        s3Client.getClientConfiguration().setSocketTimeout(30000);
        _logger.info("Exiting buildS3Client method of S3ClientManager class");
        return s3Client;
    }
}
This is where I get the files:
public class FileProcessThreads {
    public File[] getFiles(String fileLocation) {
        File dir = new File(fileLocation);
        File[] directoryListing = dir.listFiles();
        if (directoryListing.length > 0)
            return directoryListing;
        return null;
    }
}
Solution
Sorry for not addressing your original question about the memory leak, but your whole approach seems flawed to me. The System.exit() calls in UploadObject may be one cause of the resource leaks, but they are only the beginning. The Amazon S3 TransferManager already has an internal executor service, so you do not need your own multithreading controller. I also do not see how you guarantee that each file is uploaded only once: you make multiple upload calls and then delete all the files regardless of whether any of them failed during the upload, leaving those files missing from S3. Distributing the files among executors yourself is unnecessary; adding more threads on top of the TransferManager's internal ExecutorService will not improve your throughput, it will only cause thrashing.
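To make that concrete, here is a minimal sketch (the class name and pool size are illustrative, not from the original post) of letting the TransferManager own the concurrency through its executor factory instead of wrapping it in your own ExecutorService:
import java.util.concurrent.Executors;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;

public class TransferManagerSetup {
    // The TransferManager multi-threads internally; its pool is configured on
    // the builder, so no external ExecutorService is needed.
    public static TransferManager build(AmazonS3 s3Client, int threads) {
        return TransferManagerBuilder.standard()
                .withS3Client(s3Client)
                .withExecutorFactory(() -> Executors.newFixedThreadPool(threads))
                .build();
    }
}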
I would take a different approach.
First, a very simple main class that just starts a worker thread and waits for it to finish.
public class S3Uploader {
    public static void main(String[] args) throws Exception {
        final String strNoOfThreads = args[0];
        final String strFileLocation = args[1];
        final String strBucketName = args[2];
        // Maximum number of file names that are read into memory
        final int maxFileQueueSize = 5000;
        S3UploadWorkerThread worker = new S3UploadWorkerThread(strFileLocation, strBucketName, Integer.parseInt(strNoOfThreads), maxFileQueueSize);
        // start() runs the loop on the worker thread; calling run() here would
        // execute it on the main thread and block forever
        worker.start();
        System.out.println("Uploading files, press any key to stop.");
        System.in.read();
        // Gracefully halt the worker thread, waiting for any ongoing uploads to finish
        worker.finish();
        // Exit the main thread only after the worker thread has terminated
        worker.join();
    }
}
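For reference, a hypothetical invocation matching the argument order above (thread count, source directory, bucket name; all three values are placeholders):
java S3Uploader 10 d:\S3FileUploaderApp\xml my-audit-bucket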
The worker thread uses a Semaphore to limit the number of uploads handed to the TransferManager, a custom file-name queue, FileEnqueue, that continuously reads files from the source directory, and a ProgressListener to track the progress of each upload. If the while loop runs out of files to read from the source directory, it waits ten seconds and tries again. Even the file queue may be unnecessary: simply listing the files inside the worker thread's loop would suffice (a sketch of that variant follows the class below).
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;

import com.amazonaws.AmazonClientException;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
import com.amazonaws.services.s3.transfer.Upload;

public class S3UploadWorkerThread extends Thread {
    private final String sourceDir;
    private final String targetBucket;
    private final int maxQueueSize;
    private final AmazonS3Client s3Client;
    private final Semaphore uploadLimiter;
    // volatile so the flag set by finish() on the main thread is seen by the worker
    private volatile boolean running;

    public final long SLEEP_WHEN_NO_FILES_AVAILABLE_MS = 10000L; // 10 seconds

    public S3UploadWorkerThread(final String sourceDir, final String targetBucket, final int maxConcurrentUploads, final int maxQueueSize) {
        this.running = false;
        this.sourceDir = sourceDir.endsWith(File.separator) ? sourceDir : sourceDir + File.separator;
        this.targetBucket = targetBucket;
        this.maxQueueSize = maxQueueSize;
        this.s3Client = S3ClientManager.buildS3Client();
        this.uploadLimiter = new Semaphore(maxConcurrentUploads);
    }

    public void finish() {
        running = false;
    }

    @Override
    public void run() {
        running = true;
        final Map<String, Upload> ongoingUploads = new ConcurrentHashMap<>();
        final FileEnqueue queue = new FileEnqueue(sourceDir, maxQueueSize);
        final TransferManager tm = TransferManagerBuilder.standard().withS3Client(s3Client).build();
        while (running) {
            // Get a file name from the in-memory queue
            final String fileName = queue.poll();
            if (fileName != null) {
                try {
                    // Limit the number of concurrent uploads
                    uploadLimiter.acquire();
                    File fileObj = new File(sourceDir + fileName);
                    // Create an upload listener
                    UploadListener onComplete = new UploadListener(fileObj, queue, ongoingUploads, uploadLimiter);
                    try {
                        Upload up = tm.upload(targetBucket, fileName, fileObj);
                        up.addProgressListener(onComplete);
                        // ongoingUploads is used later to wait for in-flight uploads when finish() is requested
                        ongoingUploads.put(fileName, up);
                    } catch (AmazonClientException e) {
                        System.err.println("AmazonClientException " + e.getMessage());
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                // poll() returns null when the source directory is empty, so wait a number of seconds
                try {
                    Thread.sleep(SLEEP_WHEN_NO_FILES_AVAILABLE_MS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        // Wait for ongoing uploads to finish before ending the worker thread
        for (Map.Entry<String, Upload> e : ongoingUploads.entrySet()) {
            try {
                e.getValue().waitForCompletion();
            } catch (AmazonClientException | InterruptedException x) {
                System.err.println(x.getClass().getName() + " at " + e.getKey());
            }
        }
        tm.shutdownNow();
    }
}
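And here is the simpler variant mentioned earlier, as a rough sketch that is not part of the original answer: drop FileEnqueue and list the directory inside the loop. Re-listing only after the whole batch has finished keeps each file from being uploaded twice, while the TransferManager still parallelizes the batch on its internal pool. The method body assumes the same fields (running, sourceDir, targetBucket, SLEEP_WHEN_NO_FILES_AVAILABLE_MS) as the class above:
// additional imports needed: java.util.ArrayList, java.util.List
private void runWithoutQueue(TransferManager tm) throws InterruptedException {
    while (running) {
        final File[] files = new File(sourceDir).listFiles(
                f -> f.isFile() && f.getName().toLowerCase().endsWith(".gz") && f.length() > 0);
        if (files == null || files.length == 0) {
            Thread.sleep(SLEEP_WHEN_NO_FILES_AVAILABLE_MS);
            continue;
        }
        // Submit the whole batch first so the uploads run concurrently
        final List<Upload> batch = new ArrayList<>();
        for (File f : files) {
            batch.add(tm.upload(targetBucket, f.getName(), f));
        }
        // Then wait for each transfer and delete only what actually reached S3
        for (int i = 0; i < batch.size(); i++) {
            try {
                batch.get(i).waitForCompletion();
                files[i].delete();
            } catch (AmazonClientException e) {
                System.err.println("Upload failed, keeping " + files[i].getName());
            }
        }
    }
}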
The UploadListener releases the Semaphore permit and notifies the file-name queue when an upload completes, and it maintains the map of ongoing uploads that must be waited on if the user requests an orderly stop. With a ProgressListener you can track each successful or failed upload individually.
import java.io.File;
import java.util.Map;
import java.util.concurrent.Semaphore;

import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.transfer.Upload;

public class UploadListener implements ProgressListener {
    private final File fileObj;
    private final FileEnqueue queue;
    private final Map<String, Upload> ongoingUploads;
    private final Semaphore uploadLimiter;

    public UploadListener(File fileObj, FileEnqueue queue, Map<String, Upload> ongoingUploads, Semaphore uploadLimiter) {
        this.fileObj = fileObj;
        this.queue = queue;
        this.ongoingUploads = ongoingUploads;
        this.uploadLimiter = uploadLimiter;
    }

    @Override
    public void progressChanged(ProgressEvent event) {
        switch (event.getEventType()) {
        case TRANSFER_STARTED_EVENT:
            System.out.println("Started upload of file " + fileObj.getName());
            break;
        case TRANSFER_COMPLETED_EVENT:
            /* Upon a successful upload:
             * 1. Delete the file from disk
             * 2. Notify the file name queue that the file is done
             * 3. Remove it from the map of ongoing uploads
             * 4. Release the semaphore permit
             */
            fileObj.delete();
            queue.done(fileObj.getName());
            ongoingUploads.remove(fileObj.getName());
            uploadLimiter.release();
            System.out.println("Successfully finished upload of file " + fileObj.getName());
            break;
        case TRANSFER_FAILED_EVENT:
            queue.done(fileObj.getName());
            ongoingUploads.remove(fileObj.getName());
            uploadLimiter.release();
            System.err.println("Failed upload of file " + fileObj.getName());
            break;
        default:
            // do nothing
        }
    }
}
Here is a boilerplate sample of the file queue:
import java.io.File;
import java.io.FileFilter;
import java.util.concurrent.ConcurrentSkipListSet;

public class FileEnqueue {
    private final String sourceDir;
    private final ConcurrentSkipListSet<FileItem> seen;
    private final ConcurrentSkipListSet<String> processing;
    private final int maxSeenSize;

    public FileEnqueue(final String sourceDirectory, int maxQueueSize) {
        sourceDir = sourceDirectory;
        maxSeenSize = maxQueueSize;
        seen = new ConcurrentSkipListSet<FileItem>();
        processing = new ConcurrentSkipListSet<>();
    }

    public synchronized String poll() {
        if (seen.size() == 0)
            enqueueFiles();
        FileItem fi = seen.pollFirst();
        if (fi == null) {
            return null;
        } else {
            processing.add(fi.getName());
            return fi.getName();
        }
    }

    public void done(final String fileName) {
        processing.remove(fileName);
    }

    private void enqueueFiles() {
        final FileFilter gzFilter = new GZFileFilter();
        final File dir = new File(sourceDir);
        if (!dir.exists()) {
            System.err.println("Directory " + sourceDir + " not found");
        } else if (!dir.isDirectory()) {
            System.err.println(sourceDir + " is not a directory");
        } else {
            final File[] files = dir.listFiles(gzFilter);
            if (files != null) {
                // How many more file names we can read into memory
                final int spaceLeft = maxSeenSize - seen.size();
                // How many new files will be read into memory
                final int maxNewFiles = files.length < maxSeenSize ? files.length : spaceLeft;
                for (int f = 0, enqueued = 0; f < files.length && enqueued < maxNewFiles; f++) {
                    File fl = files[f];
                    FileItem fi = new FileItem(fl);
                    // Do not put into the queue any file which has already been seen or is processing
                    if (!seen.contains(fi) && !processing.contains(fi.getName())) {
                        seen.add(fi);
                        enqueued++;
                    }
                }
            }
        }
    }

    private class GZFileFilter implements FileFilter {
        @Override
        public boolean accept(File f) {
            final String fname = f.getName().toLowerCase();
            return f.isFile() && fname.endsWith(".gz") && f.length() > 0L;
        }
    }
}
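A quick sketch of how the queue is meant to be used from the worker loop (the directory path is a placeholder): poll() hands out each file name at most once, and done() must be called once the upload finishes so the name leaves the processing set.
FileEnqueue queue = new FileEnqueue("d:\\data\\xml", 5000);
String name = queue.poll();   // null when the source directory is empty
if (name != null) {
    // ... hand the file to the TransferManager ...
    queue.done(name);         // called by UploadListener on success or failure
}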
Finally, your S3ClientManager:
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class S3ClientManager {
    public static AmazonS3Client buildS3Client() {
        AWSCredentials credential = new ProfileCredentialsProvider("TRFAuditability-Prod-ServiceUser").getCredentials();
        AmazonS3Client s3Client = (AmazonS3Client) AmazonS3ClientBuilder.standard().withRegion("us-east-1")
                .withCredentials(new AWSStaticCredentialsProvider(credential)).withForceGlobalBucketAccessEnabled(true)
                .build();
        s3Client.getClientConfiguration().setMaxConnections(5000);
        s3Client.getClientConfiguration().setConnectionTimeout(6000);
        s3Client.getClientConfiguration().setSocketTimeout(30000);
        return s3Client;
    }
}
Update, April 30, 2019: adding the FileItem class.
import java.io.File;
import java.util.Comparator;

public class FileItem implements Comparable<FileItem> {
    private final String name;
    private final long dateSeen;

    public FileItem(final File file) {
        this.name = file.getName();
        this.dateSeen = System.currentTimeMillis();
    }

    public String getName() {
        return name;
    }

    public long getDateSeen() {
        return dateSeen;
    }

    @Override
    public int compareTo(FileItem otherFileItem) {
        if (getDateSeen() == otherFileItem.getDateSeen())
            return getName().compareTo(otherFileItem.getName());
        else if (getDateSeen() < otherFileItem.getDateSeen())
            return -1;
        else
            return 1;
    }

    @Override
    public boolean equals(Object otherFile) {
        return getName().equals(((FileItem) otherFile).getName());
    }

    @Override
    public int hashCode() {
        return getName().hashCode();
    }

    public static final class CompareFileItems implements Comparator<FileItem> {
        @Override
        public int compare(FileItem fileItem1, FileItem fileItem2) {
            return fileItem1.compareTo(fileItem2);
        }
    }
}