azure - Azure Durable 功能在下载文件后从本地存储中删除文件
问题描述
我在这项任务中挣扎了很多。我必须从 SFTP 下载文件,然后解析它们。我正在使用这样的持久功能
[FunctionName("MainOrch")]
public async Task<List<string>> RunOrchestrator(
[OrchestrationTrigger] IDurableOrchestrationContext context, ILogger log)
{
try
{
var filesDownloaded = new List<string>();
var filesUploaded = new List<string>();
var files = await context.CallActivityAsync<List<string>>("SFTPGetListOfFiles", null);
log.LogInformation("!!!!FilesFound*******!!!!!" + files.Count);
if (files.Count > 0)
{
foreach (var fileName in files)
{
filesDownloaded.Add(await context.CallActivityAsync<string>("SFTPDownload", fileName));
}
var parsingTasks = new List<Task<string>>(filesDownloaded.Count);
foreach (var downlaoded in filesDownloaded)
{
var parsingTask = context.CallActivityAsync<string>("PBARParsing", downlaoded);
parsingTasks.Add(parsingTask);
}
await Task.WhenAll(parsingTasks);
}
return filesDownloaded;
}
catch (Exception ex)
{
throw;
}
}
SFTPGetListOfFiles:此函数连接到 SFTP 并获取文件夹中的文件列表并返回。
SFTPDownload:此函数假设连接到 SFTP 并下载 Azure Function 的 Tempt Storage 中的每个文件。并返回下载路径。(每个文件从 10 到 60 MB)
[FunctionName("SFTPDownload")]
public async Task<string> SFTPDownload([ActivityTrigger] string name, ILogger log, Microsoft.Azure.WebJobs.ExecutionContext context)
{
var downloadPath = "";
try
{
using (var session = new Session())
{
try
{
session.ExecutablePath = Path.Combine(context.FunctionAppDirectory, "winscp.exe");
session.Open(GetOptions(context));
log.LogInformation("!!!!!!!!!!!!!!Connected For Download!!!!!!!!!!!!!!!");
TransferOptions transferOptions = new TransferOptions();
transferOptions.TransferMode = TransferMode.Binary;
downloadPath = Path.Combine(Path.GetTempPath(), name);
log.LogInformation("Downloading " + name);
var transferResult = session.GetFiles("/Receive/" + name, downloadPath, false, transferOptions);
log.LogInformation("Downloaded " + name);
// Throw on any error
transferResult.Check();
log.LogInformation("!!!!!!!!!!!!!!Completed Download !!!!!!!!!!!!!!!!");
}
catch (Exception ex)
{
log.LogError(ex.Message);
}
finally
{
session.Close();
}
}
}
catch (Exception ex)
{
log.LogError(ex.Message);
_traceService.TraceException(ex);
}
return downloadPath;
}
PBARParsing:函数必须获取该文件的流并对其进行处理(处理一个 60 MB 的文件可能需要几分钟的 S2 扩展和 10 个实例的扩展。)
[FunctionName("PBARParsing")]
public async Task PBARParsing([ActivityTrigger] string pathOfFile,
ILogger log)
{
var theSplit = pathOfFile.Split("\\");
var name = theSplit[theSplit.Length - 1];
try
{
log.LogInformation("**********Starting" + name);
Stream stream = File.OpenRead(pathOfFile);
我希望使用 SFTPDownload 完成所有文件的下载,这就是为什么“等待”处于循环中的原因。然后我希望解析并行运行。
问题 1:MainOrch 函数中的代码对于执行这 3 件事是否正确 1)获取文件的名称,2)一个一个地下载它们并且在所有文件下载之前不启动解析功能。然后3)并行解析文件。?
我观察到我在问题 1 中提到的内容按预期工作。
问题 2: 30% 的文件已被解析,对于 80% 的文件,我看到“找不到文件 'D:\local\Temp\fileName'”的错误是 azure function 在我放置文件后删除文件?我还有其他方法可以采取吗?如果我将路径更改为“D:\home”,我可能会看到“文件正在被另一个进程使用”错误。但我还没有尝试过。奇怪的是,SFTP 上的 68 个文件最后 20 次运行,前 40 个文件在该路径中未找到,这是按顺序排列的。
问题3:我也看到这个错误“单例锁更新失败,错误代码为409:LeaseIdMismatchWithLeaseOperation。最后一次成功更新完成于2020-08-08T17:57:10.494Z(46005毫秒前),持续时间为 155 毫秒。租用期为 15000 毫秒。” 它说明了什么?不过它只来了一次。
使用“D:\home”后更新 我没有找到文件未找到错误
解决方案
推荐阅读
- nlp - 手套中超参数的含义
- awk - 如何根据特定列值选择性地删除串联重复行?
- python - 索引两次后如何更新 Pytorch 中的张量?
- java - WeakHashMap 和 ReentrantReadWriteLock
- ios - 向服务器发送设备令牌或注册令牌?
- spring - 运行错误:ERROR 8712 --- [restartedMain] osboot.SpringApplication
- node.js - 从 mongoDB 中选择 Random 10 个文档
- sql - 如何编写多列“in” sql 查询语法?
- assembly - movslq 是做什么的?
- javascript - 使用 VueX Store 中的计算属性进行 Ajax 调用的正确方法是什么