首页 > 解决方案 > 如何使用循环缓冲区批处理数据

问题描述

我已经创建了一个循环缓冲区,并希望创建一批 1000 条记录。一旦我的缓冲区是 1000 条记录,我就会将数据写入文件并清除缓冲区。每个批次都将是一个新文件。我尝试使用 reset() 方法在容量已满后重置缓冲区。我不知道如何将数据写入新文件。请帮助

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;

public class DataBuffer<T> {

    public T[] elements = null;

    public int capacity = 0;
    public int writePos = 0;
    public int readPos = 0;

    public boolean flipMarker = false;

    public DataBuffer(Class<T> type, int capacity) {
        this.capacity = capacity;
        @SuppressWarnings("unchecked")
        final T[] elements = (T[]) Array.newInstance(type, capacity);
        this.elements = elements;
    }

    public T get(int i) {
        return elements[i];
    }

    public void reset() {
        this.writePos = 0;
        this.readPos = 0;
        this.flipMarker = false;
    }

    public int available() {
        if (!flipMarker) {
            return capacity - writePos;
        }
        return readPos - writePos;
    }

    public int remainingCapacity() {
        if (!flipMarker) {
            return capacity - writePos;
        }
        return readPos - writePos;
    }

    public boolean put(T element) {
        if (!flipMarker) {
            if (writePos == capacity) {
                writePos = 0;
                flipMarker = true;

                if (writePos < readPos) {
                    elements[writePos++] = element;
                    return true;
                } else {
                    return false;
                }
            } else {
                elements[writePos++] = element;
                return true;
            }
        } else {
            if (writePos < readPos) {
                elements[writePos++] = element;
                return true;
            } else {
                return false;
            }
        }
    }

    public T take() {
        if (!flipMarker) {
            if (readPos < writePos) {
                return elements[readPos++];
            } else {
                return null;
            }
        } else {
            if (readPos == capacity) {
                readPos = 0;
                flipMarker = false;

                if (readPos < writePos) {
                    return elements[readPos++];
                } else {
                    return null;
                }
            } else {
                return elements[readPos++];
            }
        }
    }

    public List<T> getAll(){
        return (List<T>) Arrays.asList(elements);
    }
}

试图将数据溢出到磁盘的代码如下

private DataBuffer<Employee> processedSyslogDataBuffer = new DataBuffer<>(Employee.class, 1000);
 processedSyslogDataBuffer.put(processedMessage);
if(processedSyslogDataBuffer.remainingCapacity() == 0) {
    List<Employee> data = processedSyslogDataBuffer.getAll();
    logger.info("Writing processed data with size :: "+data.size());
    writerService.writeProcessed(data);
   logger.info("resetting the buffer to accomodate next batch of records");
    processedSyslogDataBuffer.reset();
}

标签: javacircular-buffer

解决方案


推荐阅读