首页 > 解决方案 > 将数据保存到 Google 数据存储区时出错

问题描述

我们正在尝试以此 repo 为基础,将流数据保存到 Google Datastore:https://github.com/meraki/google-cloud-scanning-api 。在更新到 Meraki 3.0 之后,我们不得不重构代码以适配新的数据结构。但是,现在 Cloud Functions 在保存过程中会报错:

ERROR SAVING: Error: 4 DEADLINE_EXCEEDED: Deadline Exceeded
at Object.exports.createStatusError (/workspace/node_modules/grpc/src/common.js:91:15)
at Object.onReceiveStatus (/workspace/node_modules/grpc/src/client_interceptors.js:1209:28)
at InterceptingListener._callNext (/workspace/node_modules/grpc/src/client_interceptors.js:568:42)
at InterceptingListener.onReceiveStatus (/workspace/node_modules/grpc/src/client_interceptors.js:618:8)
at callback (/workspace/node_modules/grpc/src/client_interceptors.js:847:24)
at callbackTrampoline (internal/async_hooks.js:126:14) {
code: 4,
metadata: Metadata { _internal_repr: {}, flags: 0 },
details: 'Deadline Exceeded'
}

ERROR SAVING: Error: 14 UNAVAILABLE: Received RST_STREAM with error code 7
at Object.exports.createStatusError (/workspace/node_modules/grpc/src/common.js:91:15)
at Object.onReceiveStatus (/workspace/node_modules/grpc/src/client_interceptors.js:1209:28)
at InterceptingListener._callNext (/workspace/node_modules/grpc/src/client_interceptors.js:568:42)
at InterceptingListener.onReceiveStatus (/workspace/node_modules/grpc/src/client_interceptors.js:618:8)
at callback (/workspace/node_modules/grpc/src/client_interceptors.js:847:24)
at callbackTrampoline (internal/async_hooks.js:126:14) {
code: 14,
metadata: Metadata { _internal_repr: {}, flags: 0 },
details: 'Received RST_STREAM with error code 7'
}

我们认为这是一个速率限制错误。我们的指标显示,调用频率从每秒大约 4.7 次下降到每秒 0.03 次。我们认为现在是在更短的突发中摄取更多数据,导致超过了速率限制。我们尝试实现重试(参考此处:https://cloud.google.com/datastore/docs/concepts/transactions#uses_for_transactions ),但仍然有部分保存操作出现错误。

我们的重试功能:

/**
 * Calls `addObservation(entry, type)` and retries with exponential backoff
 * when it rejects.
 *
 * At most `maxTries` attempts are made in total. The wait before the second
 * attempt is 100 ms and doubles after every further failure. The returned
 * promise settles only once an attempt has succeeded or the last attempt has
 * failed, so callers can reliably `await` the final outcome.
 *
 * @param {*} entry - Observation forwarded unchanged to `addObservation`.
 * @param {*} type - Type forwarded unchanged to `addObservation`.
 * @returns {Promise<void>} Resolves after the first successful attempt.
 * @throws Rejects with the error of the last attempt when all attempts fail.
 */
async function addObservationWithRetry(entry, type) {
    const maxTries = 5;
    let delay = 100;

    for (let currentAttempt = 1; ; currentAttempt += 1) {
        try {
            await addObservation(entry, type);
            return;
        } catch (err) {
            console.log(`Timeout, attempt ${currentAttempt}`);
            // Only give up (and surface the error) once the attempts are used up;
            // rethrowing earlier would report a failure while a retry is pending.
            if (currentAttempt >= maxTries) {
                throw err;
            }
        }

        // Exponential backoff. The wait is awaited so the retry stays part of
        // this promise chain instead of running detached inside a timer.
        await new Promise((resolve) => setTimeout(resolve, delay));
        delay *= 2;
    }
}

调用重试函数:

// Start one save (with retries) per observation. Each returned promise gets
// its own rejection handler so a failed save is reported instead of becoming
// an unhandled promise rejection.
// NOTE(review): if this runs inside a Cloud Function handler, the handler
// presumably has to wait for these saves before signalling completion —
// confirm against the enclosing function.
observations.forEach(function(entry) {
    addObservationWithRetry(entry, "event").catch(function(err) {
        console.error('ERROR SAVING:', err);
    });
});

添加观察:

function addObservation(obs, type) {
    let entities = [];
    const locations = obs.locations;
    const entity = {
        key: "null",
        data: [<dictionaries for all variables>],
    };

    locations.forEach(function(loc) {
        entity.data[x].value = <different values that are the same for each location>;

        loc.rssiRecords.forEach(function(record) {
            const observationKey = datastore.key(type);
            entity.key = observationKey;
            entity.data[y].value = <different values that are the same for each rssi record>;
            entities.push(entity);

        });
    });

    if (entities.length > 0) {
        datastore
            .save(entities) // Push array of entities
            .then(() => {})
            .catch(err => {
                console.error('ERROR SAVING:', err);
            });
    }
}

标签: node.js, google-cloud-platform, google-cloud-datastore, datastore

解决方案


推荐阅读