首页 > 解决方案 > 从文件加载 100 万条记录并保存到 PSQL 数据库

问题描述

我有一个包含 100 万条记录的文件,我必须将其中的一条记录传递给弹性搜索并将结果数据保存到数据库中。但问题是,这样做需要很长时间,因为记录会一一传输到 elasticsearch,然后将数据保存到 PSQL 数据库中。我想要一些建议,我该如何改进或应该使用其他一些工具。

现在我正在使用带有一些包的 Nodejs:

我在 nodejs 应用程序中上传文件并使用将其转换为 json 文件 const csv=require('csvtojson')

我用

const StreamArray = require('stream-json/streamers/StreamArray');
const {Writable} = require('stream');

由于文件太大,因此使用流读取 json 并通过这些包解析它。我使用此代码

const fileStream = fs.createReadStream(this.fileName);
            const jsonStream = StreamArray.withParser();
            const incomingThis = this;
            const processingStream = new Writable({
                write({key, value}, encoding, callback) {
                    incomingThis.recordParser(value, (val, data) => { // pass the data to elasticsearch to get search data
                        incomingThis.processQueue(data); // save the data to the PSQL database
                        callback();
                    });
                },
                //Don't skip this, as we need to operate with objects, not buffers
                objectMode: true
            });
            //Pipe the streams as follows
            fileStream.pipe(jsonStream.input);
            jsonStream.pipe(processingStream);
            //So we're waiting for the 'finish' event when everything is done.
            processingStream.on('finish', async () => {
                console.log('stream end');
                const statistics = new Statistics(jobId);
                await statistics.update(); // update the job table for completion of data
            });

请建议我如何改进这一点,以便在几个小时而不是几天或更短的时间内解析 100 万条记录文件。我也愿意使用任何其他工具,例如 redis,如果这些对我有帮助,请激发火花。

谢谢。

标签: node.jsapache-sparkstream

解决方案


而不是从流中一一按下。使用批量方法(创建多个批次)以 弹性方式获取数据并批量保存。


推荐阅读