node.js - 序列化 Azure 长时间运行的操作以供以后重用
问题描述
我正在尝试使用 Azure SDK for javascript(@azure/arm-sql 版本 8.0.0)来复制 SQL 数据库,但我不想等到操作完成。相反,我想在创建请求后退出,稍后(假设每分钟)检查操作是否完成。SDK似乎通过功能支持我的用例:
getPollState()
获取一个 LROPollState 对象,该对象可用于在不同的上下文中轮询此 LRO(例如在不同的进程或不同的机器上)。如果 LRO 无法生成 LRO 轮询策略,那么这将返回 undefined。
和restoreLROPoller()
从提供的 LROPollState 中恢复 LROPoller。此方法可用于在不同的进程或机器上重新创建 LROPoller。
但是,文档没有指定如何通过网络对状态进行序列化/传输。我天真地尝试将其序列化为 JSON,但是当我运行下面的代码段时,出现以下错误:
TypeError: operationSpec.serializer.deserialize is not a function occurred in deserializing the responseBody - {"name":"b9952e45-85ff-41f8-b01c-83050c9d9a2c","status":"InProgress","startTime":"2021-10-14T15:38:01.59Z"}
这是一个简化的代码片段:
import { SqlManagementClient } from "@azure/arm-sql";
import { DefaultAzureCredential } from "@azure/identity";
import { LROPoller } from "@azure/ms-rest-azure-js";
const subscription = "<subscription ID>";
const rg = "myResourceGroup";
const server = "mySqlServer";
const dbName = "myDb";
const credentials = new DefaultAzureCredential();
const sqlClient = new SqlManagementClient(credentials, subscription);
const originalDb = await sqlClient.databases.get(rg, server, dbName);
const operation: LROPoller = await sqlClient.databases.beginCreateOrUpdate(rg, server, dbName + "_copy", {
location: "westeurope",
createMode: "Copy",
sourceDatabaseId: originalDb.id
});
const operationState = operation.getPollState()!;
const serializedState = JSON.stringify(operationState);
// The program would save the state somewhere and exit now, but let's make it simple.
const deserializedState = JSON.parse(serializedState);
const restoredOperation: LROPoller = sqlClient.restoreLROPoller(deserializedState);
// Following line throws the exception
// TypeError: operationSpec.serializer.deserialize is not a function occurred in deserializing the responseBody…
await restoredOperation.poll();
所以我的问题是如何以一种以后可以重用的方式保存操作状态。
解决方案
对于那些可能想要实现类似目标的人,这里是解决方法。但是,我仍然想摆脱额外的代码并使用 SDK 功能本身,所以如果有人能回答最初的问题,我会非常高兴。
这是一个AzureOperations.ts
带有辅助功能的文件
import { TokenCredential } from "@azure/core-auth";
import { LROPoller } from "@azure/ms-rest-azure-js";
import fetch from "node-fetch";
export interface AzureOperationReference {
statusUrl: string
}
export interface AzureOperation {
status: "InProgress" | "Succeeded" | "Failed" | "Canceled"
error?: {
code: string,
message: string
}
}
export const createAzureOperationReference = (operation: LROPoller): AzureOperationReference => {
const asyncOperationHeader = "Azure-AsyncOperation";
const headers = operation.getInitialResponse().headers;
if (!headers.contains(asyncOperationHeader)) {
throw new Error(`Given operation is currently not supported because it does not contain header '${asyncOperationHeader}'. If you want to track this operation, implement logic that uses header 'Location' first.`);
}
return {
statusUrl: headers.get(asyncOperationHeader)!
};
};
export const createAzureOperationChecker = (operationReference: AzureOperationReference, credentials: TokenCredential) => {
let token: string = null!;
let tokenExpiration = 0;
let previousOperation: AzureOperation = null!;
let retryAfter = 0;
return async () => {
const now = new Date().getTime();
if (now < retryAfter) {
return previousOperation;
}
if (tokenExpiration < now) {
const newToken = await credentials.getToken("https://management.azure.com/.default");
if (newToken === null) {
throw new Error("Cannot obtain new Azure access token.");
}
tokenExpiration = newToken.expiresOnTimestamp;
token = newToken.token;
}
const response = await fetch(operationReference.statusUrl, {
method: "GET",
headers: {
Authorization: `Bearer ${token}`
}
});
const retryLimitInMiliseconds = Number(response.headers.get("Retry-After")) * 1000;
retryAfter = new Date().getTime() + retryLimitInMiliseconds;
return previousOperation = await response.json() as AzureOperation;
}
}
然后您可以导入并使用它们来跟踪待处理的操作:
// unimportant code removed for brevity
import { createAzureOperationReference, createAzureOperationChecker } from "./AzureOperations.js";
const credentials = new DefaultAzureCredential();
const operation: LROPoller = await sqlClient.databases.beginCreateOrUpdate(…);
// You can serialize this reference as json and store it wherever you want
const reference = createAzureOperationReference(operation);
// You can deserialize it later and use it to fetch the operation status
const checkOperation = createAzureOperationChecker(reference, credentials);
const operationStatus = await checkOperation();
console.log(operationStatus.status);
推荐阅读
- azure-devops - PickList Azure Devops 自定义任务
- android - 使用 Room 数据库时获取“实体和 POJO 必须具有可用的公共构造函数”
- python - 当索引超出范围时从列表的开头开始?
- swift - 如何在 Swift 5.2-Xcode 版本 11.5 上覆盖 func observeValueForKeyPath
- python - 使用多处理时python中的缓慢pickle转储
- java - 给定行列表列表,将每行的字符串表示映射到其出现的总数
- snowflake-cloud-data-platform - Snowpipe 自动摄取配置看起来会触发所有管道
- python - 何时在 Python 中运行外部命令
- java - 嵌入式 Kafka 测试结果失败,大括号添加到 Value
- sql - 从一个表中查找最后一个日期和值并与另一个查询的结果相结合