node.js - 将数据保存到 Google 数据存储区时出错
问题描述
我们正在尝试使用此 repo 作为基础将流数据保存到 Google Datastore:https ://github.com/meraki/google-cloud-scanning-api 。在更新到 Meraki 3.0 之后,我们不得不重构代码以反映新的数据结构。但是,我们现在在保存过程中在 Cloud Functions 中遇到错误:
ERROR SAVING: Error: 4 DEADLINE_EXCEEDED: Deadline Exceeded
at Object.exports.createStatusError (/workspace/node_modules/grpc/src/common.js:91:15)
at Object.onReceiveStatus (/workspace/node_modules/grpc/src/client_interceptors.js:1209:28)
at InterceptingListener._callNext (/workspace/node_modules/grpc/src/client_interceptors.js:568:42)
at InterceptingListener.onReceiveStatus (/workspace/node_modules/grpc/src/client_interceptors.js:618:8)
at callback (/workspace/node_modules/grpc/src/client_interceptors.js:847:24)
at callbackTrampoline (internal/async_hooks.js:126:14) {
code: 4,
metadata: Metadata { _internal_repr: {}, flags: 0 },
details: 'Deadline Exceeded'
}
和
ERROR SAVING: Error: 14 UNAVAILABLE: Received RST_STREAM with error code 7
at Object.exports.createStatusError (/workspace/node_modules/grpc/src/common.js:91:15)
at Object.onReceiveStatus (/workspace/node_modules/grpc/src/client_interceptors.js:1209:28)
at InterceptingListener._callNext (/workspace/node_modules/grpc/src/client_interceptors.js:568:42)
at InterceptingListener.onReceiveStatus (/workspace/node_modules/grpc/src/client_interceptors.js:618:8)
at callback (/workspace/node_modules/grpc/src/client_interceptors.js:847:24)
at callbackTrampoline (internal/async_hooks.js:126:14) {
code: 14,
metadata: Metadata { _internal_repr: {}, flags: 0 },
details: 'Received RST_STREAM with error code 7'
}
我们认为这是一个比率错误。我们的指标显示,我们从每秒大约 4.7 次调用增加到每秒 0.03 次调用。我们认为我们在更短的突发中摄取更多数据,导致它超过速率限制。我们尝试实施重试(从此处提取:https ://cloud.google.com/datastore/docs/concepts/transactions#uses_for_transactions ),但我们仍然在某些保存中看到错误。
我们的重试功能:
async function addObservationWithRetry(entry, type) {
const maxTries = 5;
async function tryRequest(currentAttempt, delay) {
try {
await addObservation(entry, type);
} catch (err) {
console.log(`Timeout, attempt ${currentAttempt}`);
if (currentAttempt <= maxTries) {
// Use exponential backoff
setTimeout(async () => {
await tryRequest(currentAttempt + 1, delay * 2);
}, delay);
}
throw err;
}
}
await tryRequest(1, 100);
}
调用重试函数:
observations.forEach(function(entry) {
addObservationWithRetry(entry, "event");
});
添加观察:
function addObservation(obs, type) {
let entities = [];
const locations = obs.locations;
const entity = {
key: "null",
data: [<dictionaries for all variables>],
};
locations.forEach(function(loc) {
entity.data[x].value = <different values that are the same for each location>;
loc.rssiRecords.forEach(function(record) {
const observationKey = datastore.key(type);
entity.key = observationKey;
entity.data[y].value = <different values that are the same for each rssi record>;
entities.push(entity);
});
});
if (entities.length > 0) {
datastore
.save(entities) // Push array of entities
.then(() => {})
.catch(err => {
console.error('ERROR SAVING:', err);
});
}
}
解决方案
推荐阅读
- 3d - 使用 mtl 显示 OBJ 时,QVTKWidget 未在许多面上显示颜色
- c - 如何使用递归函数在 C 中计算矩阵的行列式
- python - 解释这个帕斯卡算法的工作原理
- ios - Swift - 如何使用 UIGraphics 混合模式?
- python - 如何从 ConfigParser 中提取数据
- angular - 仅将行的选择限制为复选框,而不是角度 6 mat-table 中的整行
- memory - DMA 内存块不接受输入
- c - 编译器会覆盖手动分配的地址吗?(请同时查看代码)
- node.js - Docker - 节点,找不到 mysql 模块
- c# - 为什么我要使用这样的接口属性?