java - 在 Java 中以同样快的速度同时处理 Json 数组
问题描述
我需要找到一种解决方案来处理包含 100 万个元素的 Json 数组并尽可能快地写入输出。我选择线程来同时处理数据。但最棘手的部分是我需要按照收到的顺序将数据写入输出。让我用例子来解释我的问题。
假设我有 Json 数组作为输入,它有 10 个元素。我需要先检查每个整数是偶数还是奇数,然后如果是偶数,则为每个整数生成 2 行,如果为奇数,则为每个整数生成 3 行。该行是格式
sequenceNumber_Integer
而序列号每行递增。下面是 4 个元素的 Json 数组的示例,它产生 10 行输出。我正在使用
格森
解析和迭代 json 数组
[ 1, 2, 3, 4 ]
我对并发编程很陌生,但是我尝试了自己并设法使它产生了结果。下面是我的示例代码。
import com.google.gson.stream.JsonReader;
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class SampleCheck {
public static void main(String[] args) throws IOException, InterruptedException {
String jsonStr = "[ 1, 2, 3, 4 ]";
JsonReader jsonReader = new JsonReader(new StringReader(jsonStr));
processJsonArray(jsonReader);
}
private static void processJsonArray(JsonReader jsonReader) throws InterruptedException, IOException {
String newLine = System.getProperty("line.separator");
AtomicInteger writeIndex = new AtomicInteger(0);
AtomicBoolean stop = new AtomicBoolean(false);
ExecutorService executorService = Executors.newFixedThreadPool(4);
ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<>(100);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(byteArrayOutputStream);
for (int i = 0; i < 4; i++) {
executorService.submit(() -> {
StringBuilder sb = new StringBuilder(5);
while (!(stop.get() && queue.isEmpty())) {
Data data = queue.poll();
if (data == null) {
continue;
}
try {
int seq = data.getSeq();
String result = newLine;
if (data.getData() % 2 == 0) { //Even
result += seq++ + "_" + data.getData();
result += newLine;
result += seq + "_" + data.getData();
} else { //odd
result += seq++ + "_" + data.getData();
result += newLine;
result += seq++ + "_" + data.getData();
result += newLine;
result += seq + "_" + data.getData();
}
while (data.getIndex() > writeIndex.get()) {
//Do nothing and wait for other threads to complete
}
out.writeBytes(result);
writeIndex.incrementAndGet();
} catch (Exception ignore) {
}
}
});
}
int seq = 1;
int index = 0;
jsonReader.beginArray();
while (true) {
if(jsonReader.hasNext()) {
int data = jsonReader.nextInt();
queue.add(new Data(data, index, seq));
index++;
seq += (data % 2) == 0 ? 2 : 3;
} else {
break;
}
}
stop.set(true);
executorService.shutdown();
executorService.awaitTermination(20, TimeUnit.MINUTES);
out.close();
System.out.println(new String(byteArrayOutputStream.toByteArray()));
}
private static class Data {
private int data;
private int index;
private int seq;
public Data(int data, int index, int seq) {
this.data = data;
this.index = index;
this.seq = seq;
}
public int getData() {
return data;
}
public int getIndex() {
return index;
}
public int getSeq() {
return seq;
}
}
}
但是我需要专家的建议以不同的方式解决这个问题并获得最大的性能。我的代码看起来非常冗长,如果我可以适应任何或任何更改以获得最佳性能,我需要比这更好的解决方案。你们能帮帮我还是这段代码看起来不错?
PS:上面的例子是为了说明我的问题。在现实世界中,我在 zip 流中获取数据(最多 100 万)并将行写入 zip 输出流
编辑:添加了更真实的例子。处理 Json 数组而不是 List。我需要processJsonArray
方法方面的帮助。在现实世界中,json reader 需要处理 100 万个元素
解决方案
这似乎是并行流的一个非常好的用例。Java 将完成拆分成单独线程并按顺序重新组装的所有艰苦工作,您根本不需要处理并发或线程。
您的代码可能很简单:
inputList.parallelStream()
.flatMap(in -> createOutputLines(in))
.forEach(out -> output(out));
话虽如此,如果您的 IO 以外的任何东西对性能产生重大影响,我会感到非常惊讶。您需要对输入进行非常复杂的处理才能使其不仅仅是舍入误差。
推荐阅读
- html - 如何对齐输入 datetime-local 的下拉列表
- wordpress - Wordpress - 更改安装在 /blog 子目录的站点的永久链接结构时出错
- php - PHP和HTML检查用户名是否在数据库中
- android - 如何正确设置多行文本视图的动画(展开和折叠)
- android - 如果没有消息,MessageQueue 怎么知道阻塞多长时间?
- scala - 在 scala List :: 中不进行隐式转换
- java - 使用 Map 与 Enum
- javascript - 我们可以使用 html 标签和 css 来为 openlayers 中的功能设置样式吗?
- angular - 从另一个页面推送时,Angular Nativescript RadListView 不更新
- c# - 我可以结合重试和后备 Polly 弹性策略吗?