首页 > 解决方案 > 我将如何创建动态任务/线程来触发从数据库控制任务/线程的相同功能/方法?

问题描述

我构建了一个服务来调用要执行的任务。这是表驱动和动态的。因此,在服务的每个滴答声中(每 10 秒),它都会调用 SQL 表并找出处于活动状态的内容。如果它处于活动状态,它将构建一个新任务并将其放入对象列表中。每个任务都调用相同的函数,但我将一个唯一的数据行(其中包含数据表中的唯一 ID)传递给该方法的传递参数。

当任务同时运行时,我收到与应该运行的任务无关的随机错误(来自数据库调用)。

那么问题来了,是否有可能同时运行相同方法/函数的任务会相互踩踏,导致线程相交?

这是我的代码:

namespace ReportService
{
public partial class Service1 : ServiceBase
{
    //public timer
    private static System.Timers.Timer timerReports = null;

    //collection of tasks
    //private static BlockingCollection<ReportTasks> tasksCollection = new BlockingCollection<ReportTasks>();
    private static List<ReportTasks> tasksCollection = new List<ReportTasks>();

    #region Service functions

    public Service1()
    {
        InitializeComponent();
    }


    protected override void OnStart(string[] args)
    {
        try
        {
            ExceptionInfo exceptioninfo = new ExceptionInfo();
            exceptioninfo.LogType = "L";
            exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
            exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
            exceptioninfo.OptionalMessage = "Service On";

            Log.WriteToLog(exceptioninfo, ConfigSettings.GetServiceSettings());

            timerReports = new System.Timers.Timer();
            timerReports.Interval = 30000;  //30 secconds
            timerReports.Elapsed += new ElapsedEventHandler(this.timerReports_Tick);
            timerReports.Enabled = true;
        }
        catch (Exception ex)
        {
            //Get Service settings
            dynamic ServiceSettings = ConfigSettings.GetServiceSettings();

            if (ex.Data["WrittenToLog"] == null)
            {
                //set new object
                ex.Data.Add("WrittenToLog", true);

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }
            else if ((bool)ex.Data["WrittenToLog"] == false)
            {
                //set written to log as true
                ex.Data["WrittenToLog"] = true;

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }

            //throw exception to back out of process
            //throw ex;
        }
    }

    protected override void OnStop()
    {
        try
        {

            timerReports.Enabled = false;

            ExceptionInfo exceptioninfo = new ExceptionInfo();
            exceptioninfo.LogType = "L";
            exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
            exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
            exceptioninfo.OptionalMessage = "Service Off";

            Log.WriteToLog(exceptioninfo, ConfigSettings.GetServiceSettings());

        }
        catch (Exception ex)
        {
            //Get Service settings
            dynamic ServiceSettings = ConfigSettings.GetServiceSettings();

            if (ex.Data["WrittenToLog"] == null)
            {
                //set new object
                ex.Data.Add("WrittenToLog", true);

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }
            else if ((bool)ex.Data["WrittenToLog"] == false)
            {
                //set written to log as true
                ex.Data["WrittenToLog"] = true;

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }

            //throw exception to back out of process
            //throw ex;
        }
    }

    #endregion

    private void timerReports_Tick(object sender, ElapsedEventArgs e)
    {
        //check to see if task is running, if not, process EDI
        try
        {
            //remove completed tasks               
            tasksCollection.RemoveAll(item => item.ReportTask.Status == TaskStatus.RanToCompletion);


            //Get Service settings
            dynamic ServiceSettings = ConfigSettings.GetServiceSettings();

            //set the SQL command and parameters
            SQLcommand Sqlcommandobj = new SQLcommand();
            Sqlcommandobj.SQLcmd = @"SELECT *,'TABLE' AS [TABLE_NAME] 
                                     FROM EX_TABLE";
            Sqlcommandobj.SQLcmdType = CommandType.Text;

            //fill in list 
            DataSet dsReportSchedules = Queries.ServiceSQLExecute(ServiceSettings, Sqlcommandobj);

            //loop through each schedule to add/remove tasks
            foreach (DataRow drReport in dsReportSchedules.Tables["SCHEDULES"].Rows)
            {
                if (!tasksCollection.Any(item => item.ReportID == Helper.GetValueFromDataRowInt32(drReport, "REPORTS_SCHEDULE_ID")))
                {
                    if (Helper.GetValueFromDataRowString(drReport, "ACTIVE") == "1" && Helper.GetValueFromDataRowString(drReport, "DELETE_DATE") == string.Empty)
                    {
                        //create cancellation for task
                        var ts = new CancellationTokenSource();

                        //create new task
                        Task newTask = new Task(() => ReportProcess.BeginProcessingReport(drReport, ServiceSettings), ts.Token);

                        //fill in report tasks object
                        ReportTasks ReportTasks = new ReportTasks();
                        ReportTasks.ReportID = Helper.GetValueFromDataRowInt32(drReport, "REPORTS_SCHEDULE_ID");
                        ReportTasks.ReportName = Helper.GetValueFromDataRowString(drReport, "NAME");
                        ReportTasks.ReportTask = newTask;
                        ReportTasks.TaskID = newTask.Id;
                        ReportTasks.Active = Convert.ToBoolean(drReport["ACTIVE"]);
                        ReportTasks.CancelTokenSource = ts;

                        //add to task collection
                        tasksCollection.Add(ReportTasks);
                    }
                }
                else
                {
                    //remove if not active or deleted
                    if (Helper.GetValueFromDataRowString(drReport, "ACTIVE") != "1" || Helper.GetValueFromDataRowString(drReport, "DELETE_DATE") != string.Empty)
                    {
                        var itemToRemove = tasksCollection.SingleOrDefault(item => item.ReportID == Helper.GetValueFromDataRowInt32(drReport, "REPORTS_SCHEDULE_ID"));
                        if (itemToRemove.ReportID > 0)
                        {
                            //check to see if task is running
                            if (itemToRemove.ReportTask.Status == TaskStatus.Running)
                            {
                                itemToRemove.CancelTokenSource.Cancel();
                            }

                            //remove task from collection
                            tasksCollection.Remove(itemToRemove);
                        }
                    }
                }
            }

            //trigger each task
            foreach (var str in tasksCollection)
            {
                Console.WriteLine("Task: " + str.ReportName + " - Status: " + str.ReportTask.Status);
                if (str.ReportTask.Status == TaskStatus.RanToCompletion | str.ReportTask.Status == TaskStatus.Created)
                {
                    str.ReportTask.Start();
                }
            }

        }
        catch (Exception ex)
        {
            //Get Service settings
            dynamic ServiceSettings = ConfigSettings.GetServiceSettings();

            if (ex.Data["WrittenToLog"] == null)
            {
                //set new object
                ex.Data.Add("WrittenToLog", true);

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }
            else if ((bool)ex.Data["WrittenToLog"] == false)
            {
                //set written to log as true
                ex.Data["WrittenToLog"] = true;

                //build exception object
                ExceptionInfo exceptioninfo = new ExceptionInfo();
                exceptioninfo.LogType = "E";
                exceptioninfo.ClassName = MethodBase.GetCurrentMethod().DeclaringType.Name.ToString();
                exceptioninfo.MethodName = MethodBase.GetCurrentMethod().Name;
                exceptioninfo.InnerException = ex;

                //write to log
                Log.WriteToLog(exceptioninfo, ServiceSettings);
            }

            //throw exception to back out of process
            //throw ex;
        }

    }

}
}

在上面,在计时器滴答声中,我浏览了任务列表中的每条记录,并删除了已完成的任务。然后我从数据库中获取记录并确定是否需要添加它们。它还将检查用户是否将它们标记为已删除或不活动,并将使用取消令牌将其停止,然后将其从列表中删除。最后,它将循环并触发列表中的任务。

这是在每个任务上调用的过程:

public static void BeginProcessingReport(Object drReportSchedule, dynamic 
ServiceSettings)
{
}

这是包含 List<> 任务的对象

public struct ReportTasks
{
    public string ReportName;
    public Int32 ReportID;
    public int TaskID;
    public Task ReportTask;
    public bool Active;
    public CancellationTokenSource CancelTokenSource;
    public ReportTasks(string name, Int32 reportID, int id, Task task, DataRow drReport, bool active, CancellationTokenSource canceltokensource, CancellationToken canceltoken)
    {
        ReportName = name;
        ReportID = reportID;
        TaskID = id;
        ReportTask = task;
        Active = active;
        CancelTokenSource = canceltokensource;
    }
}

希望这是足够的信息,任何帮助将不胜感激

更新: 我能够通过为任务的主要处理创建一个线程来解决这些问题。将功能从 tick 方法中移出,并将其放入由线程调用的自己的方法中。

因此,计时器的每个滴答声都会检查线程以查看它是否正在运行。

private static Thread mainThread = null;  

    private void timerReports_Tick(object sender, ElapsedEventArgs e)
    {
        //create thread if null
        if (mainThread == null)
        {
            mainThread = new Thread(new ThreadStart(Process));
        }

        //start thread if stopped or unstarted, else, it is still running and do nothing
        if(mainThread.ThreadState == System.Threading.ThreadState.Stopped || mainThread.ThreadState == System.Threading.ThreadState.Unstarted)
        {
            mainThread.Start();
        }
    }

更新:

上面的代码不起作用,因为我仍然遇到任务在未完成时重新触发的问题。

所以让我改写我的问题:我将如何创建动态任务/线程来触发从数据库控制任务/线程的相同功能/方法?如果记录被删除或设置为 Active = 0,它需要能够删除线程和任务,Active = 0 是 SQL 中的一个位字段,用于确定记录是否应该触发。

标签: c#.nettask

解决方案


“互相踩踏”是否意味着在前一个事件仍在处理时再次发生滴答事件?

在这种情况下,只需在滴答事件开始时禁用计时器并在处理完成时重新启用


推荐阅读