c# - Hangfire 为不同服务器安排后台任务
问题描述
我有一个使用多个 Hangfire 服务器的 .Net 应用程序。
我希望能够让一个 Hangfire RecurringJob 触发多个 BackgroundJobs,可以由任何可用的服务器获取。目前,每当我从 Hangfire 作业安排后台作业时,只有安排它们的服务器会处理它们。
例如,我有 5 个 Hangfire 服务器和 10 个任务。我希望每个 Hangfire 服务器上有 2 个任务,而不是我看到 1 个服务器有 10 个任务,4 个有 0 个任务。
所以我有 5 个 Hangfire 服务器,都使用同一个数据库,还有 1 个 RecurringJob,这个 RecurringJob 只是读取一些文件并将几个后台作业排入队列。
foreach (var file in reportSourceSetFileList)
{
_logger.LogInformation($"Queuing Background job for: {file}");
var backgroundJobId = BackgroundJob.Enqueue<IJobHandler>(job => job.ProcessFile(file, files, null));
}
但是,只有运行 RecurringJob 的 Hangfire Server 才会处理 Enqueued 作业。
我怎样才能让我的 5 个 Hangfire 服务器中的任何一个处理那些排队的作业,而不仅仅是将它们排队的那个?
解决方案
Hangfire 中没有内置功能可以在多个 Hangfire 服务器之间使用循环类型的负载平衡器。
我的解决方案是使用排队系统。当每个 Hangfire 服务器启动时,它们都会获得一个任务标识符,即 GUID,我还向该服务器添加了一个唯一队列,该队列使用与其名称相同的 GUID。
因此,每个服务器将查看 2 个队列,即 Default 和 GUID。
然后我使用以下代码查找当前处理的作业最少的服务器。
private string GetNextAvailableServer()
{
var serverJobCounts = new Dictionary<string, int>();
//get active servers
var serverList = JobStorage.Current.GetMonitoringApi().Servers();
foreach (var server in serverList)
{
if (server.Heartbeat.Value > DateTime.Now.AddMinutes(-1))
{
serverJobCounts.Add(server.Name, 0);
foreach (var queue in server.Queues)
{
var currentQueues = JobStorage.Current.GetMonitoringApi().Queues();
serverJobCounts[server.Name] += (int?)currentQueues.FirstOrDefault(e => e.Name == queue)?.Length ?? 0;
}
}
}
var jobs = JobStorage.Current.GetMonitoringApi().ProcessingJobs(0, int.MaxValue);
foreach (var job in jobs)
{
if (serverJobCounts.ContainsKey(job.Value.ServerId))
{
serverJobCounts[job.Value.ServerId] += 1;
}
}
var nextServer = serverJobCounts.OrderBy(e => e.Value).FirstOrDefault().Key;
return nextServer.Split(':')[0].Replace("-", string.Empty, StringComparison.InvariantCulture);
}
这将返回作业最少的服务器的 GUID,它也是队列的名称。因此,您可以将下一个作业安排到当前处理的作业最少的特定队列中。
var nextServer = GetNextAvailableServer();
var client = new BackgroundJobClient();
var state = new EnqueuedState(nextServer);
var enqueueJob = client.Create<IJobHandler>(job => job.ProcessFile(file, files, null), state);
此外,当我编写此 Hangfire 时,不允许在队列名称中使用连字符,因此我进行了字符串操作以使 GUID 工作。我认为最新版本的hangfire 允许您在名称中使用连字符。
需要注意的一件事是,当你们中的一个服务器死机时,此解决方案就会中断。由于如果观察该队列的服务器在处理该作业之前死了,则该作业被赋予一个唯一的队列,它将永远不会被拾取。
推荐阅读
- neural-network - Caffe:欧几里得损失错误:输入必须具有相同的维度
- python-3.x - 我的输出只显示错误,即使它是正确的
- android - 无法解析常用打开文件 gradle.build
- python - 如何显示程序正在处理?
- python - 以矢量化方式切片熊猫字符串
- r - 如何使用 gbm 在广义 Boosted Regression 模型中处理不平衡数据集?
- swift - 更新绑定到数组错误的tableview
- python - 从python中的另一个函数调用嵌套函数?
- swift - JSQMessagesViewController 发送位置未加载
- rust - 如何提供自定义测试属性?