Socket exception when downloading a large CSV file from Amazon S3

Problem description

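I have a program that reads a large (~600 MB) CSV file from Amazon S3 line by line. When I start the application, it runs fine for about 40-45 minutes, after which I get a socket exception. I have tried several approaches, but the result is almost always the same. I created a PoC application to reproduce the scenario, shown below.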

public class readS3File {



    public static void main(String[] args) throws IOException {
        Regions clientRegion = Regions.US_WEST_2;
        String bucketName = "/test-bucket";
        //final AmazonS3 s3Client;
        String key = "Unsaved/test.csv";
        String accessKey = "accessKey";
        String secretKey = "secretKey";
        AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
        S3Object fullObject = null, objectPortion = null, headerOverrideObject = null;
        try {

            ClientConfiguration config = new ClientConfiguration();
            config.setSocketTimeout(0);
            config.withTcpKeepAlive(true);
            AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
                    .withRegion(clientRegion)
                    .withCredentials(new AWSStaticCredentialsProvider(credentials)).withClientConfiguration(config)
                    .build();

            ResponseHeaderOverrides headerOverrides = new ResponseHeaderOverrides()
                    .withCacheControl("No-cache");
            GetObjectRequest getObjectRequestHeaderOverride = new GetObjectRequest(bucketName, key)
                    .withResponseHeaders(headerOverrides);
            headerOverrideObject = s3Client.getObject(getObjectRequestHeaderOverride);
            displayTextInputStream(headerOverrideObject.getObjectContent());


        } catch (AmazonServiceException e) {
            // The call was transmitted successfully, but Amazon S3 couldn't process
            // it, so it returned an error response.
            e.printStackTrace();
        } catch (SdkClientException e) {
            // Amazon S3 couldn't be contacted for a response, or the client
            // couldn't parse the response from Amazon S3.
            e.printStackTrace();
        }
        finally {
            // To ensure that the network connection doesn't remain open, close any open input streams.
            if (fullObject != null) {
                fullObject.close();
            }
            if (objectPortion != null) {
                objectPortion.close();
            }
            if (headerOverrideObject != null) {
                headerOverrideObject.close();
            }
        }


    }
    private static void displayTextInputStream(InputStream input) throws IOException {
        // Read the text input stream one line at a time and display each line.
        BufferedReader reader = new BufferedReader(new InputStreamReader(input));

        try {
            String line = null;
            long startTime = System.nanoTime();
            int count=0;
            while ((line = reader.readLine()) != null) {
                count=count+1;
                long stopTime = System.nanoTime();
                long elapsedTime = stopTime - startTime;
                double elapsedTimeInSecond = (double) elapsedTime / 1_000_000_000;
                System.out.println(line);
                System.out.println("Processing running for : " + elapsedTimeInSecond + "seconds , at line number ===== " + count);
                // This sleep is to consider the time taken by backend job to handle the content read.
                Thread.sleep(5000);
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }


    }

}

The exception message is as follows:

javax.net.ssl.SSLException: Connection reset
    at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:127)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:327)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:270)
    at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:265)
    at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:144)
    at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1199)
    at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1166)
    at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:832)
    at org.apache.http.impl.conn.LoggingInputStream.read(LoggingInputStream.java:84)
    at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137)
    at org.apache.http.impl.io.SessionInputBufferImpl.read(SessionInputBufferImpl.java:197)
    at org.apache.http.impl.io.ContentLengthInputStream.read(ContentLengthInputStream.java:176)
    at org.apache.http.conn.EofSensorInputStream.read(EofSensorInputStream.java:135)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
    at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
    at com.amazonaws.event.ProgressInputStream.read(ProgressInputStream.java:180)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
    at com.amazonaws.util.LengthCheckInputStream.read(LengthCheckInputStream.java:107)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
    at com.amazonaws.services.s3.internal.S3AbortableInputStream.read(S3AbortableInputStream.java:125)
    at com.amazonaws.internal.SdkFilterInputStream.read(SdkFilterInputStream.java:90)
    at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    at java.base/java.io.InputStreamReader.read(InputStreamReader.java:185)
    at java.base/java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.base/java.io.BufferedReader.readLine(BufferedReader.java:326)
    at java.base/java.io.BufferedReader.readLine(BufferedReader.java:392)
    at readS3File.displayTextInputStream(readS3File.java:92)
    at readS3File.main(readS3File.java:57)
    Suppressed: java.net.SocketException: Broken pipe (Write failed)
        at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
        at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
        at java.base/sun.security.ssl.SSLSocketOutputRecord.encodeAlert(SSLSocketOutputRecord.java:81)
        at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:358)
        ... 31 more
Caused by: java.net.SocketException: Connection reset
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:186)
    at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
    at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:467)
    at java.base/sun.security.ssl.SSLSocketInputRecord.readFully(SSLSocketInputRecord.java:450)
    at java.base/sun.security.ssl.SSLSocketInputRecord.decodeInputRecord(SSLSocketInputRecord.java:243)
    at java.base/sun.security.ssl.SSLSocketInputRecord.decode(SSLSocketInputRecord.java:181)
    at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:110)
    ... 28 more

Process finished with exit code 0

What is the best way to overcome this problem?

Tags: java, spring-boot, amazon-s3

Solution


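Although you set socketTimeout in your code, according to the documentation there are several other timeouts that may be relevant to this case. You could try setting withRequestTimeout and withClientExecutionTimeout.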
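As a rough sketch of what that could look like with the AWS SDK for Java 1.x ClientConfiguration used in the question: the class name S3TimeoutConfig and every timeout value below are placeholders chosen for illustration, not recommended settings, and whether these timeouts have any effect on a connection reset by the remote side is not established here. This is a configuration fragment that needs the SDK on the classpath.

```java
import com.amazonaws.ClientConfiguration;

// Hypothetical helper class; only the ClientConfiguration calls come from the SDK.
public class S3TimeoutConfig {

    static ClientConfiguration buildConfig() {
        return new ClientConfiguration()
                // Same two settings as in the question's code.
                .withSocketTimeout(0)
                .withTcpKeepAlive(true)
                // Time allowed for a single HTTP request, in milliseconds (placeholder: 1 hour).
                .withRequestTimeout(60 * 60 * 1000)
                // Time allowed for the whole client call, including retries,
                // in milliseconds (placeholder: 2 hours).
                .withClientExecutionTimeout(2 * 60 * 60 * 1000);
    }
}
```

The resulting object would be passed to AmazonS3ClientBuilder through withClientConfiguration, exactly as the question's code already does with its own config.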

However, you could also try using a BlockingQueue so that reading and processing run in parallel. See the following test for a very simple use case:

@Test
public void t11() throws InterruptedException {

    final BlockingQueue<String> taskQueue = new LinkedBlockingQueue<>();
    final AtomicBoolean isFinished = new AtomicBoolean(false);

    Thread t1 = new Thread(()->{
        for (int i=1; i<1000;i++){
            taskQueue.add("Item " + i);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                //handle interrupt
            }
        }
        isFinished.set(true);
    });

    long tick = System.currentTimeMillis();
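    Thread t2 = new Thread(()->{
        while(!isFinished.get() || taskQueue.size()>0) {
            try {
                // poll with a timeout instead of take(): take() could block forever if the
                // producer finishes right after this loop's condition was checked
                String item = taskQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
                if (item == null) {
                    continue;
                }
                System.out.println("After " + ( System.currentTimeMillis()-tick) + " ms. - " + item);
                Thread.sleep((long)(Math.random()*20));
            } catch (InterruptedException e) {
                //handle interrupt
            }

        }
    });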

    t1.start();
    t2.start();
    t1.join();
    t2.join();
    System.out.println("Done...");
}

There is a catch, though: if memory on your server is limited, you may want to use an ArrayBlockingQueue instead of a LinkedBlockingQueue to prevent possible memory problems.
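A minimal sketch of the bounded variant, assuming the lines come from any BufferedReader (a StringReader stands in for the S3 object stream here). The class name BoundedLinePipeline, the END sentinel and the capacity of 2 are all made up for illustration. One thread reads lines and puts them on an ArrayBlockingQueue; the calling thread takes them off and does the slow work.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BoundedLinePipeline {

    // End-of-input marker, compared by reference so no real line can ever match it.
    private static final String END = new String("END");

    static List<String> process(BufferedReader reader, int capacity) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(capacity);
        List<String> handled = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                String line;
                while ((line = reader.readLine()) != null) {
                    queue.put(line); // blocks while the queue is full
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                try {
                    queue.put(END); // tell the consumer that no more lines will arrive
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();

        String item;
        while ((item = queue.take()) != END) {
            handled.add(item); // stand-in for the slow backend processing
        }
        producer.join();
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        BufferedReader reader = new BufferedReader(new StringReader("a,1\nb,2\nc,3\n"));
        System.out.println(process(reader, 2));
    }
}
```

Note the trade-off in this design: once the bounded queue is full, put blocks the reading thread, so the underlying connection sits idle again until the consumer catches up. The capacity therefore decides how far the reader can run ahead of the processing.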

