mysql - 如何同步调用 google-bigquery 删除和插入 API?
问题描述
我正在维护一个定期更改数据的交易记录数据库。
我有一个 cron 每半小时运行一次,从主数据库中提取最新交易并提供给我的 express 节点应用程序(我对节点很陌生),我首先删除与传入交易的订单号匹配的旧交易,然后将最新的交易插入大查询表。
运行应用程序一天后,我的数据库中出现重复的事务。即使在检查日志之后,我也没有看到删除 api 在任何地方都失败了,也不知道重复项是如何以及从哪里来的。
我正在使用@google-cloud/bigquery: ^2.0.2
,我正在使用查询api 将数据删除并插入到 bigquery 表中。
我曾尝试使用流式插入,但它不允许我删除最近插入的行,直到90 分钟,这在我的情况下不起作用。
我的 index.js
让 orderNumbers = '';
rows.map(function (value) {
orderNumbers += "'" + value.Order_Number+ "',";
});
orderNumbers = orderNumbers.slice(0, -1);
await functions.deleteAllWhere('Order_Number', orderNumbers);
let chunkedRowsArray = _.chunk(rows, CONSTANTS.chunkSize);
let arrSize = chunkedRowsArray.length;
for (var i = 0; i < arrSize; i++) {
let insertString = '';
chunkedRowsArray[i].forEach(element => {
let values = '(';
Object.keys(element).forEach(function (key) {
if (typeof element[key] == 'string') {
values += '"' + element[key] + '",';
} else {
values += element[key] + ",";
}
});
values = values.slice(0, -1);
values += '),';
insertString += values;
});
insertString = insertString.slice(0, -1);
let rs = await functions.bulkInsert(insertString,i);
}
删除函数调用
await functions.deleteAllWhere('Order_Number', orderNumbers);
module.exports.deleteAllWhere = async (conditionKey, params) => {
const DELETEQUERY = `
DELETE FROM
\`${URI}\`
WHERE ${conditionKey}
IN
(${params})`;
const options = {
query: DELETEQUERY,
timeoutMs: 300000,
useLegacySql: false, // Use standard SQL syntax for queries.
};
// // Runs the query
return await bigquery.query(options);
};
类似地在插入函数中使用 200 块的值构建插入查询。
我需要编写一个同步节点程序,它首先删除一些行,然后在成功删除行后插入新行。
我不知道这是由代码的异步性质引起的,还是 bigquery 出了问题,或者我从中获取数据的存储过程有问题。
很抱歉这篇长帖子我是节点和堆栈溢出的新手。
任何帮助表示赞赏。
解决方案
关于 BigQuery 集成,您应该以这样的方式构建您的数据流,以让 BigQuery 表中的每个新行。然后有只返回最新行的查询,如果您有一个按最新行排序的字段,这很容易做到。
您可以安排 BigQuery 查询来维护此清理数据的具体化表。所以最终你会得到两张表,一张你流到所有行中,一张被物化为只保留最新的。
推荐阅读
- amazon-web-services - 无法在 VPC 中使用 SES 触发电子邮件
- autodesk-forge - Forge 数据连接器:最小计划间隔
- c# - 使用 Google 的 Safe Browsing Lookup API 时应将什么指定为“clientId”和“clientVersion”?
- visual-studio - 我遇到了“已经安装了更高版本”的问题。为 uwp 托管应用程序安装 windows sdk 时
- node.js - Typeorm 突然无法在模块 import { __awaiter, __generator } from "tslib" 之外导入语句
- php - Windows 10/Apache24/PHP 7.4.19 未加载 mysqli
- android-studio - java.lang.ClassCastException:无法投射 android.app.Application 使我的应用程序崩溃
- python - 熊猫数据帧索引中的2个条件
- flutter - 尝试将图像名称列表保存到 Firestore
- powerbi - Power BI 计算表未由数据切片器过滤