java - 如何使用循环缓冲区批处理数据
问题描述
我已经创建了一个循环缓冲区,并希望创建一批 1000 条记录。一旦我的缓冲区是 1000 条记录,我就会将数据写入文件并清除缓冲区。每个批次都将是一个新文件。我尝试使用 reset() 方法在容量已满后重置缓冲区。我不知道如何将数据写入新文件。请帮助
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
public class DataBuffer<T> {
public T[] elements = null;
public int capacity = 0;
public int writePos = 0;
public int readPos = 0;
public boolean flipMarker = false;
public DataBuffer(Class<T> type, int capacity) {
this.capacity = capacity;
@SuppressWarnings("unchecked")
final T[] elements = (T[]) Array.newInstance(type, capacity);
this.elements = elements;
}
public T get(int i) {
return elements[i];
}
public void reset() {
this.writePos = 0;
this.readPos = 0;
this.flipMarker = false;
}
public int available() {
if (!flipMarker) {
return capacity - writePos;
}
return readPos - writePos;
}
public int remainingCapacity() {
if (!flipMarker) {
return capacity - writePos;
}
return readPos - writePos;
}
public boolean put(T element) {
if (!flipMarker) {
if (writePos == capacity) {
writePos = 0;
flipMarker = true;
if (writePos < readPos) {
elements[writePos++] = element;
return true;
} else {
return false;
}
} else {
elements[writePos++] = element;
return true;
}
} else {
if (writePos < readPos) {
elements[writePos++] = element;
return true;
} else {
return false;
}
}
}
public T take() {
if (!flipMarker) {
if (readPos < writePos) {
return elements[readPos++];
} else {
return null;
}
} else {
if (readPos == capacity) {
readPos = 0;
flipMarker = false;
if (readPos < writePos) {
return elements[readPos++];
} else {
return null;
}
} else {
return elements[readPos++];
}
}
}
public List<T> getAll(){
return (List<T>) Arrays.asList(elements);
}
}
试图将数据溢出到磁盘的代码如下
private DataBuffer<Employee> processedSyslogDataBuffer = new DataBuffer<>(Employee.class, 1000);
processedSyslogDataBuffer.put(processedMessage);
if(processedSyslogDataBuffer.remainingCapacity() == 0) {
List<Employee> data = processedSyslogDataBuffer.getAll();
logger.info("Writing processed data with size :: "+data.size());
writerService.writeProcessed(data);
logger.info("resetting the buffer to accomodate next batch of records");
processedSyslogDataBuffer.reset();
}
解决方案
推荐阅读
- json - 使用 Fetch 有时会出现类型错误,因为照片的 href 未定义
- c++ - C ++中矩形的碰撞解决方案
- visual-studio-code - 如何在 VS Code 中同时使用 Victor Mono 的斜体和斜体样式?
- javafx - 在 TableView JavaFx 中显示浮点数的两位小数
- javascript - JQuery 从数组中删除特定元素
- vue.js - 传递值以存储 getter Vue.js
- java - for 循环阻止我从数据库中检索数据?
- numpy - Numpy 中的 Sigmoid 函数
- azure - 在 Azure 中启动 VM,没有在 azure 自动化中使用 VM 启动 Runbook 的 runas 权限
- java - 无法使用 @RestController 获取 XML 响应