java - 从 Amazon S3 下载大型 csv 文件时出现套接字异常
问题描述
我有一个程序可以逐行读取大型 (~600MB) csv 文件 Amazon S3。当我启动应用程序时,它可以正常运行大约 40-45 分钟,之后我得到了 Socket 异常。尝试了多种方法,但结果几乎相同。我创建了一个 poc 应用程序来复制场景,如下所示。
public class readS3File {
public static void main(String[] args) throws IOException {
Regions clientRegion = Regions.US_WEST_2;
String bucketName = "/test-bucket";
//final AmazonS3 s3Client;
String key = "Unsaved/test.csv";
String accessKey = "accessKey";
String secretKey = "secretKey";
AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
S3Object fullObject = null, objectPortion = null, headerOverrideObject = null;
try {
ClientConfiguration config = new ClientConfiguration();
config.setSocketTimeout(0);
config.withTcpKeepAlive(true);
AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
.withRegion(clientRegion)
.withCredentials(new AWSStaticCredentialsProvider(credentials)).withClientConfiguration(config)
.build();
ResponseHeaderOverrides headerOverrides = new ResponseHeaderOverrides()
.withCacheControl("No-cache");
GetObjectRequest getObjectRequestHeaderOverride = new GetObjectRequest(bucketName, key)
.withResponseHeaders(headerOverrides);
headerOverrideObject = s3Client.getObject(getObjectRequestHeaderOverride);
displayTextInputStream(headerOverrideObject.getObjectContent());
} catch (AmazonServiceException e) {
// The call was transmitted successfully, but Amazon S3 couldn't process
// it, so it returned an error response.
e.printStackTrace();
} catch (SdkClientException e) {
// Amazon S3 couldn't be contacted for a response, or the client
// couldn't parse the response from Amazon S3.
e.printStackTrace();
}
finally {
// To ensure that the network connection doesn't remain open, close any open input streams.
if (fullObject != null) {
fullObject.close();
}
if (objectPortion != null) {
objectPortion.close();
}
if (headerOverrideObject != null) {
headerOverrideObject.close();
}
}
}
private static void displayTextInputStream(InputStream input) throws IOException {
// Read the text input stream one line at a time and display each line.
BufferedReader reader = new BufferedReader(new InputStreamReader(input));
try {
String line = null;
long startTime = System.nanoTime();
int count=0;
while ((line = reader.readLine()) != null) {
count=count+1;
long stopTime = System.nanoTime();
long elapsedTime = stopTime - startTime;
double elapsedTimeInSecond = (double) elapsedTime / 1_000_000_000;
System.out.println(line);
System.out.println("Processing running for : " + elapsedTimeInSecond + "seconds , at line number ===== " + count);
// This sleep is to consider the time taken by backend job to handle the content read.
Thread.sleep(5000);
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
Exception message is as below
javax.net.ssl.SSLException: Connection reset
at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:127)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:327)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:270)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:265)
at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:144)
at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1199)
at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1166)
at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:832)
at org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:197)
at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176)
at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107)
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125)
at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
at readS3File.displayTextInputStream(readS3File.java:92)
at readS3File.main(readS3File.java:57)
Suppressed: java.net.SocketException: Broken pipe (Write failed)
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
at java.base/sun.security.ssl.SSLSocketOutputRecord.encodeAlert(SSLSocketOutputRecord.java:81)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:358)
... 31 more
Caused by: java.net.SocketException: Connection reset
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:467)
at java.base/sun.security.ssl.SSLSocketInputRecord.readFully(SSLSocketInputRecord.java:450)
at java.base/sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:243)
at java.base/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:181)
at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
... 28 more
Process finished with exit code 0
克服这个问题的最佳解决方案是什么?
解决方案
尽管您在代码中设置了 socketTimeout,但根据文档,还有其他几个可能与此情况相关的超时。您可以尝试设置withRequestTimeout或withClientExecutionTimeout
但是,您也可以尝试使用 BlockingQueue 来并行读取和写入。检查以下测试以获取非常简单的用例;
@Test
public void t11() throws InterruptedException {
final BlockingQueue<String> taskQueue = new LinkedBlockingQueue<>();
final AtomicBoolean isFinished = new AtomicBoolean(false);
Thread t1 = new Thread(()->{
for (int i=1; i<1000;i++){
taskQueue.add("Item " + i);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
//handle interrupt
}
}
isFinished.set(true);
});
long tick = System.currentTimeMillis();
Thread t2 = new Thread(()->{
while(!isFinished.get() || taskQueue.size()>0) {
try {
System.out.println("After " + ( System.currentTimeMillis()-tick) + " ms. - " + taskQueue.take());
Thread.sleep((long)(Math.random()*20));
} catch (InterruptedException e) {
//handle interrupt
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Done...");
}
但是这里有一个问题;如果您的服务器上的内存有限,您可能需要使用 ArrayBlockingQueue 而不是 LinkedBlockingQueue 来防止可能的内存问题...
推荐阅读
- javascript - 将 C# 变量传递给 HTML 按钮
- r - 按数字列出索引和一些元素NULL,如何转换为数据框?
- javascript - 如何在 Javascript(Postman) 中从 JSON 对象中获取键和值
- ios - Apple PDFKit - 带有验证脚本的 PDF 问题。
- javascript - 复制每个组件安装的对象。我怎样才能让它只运行一次?在反应
- django - Django post save 信号每次请求调用两次
- git - 更改文件名的大小写后,git 抱怨结帐时可能会丢失数据
- mysql - #1064 - SQL 连接 2 表问题
- nginx - AWS Nginx ALB 端口配置
- sql - 加入两个表 - 多个条件。如果未加入先前条件,则使用后续条件