c# - Windows 服务/SQL Server 并发问题
问题描述
我有一个充当任务队列的数据库表。向该表的插入发生在其他地方(例如,表触发器等)。
我有一个执行以下操作的多线程 Windows 服务:
- 使用存储过程,读取标记为 的行的队列表
'todo'
,并将它读取的行标记为'doing'
。 - 处理这些行。
- 将行标记为
'done'
。
理想情况:
当服务运行时,多个处理器/线程将处理一组唯一的myqueue
行。
实际:
有时多个处理器/线程检索相同的myqueue
行。这意味着myqueue
可以多次处理一行。
问题: 这里有我遗漏的并发问题吗?
T-SQL 代码:
CREATE TABLE myqueue (
id varchar(20) NOT NULL,
task_data varchar(255) NULL,
status varchar(10) NULL,
PRIMARY KEY CLUSTERED ( id ASC )
CREATE PROCEDURE dbo.getNextInQueue @itemsToGet INT = 1000
AS
BEGIN
IF OBJECT_ID('tempdb..#tmpnext') IS NULL
BEGIN
CREATE TABLE #tmpnext (
id VARCHAR(20)
,PRIMARY KEY CLUSTERED (id)
)
END
INSERT INTO #tmpnext (id)
SELECT TOP (@itemsToGet) id
FROM myqueue WITH (
UPDLOCK
,READPAST
)
WHERE status = 'todo'
UPDATE m
SET m.status = 'doing'
FROM (
SELECT id
FROM #tmpnext
) AS tmp
INNER JOIN myqueue m ON m.id = tmp.id
SELECT m.*
FROM myqueue m WITH (NOLOCK)
JOIN #tmpnext t ON t.id = m.id
END
C#代码:
public class QueueProcessorService : System.ServiceProcess.ServiceBase
{
private System.Threading.Timer IntervalTimer;
private ServiceParams serviceParams; // A POCO that reads arguments, returns defaults if not provided.
private bool isStopped = false;
private readonly object _mutex = new object();
private int concurrentRunningProcessors = 0;
private string ConnectionString => ConfigurationManager.AppSettings["ConnectionString"];
protected override void OnStart(string[] args)
{
serviceParams = new ServiceParams(args);
IntervalTimer = new System.Threading.Timer(
callback: new System.Threading.TimerCallback(IntervalTimer_ElapsedAsync),
state: null,
dueTime: serviceParams.PeriodInMillisec,
period: serviceParams.PeriodInMillisec);
}
protected override void OnStop()
{
isStopped = true;
IntervalTimer.Change(System.Threading.Timeout.Infinite, System.Threading.Timeout.Infinite);
while (concurrentRunningProcessors > 0)
{
// Wait until running processors finish
};
IntervalTimer.Dispose();
IntervalTimer = null;
}
private async void IntervalTimer_ElapsedAsync(object state)
{
try
{
if (!isStopped && concurrentRunningProcessors <= serviceParams.MaxConcurrentProcessors)
{
lock (_mutex)
{
concurrentRunningProcessors++;
}
try
{
await ProcessTasksInQueue();
}
catch (Exception)
{
throw;
}
finally
{
if (concurrentRunningProcessors > 0)
{
lock (_mutex)
{
concurrentRunningProcessors--;
}
}
}
}
}
catch (Exception ex)
{
DealWithException(ex); // e.g. Logs, etc.
this.Stop();
}
}
private async Task ProcessTasksInQueue()
{
using (var conn = new SqlConnection(ConnectionString))
{
var cmd = conn.CreateCommand();
cmd.CommandText = "dbo.getNextInQueue";
cmd.CommandType = CommandType.StoredProcedure;
cmd.Parameters.Add(new SqlParameter("@itemsToGet", SqlDbType.Int) { Value = serviceParams.ItemsToGet });
var rowsRetrieved = new List<RowInTaskQueue>();
using (var reader = cmd.ExecuteReader())
{
while (reader.Read())
{
var row = new RowInTaskQueue()
{
Id = reader.GetString(reader.GetOrdinal("id")),
Data = reader.GetString(reader.GetOrdinal("task_data")),
Status = reader.GetString(reader.GetOrdinal("status")),
};
rowsRetrieved.Add(row);
}
}
ProcessTheRows(rowsRetrieved);
}
}
}
解决方案
推荐阅读
- c# - 使用数组和 For 循环的斐波那契数列
- ios - 使用调度等待数据附加到数组?
- reactjs - 在 URL 重定向时,React 组件总是回到初始状态
- android - 如何在 C/C++ 中增强这个 YUV420P 到 RGB 的转换?
- javascript - 我怎样才能缩短这个 javascript if 语句?(如果 1 ,如果 2 )
- discord.js - 如何从频道中的用户删除覆盖(Discord.js V12)
- javascript - Vue.js $emit 不会从子组件传递点击事件
- c++ - 将元组类型转换为衰减类型
- python - 类权重恶化了我的 keras 分类模型
- r - 使用 DiagrammeR 和 Xaringan 演示使图表居中