首页 > 解决方案 > 如何同步调用 google-bigquery 删除和插入 API?

问题描述

我正在维护一个定期更改数据的交易记录数据库。

我有一个 cron 每半小时运行一次,从主数据库中提取最新交易并提供给我的 express 节点应用程序(我对节点很陌生),我首先删除与传入交易的订单号匹配的旧交易,然后将最新的交易插入大查询表。

运行应用程序一天后,我的数据库中出现重复的事务。即使在检查日志之后,我也没有看到删除 api 在任何地方都失败了,也不知道重复项是如何以及从哪里来的。

我正在使用@google-cloud/bigquery: ^2.0.2,我正在使用查询api 将数据删除并插入到 bigquery 表中。

我曾尝试使用流式插入,但它不允许我删除最近插入的行,直到90 分钟,这在我的情况下不起作用。

我的 index.js
让 orderNumbers = '';

                    rows.map(function (value) {
                        orderNumbers += "'" + value.Order_Number+ "',";
                    });

                    orderNumbers = orderNumbers.slice(0, -1);

                    await functions.deleteAllWhere('Order_Number', orderNumbers);

                        let chunkedRowsArray = _.chunk(rows, CONSTANTS.chunkSize);


                        let arrSize = chunkedRowsArray.length;
                        for (var i = 0; i < arrSize; i++) {
                            let insertString = '';

                            chunkedRowsArray[i].forEach(element => {
                                let values = '(';
                                Object.keys(element).forEach(function (key) {
                                    if (typeof element[key] == 'string') {
                                        values += '"' + element[key] + '",';
                                    } else {
                                        values += element[key] + ",";
                                    }
                                });
                                values = values.slice(0, -1);
                                values += '),';
                                insertString += values;
                            });
                            insertString = insertString.slice(0, -1);

                            let rs = await functions.bulkInsert(insertString,i);
                        }

删除函数调用

await functions.deleteAllWhere('Order_Number', orderNumbers);

module.exports.deleteAllWhere = async (conditionKey, params) => {

const DELETEQUERY = `
DELETE FROM
\`${URI}\` 
WHERE ${conditionKey}
IN
(${params})`;

const options = {
    query: DELETEQUERY,
    timeoutMs: 300000,
    useLegacySql: false, // Use standard SQL syntax for queries.
};

// // Runs the query
return await bigquery.query(options);
};

类似地在插入函数中使用 200 块的值构建插入查询。

我需要编写一个同步节点程序,它首先删除一些行,然后在成功删除行后插入新行。

我不知道这是由代码的异步性质引起的,还是 bigquery 出了问题,或者我从中获取数据的存储过程有问题。

很抱歉这篇长帖子我是节点和堆栈溢出的新手。

任何帮助表示赞赏。

标签: mysqlnode.jsexpressgoogle-bigquery

解决方案


关于 BigQuery 集成,您应该以这样的方式构建您的数据流,以让 BigQuery 表中的每个新行。然后有只返回最新行的查询,如果您有一个按最新行排序的字段,这很容易做到。

您可以安排 BigQuery 查询来维护此清理数据的具体化表。所以最终你会得到两张表,一张你流到所有行中,一张被物化为只保留最新的。


推荐阅读