首页 > 解决方案 > 序列化 Azure 长时间运行的操作以供以后重用

问题描述

我正在尝试使用 Azure SDK for javascript(@azure/arm-sql 版本 8.0.0)来复制 SQL 数据库,但我不想等到操作完成。相反,我想在创建请求后退出,稍后(假设每分钟)检查操作是否完成。SDK似乎通过功能支持我的用例:

getPollState()

获取一个 LROPollState 对象,该对象可用于在不同的上下文中轮询此 LRO(例如在不同的进程或不同的机器上)。如果 LRO 无法生成 LRO 轮询策略,那么这将返回 undefined。

restoreLROPoller()

从提供的 LROPollState 中恢复 LROPoller。此方法可用于在不同的进程或机器上重新创建 LROPoller。

但是,文档没有指定如何通过网络对状态进行序列化/传输。我天真地尝试将其序列化为 JSON,但是当我运行下面的代码段时,出现以下错误:

TypeError: operationSpec.serializer.deserialize is not a function occurred in deserializing the responseBody - {"name":"b9952e45-85ff-41f8-b01c-83050c9d9a2c","status":"InProgress","startTime":"2021-10-14T15:38:01.59Z"}

这是一个简化的代码片段:

import { SqlManagementClient } from "@azure/arm-sql";
import { DefaultAzureCredential } from "@azure/identity";
import { LROPoller } from "@azure/ms-rest-azure-js";

const subscription = "<subscription ID>";
const rg = "myResourceGroup";
const server = "mySqlServer";
const dbName = "myDb";
const credentials = new DefaultAzureCredential();

const sqlClient = new SqlManagementClient(credentials, subscription);
const originalDb = await sqlClient.databases.get(rg, server, dbName);
const operation: LROPoller = await sqlClient.databases.beginCreateOrUpdate(rg, server, dbName + "_copy", {
    location: "westeurope",
    createMode: "Copy",
    sourceDatabaseId: originalDb.id
});

const operationState = operation.getPollState()!;
const serializedState = JSON.stringify(operationState);

// The program would save the state somewhere and exit now, but let's make it simple.

const deserializedState = JSON.parse(serializedState);
const restoredOperation: LROPoller = sqlClient.restoreLROPoller(deserializedState);

// Following line throws the exception
// TypeError: operationSpec.serializer.deserialize is not a function occurred in deserializing the responseBody…
await restoredOperation.poll();

所以我的问题是如何以一种以后可以重用的方式保存操作状态。

标签: node.jsazureazure-sdk-js

解决方案


对于那些可能想要实现类似目标的人,这里是解决方法。但是,我仍然想摆脱额外的代码并使用 SDK 功能本身,所以如果有人能回答最初的问题,我会非常高兴。

这是一个AzureOperations.ts带有辅助功能的文件

import { TokenCredential } from "@azure/core-auth";
import { LROPoller } from "@azure/ms-rest-azure-js";
import fetch from "node-fetch";

export interface AzureOperationReference {
    statusUrl: string
}

export interface AzureOperation {
    status: "InProgress" | "Succeeded" | "Failed" | "Canceled"

    error?: {
        code: string,
        message: string
    }
}

export const createAzureOperationReference = (operation: LROPoller): AzureOperationReference => {
    const asyncOperationHeader = "Azure-AsyncOperation";
    const headers = operation.getInitialResponse().headers;
    if (!headers.contains(asyncOperationHeader)) {
        throw new Error(`Given operation is currently not supported because it does not contain header '${asyncOperationHeader}'. If you want to track this operation, implement logic that uses header 'Location' first.`);
    }

    return {
        statusUrl: headers.get(asyncOperationHeader)!
    };
};

export const createAzureOperationChecker = (operationReference: AzureOperationReference, credentials: TokenCredential) => {
    let token: string = null!;
    let tokenExpiration = 0;
    let previousOperation: AzureOperation = null!;
    let retryAfter = 0;

    return async () => {
        const now = new Date().getTime();
        if (now < retryAfter) {
            return previousOperation;
        }

        if (tokenExpiration < now) {
            const newToken = await credentials.getToken("https://management.azure.com/.default");
            if (newToken === null) {
                throw new Error("Cannot obtain new Azure access token.");
            }

            tokenExpiration = newToken.expiresOnTimestamp;
            token = newToken.token;
        }

        const response = await fetch(operationReference.statusUrl, {
            method: "GET",
            headers: {
                Authorization: `Bearer ${token}`
            }
        });

        const retryLimitInMiliseconds = Number(response.headers.get("Retry-After")) * 1000;
        retryAfter = new Date().getTime() + retryLimitInMiliseconds;

        return previousOperation = await response.json() as AzureOperation;
    }
}

然后您可以导入并使用它们来跟踪待处理的操作:

// unimportant code removed for brevity
import { createAzureOperationReference, createAzureOperationChecker } from "./AzureOperations.js";

const credentials = new DefaultAzureCredential();
const operation: LROPoller = await sqlClient.databases.beginCreateOrUpdate(…);

// You can serialize this reference as json and store it wherever you want
const reference = createAzureOperationReference(operation);


// You can deserialize it later and use it to fetch the operation status
const checkOperation = createAzureOperationChecker(reference, credentials);
const operationStatus = await checkOperation();
console.log(operationStatus.status);

推荐阅读