首页 > 解决方案 > SocketChannel.write() 在尝试写入大缓冲区时抛出 OutOfMemoryError

问题描述

运行以下行时,我的代码会引发 OutOfMemoryError:

int numBytes = socketChannel.write(_send_buffer);

java.nio.channels.SocketChannelsocketChannel的实例在哪里

并且_send_bufferjava.nio.ByteBuffer的一个实例

代码通过非阻塞选择器写入操作到达这一点,并在容量_send_buffer较大时在第一次尝试写入时抛出此错误。_send_buffer当小于 20Mb时,我对代码没有任何问题,但是当尝试使用更大的缓冲区(例如 > 100Mb)进行测试时,它会失败。

根据java.nio.channels.SocketChannel.write()的文档:

尝试将最多 r 个字节写入通道,其中 r 是在调用此方法时缓冲区中剩余的字节数,即 src.remaining()。假设写入长度为 n 的字节序列,其中 0 <= n <= r。此字节序列将从索引 p 开始的缓冲区传输,其中 p 是调用此方法时缓冲区的位置;最后写入的字节的索引将是 p + n - 1。返回时缓冲区的位置将等于 p + n;它的限制不会改变。 除非另有说明,否则写入操作仅在写入所有 r 个请求字节后才会返回。某些类型的通道,取决于它们的状态,可能只写入一些字节,或者根本不写入。例如,处于非阻塞模式的套接字通道不能写入比套接字输出缓冲区中空闲的字节多的字节

我的通道应该设置为非阻塞的,所以我认为写操作应该只尝试写到套接字输出缓冲区的容量。由于我之前没有指定这一点,我尝试通过带有SO_SNDBUF选项的setOption方法将其设置为 1024 字节。IE:

socketChannel.setOption(SO_SNDBUF, 1024);

虽然我仍然收到 OutOfMemoryError。这是完整的错误消息:

2021-04-22 11:52:44.260 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Clamp target GC heap from 195MB to 192MB
2021-04-22 11:52:44.260 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Alloc concurrent copying GC freed 2508(64KB) AllocSpace objects, 0(0B) LOS objects, 10% free, 171MB/192MB, paused 27us total 12.714ms
2021-04-22 11:52:44.261 11591-11733/jp.oist.abcvlib.serverLearning W/.serverLearnin: Throwing OutOfMemoryError "Failed to allocate a 49915610 byte allocation with 21279560 free bytes and 20MB until OOM, target footprint 201326592, growth limit 201326592" (VmSize 5585608 kB)
2021-04-22 11:52:44.261 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Starting a blocking GC Alloc
2021-04-22 11:52:44.261 11591-11733/jp.oist.abcvlib.serverLearning I/.serverLearnin: Starting a blocking GC Alloc

现在我可以在写入行进行内联调试并停止并且没有任何崩溃,所以我相信处理自身的内存需求没有问题_send_buffer,但是当尝试写入时,后台的某些东西正在创建另一个难以处理的分配。

也许我正在考虑这个错误,并且需要将我的_send_buffer大小限制为更小的东西,但我认为应该有一种方法来限制 write 命令所做的分配?或者至少以某种方式将更多的 Android 内存分配给我的应用程序。我使用的是 Pixel 3a,根据规格,它应该有 4GB 的 RAM。现在我意识到必须与系统的其余部分共享,但这是一个简单的测试设备(没有安装游戏、个人应用程序等)所以我假设我应该可以访问相当大的块那4GB。由于我以 201,326,592 的增长限制崩溃(根据上面的 logcat),我在 0.2 / 4.0 = 5% 的规范内存崩溃时似乎很奇怪。

任何关于我的方法中的基本缺陷的正确方向的提示,或避免 OutOfMemoryError 的建议将不胜感激!

编辑1:

根据评论的要求添加一些代码上下文。请注意,这不是一个可运行的示例,因为代码库非常大,并且由于公司政策,我不允许全部共享。请注意,这_send_buffer与 socketChannel 本身的 sendbuffer 无关(即getSendBufferSize引用的内容,它只是一个 ByteBuffer ,我用它在通过通道发送之前将所有内容捆绑在一起。因为我不能共享所有与生成内容相关的代码_send_buffer只是注意它是一个 ByteBuffer 比可能非常大(> 100Mb)。如果这从根本上是一个问题,那么请指出这一点以及原因。

综上所述,下面贴出NIO相关代码。请注意,这是非常原型的 alpha 代码,因此我为评论和日志语句的过载表示歉意。

SocketConnectionManager.java

(本质上是一个负责 Selector 的 Runnable)

请注意,该sendMsgToServer方法已被覆盖(无需修改)并从主 ​​Android 活动(未显示)调用。byte[] episodearg 是被包装到 inside ByteBufferSocketMessage.java下一节)中的东西,后来被放入到方法中的_send_buffer实例中。writeSocketMessage.java

package jp.oist.abcvlib.util;

import android.util.Log;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Set;

import static java.net.StandardSocketOptions.SO_SNDBUF;

public class SocketConnectionManager implements Runnable{

    private SocketChannel sc;
    private Selector selector;
    private SocketListener socketListener;
    private final String TAG = "SocketConnectionManager";
    private SocketMessage socketMessage;
    private final String serverIp;
    private final int serverPort;

    public SocketConnectionManager(SocketListener socketListener, String serverIp, int serverPort){
        this.socketListener = socketListener;
        this.serverIp = serverIp;
        this.serverPort = serverPort;
    }

    @Override
    public void run() {
        try {
            selector = Selector.open();
            start_connection(serverIp, serverPort);
            do {
                int eventCount = selector.select(0);
                Set<SelectionKey> events = selector.selectedKeys(); // events is int representing how many keys have changed state
                if (eventCount != 0){
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    for (SelectionKey selectedKey : selectedKeys){
                        try{
                            SocketMessage socketMessage = (SocketMessage) selectedKey.attachment();
                            socketMessage.process_events(selectedKey);
                        }catch (ClassCastException e){
                            Log.e(TAG,"Error", e);
                            Log.e(TAG, "selectedKey attachment not a SocketMessage type");
                        }
                    }
                }
            } while (selector.isOpen()); //todo remember to close the selector somewhere

        } catch (IOException e) {
            Log.e(TAG,"Error", e);
        }
    }

    private void start_connection(String serverIp, int serverPort){
        try {
            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverIp, serverPort);
            sc = SocketChannel.open();
            sc.configureBlocking(false);
            sc.setOption(SO_SNDBUF, 1024);
            socketMessage = new SocketMessage(socketListener, sc, selector);

            Log.v(TAG, "registering with selector to connect");
            int ops = SelectionKey.OP_CONNECT;
            sc.register(selector, ops, socketMessage);

            Log.d(TAG, "Initializing connection with " + inetSocketAddress);
            boolean connected = sc.connect(inetSocketAddress);
            Log.v(TAG, "socketChannel.isConnected ? : " + sc.isConnected());

        } catch (IOException | ClosedSelectorException | IllegalBlockingModeException
                | CancelledKeyException | IllegalArgumentException e) {
            Log.e(TAG, "Initial socket connect and registration:", e);
        }
    }

    public void sendMsgToServer(byte[] episode){
        boolean writeSuccess = socketMessage.addEpisodeToWriteBuffer(episode);
    }

    /**
     * Should be called prior to exiting app to ensure zombie threads don't remain in memory.
     */
    public void close(){
        try {
            Log.v(TAG, "Closing connection: " + sc.getRemoteAddress());
            selector.close();
            sc.close();
        } catch (IOException e) {
            Log.e(TAG,"Error", e);
        }
    }
}

SocketMessage.java

这从这里给出的示例 Python 代码中得到了很大的启发,特别是libclient.pyand app-client.py。这是因为服务器正在运行 python 代码,而客户端正在运行 Java。因此,如果您想了解事物为何如此的原因,请参考RealPython套接字教程。我基本上使用 app-server.py 作为我的代码模板,并为客户端翻译(经过修改)成 Java。

package jp.oist.abcvlib.util;

import android.util.Log;

import org.json.JSONException;
import org.json.JSONObject;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.text.DecimalFormat;
import java.util.Vector;

public class SocketMessage {
    
    private final SocketChannel sc;
    private final Selector selector;
    private final ByteBuffer _recv_buffer;
    private ByteBuffer _send_buffer;
    private int _jsonheader_len = 0;
    private JSONObject jsonHeaderRead; // Will tell Java at which points in msgContent each model lies (e.g. model1 is from 0 to 1018, model2 is from 1019 to 2034, etc.)
    private byte[] jsonHeaderBytes;
    private ByteBuffer msgContent; // Should contain ALL model files. Parse to individual files after reading
    private final Vector<ByteBuffer> writeBufferVector = new Vector<>(); // List of episodes
    private final String TAG = "SocketConnectionManager";
    private JSONObject jsonHeaderWrite;
    private boolean msgReadComplete = false;
    private SocketListener socketListener;
    private long socketWriteTimeStart;
    private long socketReadTimeStart;


    public SocketMessage(SocketListener socketListener, SocketChannel sc, Selector selector){
        this.socketListener = socketListener;
        this.sc = sc;
        this.selector = selector;
        this._recv_buffer = ByteBuffer.allocate(1024);
        this._send_buffer = ByteBuffer.allocate(1024);
    }

    public void process_events(SelectionKey selectionKey){
        SocketChannel sc = (SocketChannel) selectionKey.channel();
//        Log.i(TAG, "process_events");
        try{
            if (selectionKey.isConnectable()){
                sc.finishConnect();
                Log.d(TAG, "Finished connecting to " + ((SocketChannel) selectionKey.channel()).getRemoteAddress());
                Log.v(TAG, "socketChannel.isConnected ? : " + sc.isConnected());

            }
            if (selectionKey.isWritable()){
//                Log.i(TAG, "write event");
                write(selectionKey);
            }
            if (selectionKey.isReadable()){
//                Log.i(TAG, "read event");
                read(selectionKey);
//                int ops = SelectionKey.OP_WRITE;
//                sc.register(selectionKey.selector(), ops, selectionKey.attachment());
            }

        } catch (ClassCastException | IOException | JSONException e){
            Log.e(TAG,"Error", e);
        }
    }

    private void read(SelectionKey selectionKey) throws IOException, JSONException {

        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

        while(!msgReadComplete){
            // At this point the _recv_buffer should have been cleared (pointer 0 limit=cap, no mark)
            int bitsRead = socketChannel.read(_recv_buffer);

            if (bitsRead > 0 || _recv_buffer.position() > 0){
                if (bitsRead > 0){
//                    Log.v(TAG, "Read " + bitsRead + " bytes from " + socketChannel.getRemoteAddress());
                }

                // If you have not determined the length of the header via the 2 byte short protoheader,
                // try to determine it, though there is no gaurantee it will have enough bytes. So it may
                // pass through this if statement multiple times. Only after it has been read will
                // _jsonheader_len have a non-zero length;
                if (this._jsonheader_len == 0){
                    socketReadTimeStart = System.nanoTime();
                    process_protoheader();
                }
                // _jsonheader_len will only be larger than 0 if set properly (finished being set).
                // jsonHeaderRead will be null until the buffer gathering it has filled and converted it to
                // a JSONobject.
                else if (this.jsonHeaderRead == null){
                    process_jsonheader();
                }
                else if (!msgReadComplete){
                    process_msgContent(selectionKey);
                } else {
                    Log.e(TAG, "bitsRead but don't know what to do with them");
                }
            }
        }
    }

    private void write(SelectionKey selectionKey) throws IOException, JSONException {

        if (!writeBufferVector.isEmpty()){
            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

            Log.v(TAG, "writeBufferVector contains data");

            if (jsonHeaderWrite == null){
                int numBytesToWrite = writeBufferVector.get(0).limit();

                // Create JSONHeader containing length of episode in Bytes
                Log.v(TAG, "generating jsonheader");
                jsonHeaderWrite = generate_jsonheader(numBytesToWrite);
                byte[] jsonBytes = jsonHeaderWrite.toString().getBytes(StandardCharsets.UTF_8);

                // Encode length of JSONHeader to first two bytes and write to socketChannel
                int jsonLength = jsonBytes.length;

                // Add up length of protoHeader, JSONheader and episode bytes
                int totalNumBytesToWrite = Integer.BYTES + jsonLength + numBytesToWrite;

                // Create new buffer that compiles protoHeader, JsonHeader, and Episode
                _send_buffer = ByteBuffer.allocate(totalNumBytesToWrite);

                Log.v(TAG, "Assembling _send_buffer");
                // Assemble all bytes and flip to prepare to read
                _send_buffer.putInt(jsonLength);
                _send_buffer.put(jsonBytes);
                _send_buffer.put(writeBufferVector.get(0));
                _send_buffer.flip();

                Log.d(TAG, "Writing to server ...");

                // Write Bytes to socketChannel //todo shouldn't be while as should be non-blocking
                if (_send_buffer.remaining() > 0){
                    int numBytes = socketChannel.write(_send_buffer); // todo memory dump error here!
                    int percentDone = (int) Math.ceil((((double) _send_buffer.limit() - (double) _send_buffer.remaining())
                            / (double) _send_buffer.limit()) * 100);
                    int total = _send_buffer.limit() / 1000000;
//                    Log.d(TAG, "Sent " + percentDone + "% of " + total + "Mb to " + socketChannel.getRemoteAddress());
                }
            } else{
                // Write Bytes to socketChannel
                if (_send_buffer.remaining() > 0){
                    socketChannel.write(_send_buffer);
                }
            }
            if (_send_buffer.remaining() == 0){
                int total = _send_buffer.limit() / 1000000;
                double timeTaken = (System.nanoTime() - socketWriteTimeStart) * 10e-10;
                DecimalFormat df = new DecimalFormat();
                df.setMaximumFractionDigits(2);
                Log.i(TAG, "Sent " + total + "Mb in " + df.format(timeTaken) + "s");
                // Remove episode from buffer so as to not write it again.
                writeBufferVector.remove(0);
                // Clear sending buffer
                _send_buffer.clear();
                // make null so as to catch the initial if statement to write a new one.
                jsonHeaderWrite = null;

                // Set socket to read now that writing has finished.
                Log.d(TAG, "Reading from server ...");
                int ops = SelectionKey.OP_READ;
                sc.register(selectionKey.selector(), ops, selectionKey.attachment());
            }

        }
    }

    private JSONObject generate_jsonheader(int numBytesToWrite) throws JSONException {
        JSONObject jsonHeader = new JSONObject();

        jsonHeader.put("byteorder", ByteOrder.nativeOrder().toString());
        jsonHeader.put("content-length", numBytesToWrite);
        jsonHeader.put("content-type", "flatbuffer"); // todo Change to flatbuffer later
        jsonHeader.put("content-encoding", "flatbuffer"); //Change to flatbuffer later
        return jsonHeader;
    }

    /**
     * recv_buffer may contain 0, 1, or several bytes. If it has more than hdrlen, then process
     * the first two bytes to obtain the length of the jsonheader. Else exit this function and
     * read from the buffer again until it fills past length hdrlen.
     */
    private void process_protoheader() {
        Log.v(TAG, "processing protoheader");
        int hdrlen = 2;
        if (_recv_buffer.position() >= hdrlen){
            _recv_buffer.flip(); //pos at 0 and limit set to bitsRead
            _jsonheader_len = _recv_buffer.getShort(); // Read 2 bytes converts to short and move pos to 2
            // allocate new ByteBuffer to store full jsonheader
            jsonHeaderBytes = new byte[_jsonheader_len];

            _recv_buffer.compact();

            Log.v(TAG, "finished processing protoheader");
        }
    }

    /**
     *  As with the process_protoheader we will check if _recv_buffer contains enough bytes to
     *  generate the jsonHeader objects, and if not, leave it alone and read more from socket.
     */
    private void process_jsonheader() throws JSONException {

        Log.v(TAG, "processing jsonheader");

        // If you have enough bytes in the _recv_buffer to write out the jsonHeader
        if (_jsonheader_len - _recv_buffer.position() < 0){
            _recv_buffer.flip();
            _recv_buffer.get(jsonHeaderBytes);
            // jsonheaderBuffer should now be full and ready to convert to a JSONobject
            jsonHeaderRead = new JSONObject(new String(jsonHeaderBytes));
            Log.d(TAG, "JSONheader from server: " + jsonHeaderRead.toString());

            try{
                int msgLength = (int) jsonHeaderRead.get("content-length");
                msgContent = ByteBuffer.allocate(msgLength);
            }catch (JSONException e) {
                Log.e(TAG, "Couldn't get content-length from jsonHeader sent from server", e);
            }
        }
        // Else return to selector and read more bytes into the _recv_buffer

        // If there are any bytes left over (part of the msg) then move them to the front of the buffer
        // to prepare for another read from the socket
        _recv_buffer.compact();
    }

    /**
     * Here a bit different as it may take multiple full _recv_buffers to fill the msgContent.
     * So check if msgContent.remaining is larger than 0 and if so, dump everything from _recv_buffer to it
     * @param selectionKey : Used to reference the instance and selector
     * @throws ClosedChannelException :
     */
    private void process_msgContent(SelectionKey selectionKey) throws IOException {

        if (msgContent.remaining() > 0){
            _recv_buffer.flip(); //pos at 0 and limit set to bitsRead set ready to read
            msgContent.put(_recv_buffer);
            _recv_buffer.clear();
        }

        if (msgContent.remaining() == 0){
            // msgContent should now be full and ready to convert to a various model files.
            socketListener.onServerReadSuccess(jsonHeaderRead, msgContent);

            // Clear for next round of communication
            _recv_buffer.clear();
            _jsonheader_len = 0;
            jsonHeaderRead = null;
            msgContent.clear();

            int totalBytes = msgContent.capacity() / 1000000;
            double timeTaken = (System.nanoTime() - socketReadTimeStart) * 10e-10;
            DecimalFormat df = new DecimalFormat();
            df.setMaximumFractionDigits(2);
            Log.i(TAG, "Entire message containing " + totalBytes + "Mb recv'd in " + df.format(timeTaken) + "s");

            msgReadComplete = true;

            // Set socket to write now that reading has finished.
            int ops = SelectionKey.OP_WRITE;
            sc.register(selectionKey.selector(), ops, selectionKey.attachment());
        }
    }

    //todo should send this to the mainactivity listener so it can be customized/overridden
    private void onNewMessageFromServer(){
        // Take info from JSONheader to parse msgContent into individual model files

        // After parsing all models notify MainActivity that models have been updated
    }

    // todo should be able deal with ByteBuffer from FlatBuffer rather than byte[]
    public boolean addEpisodeToWriteBuffer(byte[] episode){
        boolean success = false;
        try{
            ByteBuffer bb = ByteBuffer.wrap(episode);
            success = writeBufferVector.add(bb);
            Log.v(TAG, "Added data to writeBuffer");
            int ops = SelectionKey.OP_WRITE;
            socketWriteTimeStart = System.nanoTime();
            sc.register(selector, ops, this);
            // I want this to trigger the selector that this channel is writeReady.
        } catch (NullPointerException | ClosedChannelException e){
            Log.e(TAG,"Error", e);
            Log.e(TAG, "SocketConnectionManager.data not initialized yet");
        }
        return success;
    }
}

标签: javaandroidnioandroid-memorysocketchannel

解决方案


在 Android Docs 中偶然发现了这一点,它回答了我为什么会出现 OutOfMemoryError 的问题。

为了维护一个功能性的多任务环境,Android 对每个应用程序的堆大小设置了硬性限制。确切的堆大小限制因设备而异,具体取决于设备总体可用的 RAM 量。如果您的应用程序已达到堆容量并尝试分配更多内存,它可能会收到 OutOfMemoryError。

在某些情况下,您可能希望查询系统以确定您在当前设备上有多少可用的堆空间——例如,确定有多少数据可以安全地保存在缓存中。您可以通过调用 getMemoryClass() 在系统中查询此图。此方法返回一个整数,指示您的应用程序堆可用的兆字节数。

运行ActivityManager.getMemoryClass方法后,我看到 Pixel 3a 的硬限制为 192 MB。当我试图分配刚刚超过 200 MB 的空间时,我达到了这个限制。

我还检查了ActivityManager.getLargeMemoryClass,发现我的硬限制为 512 MB。所以我可以将我的应用程序设置为“largeHeap”,但尽管有 4GB 的 RAM,但我需要解决 512 MB 的硬限制。

除非其他人知道解决这个问题的任何方法,否则我将不得不编写一些逻辑来分段写入episode文件,如果它超过某个点,然后分段发送它到通道上。我想这会使事情变慢一点,所以如果有人有一个可以避免这种情况的答案,或者告诉我为什么如果做得好这不会减慢速度,那么我很乐意给你答案。只是将其发布为答案,因为它确实回答了我原来的问题,但并不令人满意。


推荐阅读