首页 > 解决方案 > 在阻塞主线程中使用 TEvent 和 MsgWaitForMultipleObjects

问题描述

我发现了这个 Remy 的有趣代码。 Delphi:如何在本地创建和使用 Thread?

可以这样做,以便我可以执行多个线程并等到它们全部完成然后继续主线程吗?我试过这样但没有成功......

procedure Requery(DataList: TStringList);
var
  Event: TEvent;
  H: THandle;
  OpResult: array of Boolean;
  i: Integer;
begin
  Event := TEvent.Create;
  try
    SetLength(OpResult, DataList.Count); 
    for i:=0 to DataList.Count-1 do begin
      TThread.CreateAnonymousThread(
        procedure
        begin
          try
            // run query in thread
            OpResult[i]:=IsMyValueOK(DataList.Strings[i]);
          finally
            Event.SetEvent;
          end;
        end
      ).Start;
      H := Event.Handle;
    end;
    while MsgWaitForMultipleObjects(1, H, False, INFINITE, QS_ALLINPUT) = (WAIT_OBJECT_0+1) do Application.ProcessMessages;
    
    for i:=Low(OpResult) to High(OpResult) do begin
      Memo1.Lines.Add('Value is: ' + BoolToStr(OpResult[i], True));
    end;
  finally
    Event.Free;
  end;

  // Do next jobs with query
  ...
end;

标签: multithreadingdelphieventsblockingtthread

解决方案


可以这样做,以便我可以执行多个线程并等待它们全部完成

是的。您只需要创建多个TEvent对象,每个对象一个TThread,然后将它们的所有Handles 存储在一个数组中以传递给MsgWaitForMultipleObjects()

procedure Requery(DataList: TStringList);
var
  Events: array of TEvent;
  H: array of THandle;
  OpResult: array of Boolean;
  i: Integer;
  Ret, Count: DWORD;

  // moved into a helper function so that the anonymous procedure
  // can capture the correct Index...
  procedure StartThread(Index: integer);
  begin
    Events[Index] := TEvent.Create;
    TThread.CreateAnonymousThread(
      procedure
      begin
        try
          // run query in thread
          OpResult[Index] := IsMyValueOK(DataList.Strings[Index]);
        finally
          Events[Index].SetEvent;
        end;
      end
    ).Start;
    H[Index] := Events[Index].Handle;
  end;

begin
  if DataList.Count > 0 then
  begin
    SetLength(Events, DataList.Count);
    SetLength(H, DataList.Count);
    SetLength(OpResult, DataList.Count);

    try
      for i := 0 to DataList.Count-1 do begin
        StartThread(i);
      end;

      Count := Length(H);
      repeat
        Ret := MsgWaitForMultipleObjects(Count, H[0], False, INFINITE, QS_ALLINPUT);
        if Ret = WAIT_FAILED then RaiseLastOSError;
        if Ret = (WAIT_OBJECT_0+Count) then
        begin
          Application.ProcessMessages;
          Continue;
        end;
        for i := Integer(Ret-WAIT_OBJECT_0)+1 to High(H) do begin
          H[i-1] := H[i];
        end;
        Dec(Count);
      until Count = 0;

      for i := Low(OpResult) to High(OpResult) do begin
        Memo1.Lines.Add('Value is: ' + BoolToStr(OpResult[i], True));
      end;
    finally
      for i := Low(Events) to High(Events) do begin
        Events[i].Free;
      end;
    end;
  end;

  // Do next jobs with query
  ...
end;

话虽如此,您也可以选择摆脱TEvent对象并等待TThread.Handles 代替。当Handle线程完全终止时,线程会发出等待操作的信号。唯一的问题是TThread.CreateAnonymousThread()创建了一个TThreadFreeOnTerminate属性是的True,所以你必须手动关闭它:

procedure Requery(DataList: TStringList);
var
  Threads: array of TThread;
  H: array of THandle;
  OpResult: array of Boolean;
  i: Integer;
  Ret, Count: DWORD;

  // moved into a helper function so that the anonymous procedure
  // can capture the correct Index...
  procedure StartThread(Index: integer);
  begin
    Threads[Index] := TThread.CreateAnonymousThread(
      procedure
      begin
        // run query in thread
        OpResult[Index] := IsMyValueOK(DataList.Strings[Index]);
      end
    );
    Threads[Index].FreeOnTerminate := False;
    H[Index] := Threads[Index].Handle;
    Threads[Index].Start;
  end;

begin
  try
    SetLength(Threads, DataList.Count);
    SetLength(H, DataList.Count);
    SetLength(OpResult, DataList.Count);

    for i := 0 to DataList.Count-1 do begin
      StartThread(i);
    end;

    Count := Length(H);
    repeat
      Ret := MsgWaitForMultipleObjects(Count, H[0], False, INFINITE, QS_ALLINPUT);
      if Ret = WAIT_FAILED then RaiseLastOSError;
      if Ret = (WAIT_OBJECT_0+Count) then
      begin
        Application.ProcessMessages;
        Continue;
      end;
      for i := Integer(Ret-WAIT_OBJECT_0)+1 to High(H) do begin
        H[i-1] := H[i];
      end;
      Dec(Count);
    until Count = 0;

    for i := Low(OpResult) to High(OpResult) do begin
      Memo1.Lines.Add('Value is: ' + BoolToStr(OpResult[i], True));
    end;
  finally
    for i := Low(Threads) to High(Threads) do begin
      Threads[i].Free;
    end;
  end;

  // Do next jobs with query
  ...
end;

无论哪种方式,请注意MsgWaitForMultipleObjects()一次最多只能等待 63 ( MAXIMUM_WAIT_OBJECTS[64] - 1) 个句柄。如果您需要,该WaitForMultipleObjects()文档解释了如何解决该限制:

要等待超过 MAXIMUM_WAIT_OBJECTS 个句柄,请使用以下方法之一:

  • 创建一个线程以等待 MAXIMUM_WAIT_OBJECTS 句柄,然后在该线程和其他句柄上等待。使用此技术将句柄分成 MAXIMUM_WAIT_OBJECTS 组。
  • 调用 RegisterWaitForSingleObject 以等待每个句柄。线程池中的等待线程等待 MAXIMUM_WAIT_OBJECTS 注册对象,并在对象发出信号或超时间隔到期后分配一个工作线程。

或者,您可以简单地以较小的批次处理您的列表,例如一次不超过 50-60 个项目。


推荐阅读