首页 > 解决方案 > 在 Java 中以同样快的速度同时处理 Json 数组

问题描述

我需要找到一种解决方案来处理包含 100 万个元素的 Json 数组并尽可能快地写入输出。我选择线程来同时处理数据。但最棘手的部分是我需要按照收到的顺序将数据写入输出。让我用例子来解释我的问题。

假设我有 Json 数组作为输入,它有 10 个元素。我需要先检查每个整数是偶数还是奇数,然后如果是偶数,则为每个整数生成 2 行,如果为奇数,则为每个整数生成 3 行。该行是格式

sequenceNumber_Integer

而序列号每行递增。下面是 4 个元素的 Json 数组的示例,它产生 10 行输出。我正在使用

格森

解析和迭代 json 数组

[ 1, 2, 3, 4 ]

在此处输入图像描述

我对并发编程很陌生,但是我尝试了自己并设法使它产生了结果。下面是我的示例代码。

    import com.google.gson.stream.JsonReader;

import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SampleCheck {
    public static void main(String[] args) throws IOException, InterruptedException {
        String jsonStr = "[ 1, 2, 3, 4 ]";
        JsonReader jsonReader = new JsonReader(new StringReader(jsonStr));
        processJsonArray(jsonReader);
    }

    private static  void processJsonArray(JsonReader jsonReader) throws InterruptedException, IOException {
        String newLine = System.getProperty("line.separator");
        AtomicInteger writeIndex = new AtomicInteger(0);
        AtomicBoolean stop = new AtomicBoolean(false);
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        ArrayBlockingQueue<Data> queue = new ArrayBlockingQueue<>(100);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(byteArrayOutputStream);
        for (int i = 0; i < 4; i++) {
            executorService.submit(() -> {
                StringBuilder sb = new StringBuilder(5);
                while (!(stop.get() && queue.isEmpty())) {
                    Data data = queue.poll();
                    if (data == null) {
                        continue;
                    }
                    try {
                        int seq = data.getSeq();
                        String result = newLine;
                        if (data.getData() % 2 == 0) { //Even
                            result += seq++ + "_" + data.getData();
                            result += newLine;
                            result += seq + "_" + data.getData();

                        } else { //odd
                            result += seq++ + "_" + data.getData();
                            result += newLine;
                            result += seq++ + "_" + data.getData();
                            result += newLine;
                            result += seq + "_" + data.getData();
                        }
                        while (data.getIndex() > writeIndex.get()) {
                            //Do nothing and wait for other threads to complete
                        }
                        out.writeBytes(result);
                        writeIndex.incrementAndGet();

                    } catch (Exception ignore) {

                    }
                }
            });
        }
            int seq = 1;
            int index = 0;
            jsonReader.beginArray();
             while (true) {
            if(jsonReader.hasNext()) {
                int data = jsonReader.nextInt();
                queue.add(new Data(data, index, seq));
                index++;
                seq += (data % 2) == 0 ? 2 : 3;
            } else {
                break;
            }

        }
            stop.set(true);
            executorService.shutdown();
            executorService.awaitTermination(20, TimeUnit.MINUTES);
            out.close();
            System.out.println(new String(byteArrayOutputStream.toByteArray()));
        }

    private static class Data {
        private int data;
        private int index;
        private int seq;

        public Data(int data, int index, int seq) {
            this.data = data;
            this.index = index;
            this.seq = seq;
        }

        public int getData() {
            return data;
        }

        public int getIndex() {
            return index;
        }

        public int getSeq() {
            return seq;
        }
    }
}

但是我需要专家的建议以不同的方式解决这个问题并获得最大的性能。我的代码看起来非常冗长,如果我可以适应任何或任何更改以获得最佳性能,我需要比这更好的解决方案。你们能帮帮我还是这段代码看起来不错?

PS:上面的例子是为了说明我的问题。在现实世界中,我在 zip 流中获取数据(最多 100 万)并将行写入 zip 输出流

编辑:添加了更真实的例子。处理 Json 数组而不是 List。我需要processJsonArray方法方面的帮助。在现实世界中,json reader 需要处理 100 万个元素

标签: javajsonmultithreadinggson

解决方案


这似乎是并行流的一个非常好的用例。Java 将完成拆分成单独线程并按顺序重新组装的所有艰苦工作,您根本不需要处理并发或线程。

您的代码可能很简单:

inputList.parallelStream()
    .flatMap(in -> createOutputLines(in))
    .forEach(out -> output(out));

话虽如此,如果您的 IO 以外的任何东西对性能产生重大影响,我会感到非常惊讶。您需要对输入进行非常复杂的处理才能使其不仅仅是舍入误差。


推荐阅读