c - 如何使用WinAPI实现线程间的通信?
问题描述
我正在尝试使用 WinAPI 事件在线程之间进行通信。我有一个主线程和 2 个“工作”线程,它们会收到工作请求并在完成后报告。为简单起见,主线程总是向特定的工作线程发送请求;我为此使用专门的活动。
#include <stdio.h>
#include <inttypes.h>
#include "windows.h"
#include "process.h"
#define NUM_THREADS 2
struct mywork
{
int thread_index;
HANDLE event_do;
HANDLE event_done;
};
struct mywork gg_work[NUM_THREADS];
uintptr_t gg_threads[NUM_THREADS];
void my_iterate(void* lpParam)
{
while (1)
{
struct mywork* pwork = (struct mywork*)lpParam;
printf("Worker %d: wait for request\n", pwork->thread_index);
WaitForMultipleObjects(1, &pwork->event_do, TRUE, INFINITE);
printf("Worker %d: working\n", pwork->thread_index);
printf("Worker %d: ready\n", pwork->thread_index);
SetEvent(pwork->event_done);
}
}
int main()
{
for (int i = 0; i < NUM_THREADS; ++i)
{
gg_work[i].thread_index = i;
gg_work[i].event_do = CreateEvent(NULL, FALSE, FALSE, TEXT("event_do"));
gg_work[i].event_done = CreateEvent(NULL, FALSE, FALSE, TEXT("event_done"));
gg_threads[i] = _beginthread(my_iterate, 0, &gg_work[i]);
}
while (1)
{
for (int thread = 0; thread < NUM_THREADS; ++thread)
{
printf("Master: send request to %d\n", thread);
SetEvent(gg_work[thread].event_do);
}
for (int thread = 0; thread < NUM_THREADS; ++thread)
{
printf("Master: wait for worker %d\n", thread);
WaitForMultipleObjects(1, &gg_work[thread].event_done, TRUE, INFINITE);
printf("Master: got results from worker %d\n", thread);
}
}
}
我希望它可以无休止地运行,但是在来回发送一些事件后它总是会卡住。这是一个示例输出:
Master: send request to 0
Master: send request to 1
Master: wait for worker 0
Worker 1: wait for request
Worker 0: wait for request
Worker 1: working
Worker 1: ready
Worker 1: wait for request
Master: got results from worker 0
Master: wait for worker 1
这里,master 为 worker 0 设置事件;工人 0 等待它,但永远不会醒来。
它并不总是立即卡住 - 有时它会设法来回发送一些请求。但如果我只使用一个工作线程,它会无限运行(即成功)。
我究竟做错了什么?我应该使用不同的 WinAPI 函数实现线程之间的通信吗?如果是,为什么?
如果我通过忙于等待受互斥锁保护的布尔标志来替换SetEvent
/ ,它可以工作,但速度很慢。WaitFor...
我使用 Visual Studio 2017 编译我的代码;我将它与“多线程调试 DLL”运行时库链接。
解决方案
即使使用原始(按感觉)代码我也无法重现死锁,需要调试具体的二进制文件才能理解。
struct mywork
{
HANDLE event_do = 0, event_done = 0;
ULONG i, n;
~mywork()
{
if (event_do) CloseHandle(event_do);
if (event_done) CloseHandle(event_done);
}
};
ULONG WINAPI WorkThread(PVOID param)
{
ULONG i = reinterpret_cast<mywork*>(param)->i,
n = reinterpret_cast<mywork*>(param)->n;
HANDLE event_do = reinterpret_cast<mywork*>(param)->event_do,
event_done = reinterpret_cast<mywork*>(param)->event_done;
PSTR prefix = (PSTR)alloca(i+1);
memset(prefix, '\t', i);
prefix[i] = 0;
do
{
printf("%sWorker[%u]: wait (%u)\n", prefix, i, n);
WaitForSingleObject(event_do, INFINITE);
printf("%sWorker[%u]: .... (%u)\n", prefix, i, n);
printf("%sWorker[%u]: done (%u)\n", prefix, i, n);
SetEvent(event_done);
} while (--n);
printf("%sWorker[%u]: exit\n", prefix, i);
return 0;
}
void testm()
{
mywork my[NUM_THREADS], *pmy = my;
ULONG i = _countof(my), nThreads = 0, n = task_count;
do
{
pmy->i = i;
pmy->n = n;
if ((pmy->event_do = CreateEvent(0, 0, 0, 0)) &&
(pmy->event_done = CreateEvent(0, 0, 0, 0)))
{
if (HANDLE hThread = CreateThread(0, 0, WorkThread, pmy++, 0, 0))
{
CloseHandle(hThread);
nThreads++;
}
}
} while (--i);
if (nThreads)
{
do
{
pmy = my, i = nThreads;
do
{
printf("Master: signal [%u] (%u)\n", pmy->i, n);
SetEvent(pmy++->event_do);
} while (--i);
pmy = my, i = nThreads;
do
{
printf("Master: wait for [%u] (%u)\n", pmy->i, n);
WaitForSingleObject(pmy->event_done, INFINITE);
printf("Master: result from [%u] (%u)\n", pmy->i, n);
} while (pmy++, --i);
} while (--n);
printf("Master: exit\n");
}
}
但无论如何,这段代码包含严重的逻辑错误——在循环中为每个线程单独等待。这使得线程之间的依赖关系(比如说一个线程快速完成自我工作,但我们等待另一个线程很长时间。结果“快”线程也等待“慢”线程)
正确的解决方案 - 一次等待所有事件(但在这种情况下不能超过 64 个线程)。所以代码可以是下一个:
struct mywork
{
HANDLE event_do = 0, event_done = 0;
ULONG i, n;
~mywork()
{
if (event_do) CloseHandle(event_do);
if (event_done) CloseHandle(event_done);
}
};
ULONG WINAPI WorkThread(PVOID param)
{
ULONG i = reinterpret_cast<mywork*>(param)->i,
n = reinterpret_cast<mywork*>(param)->n;
HANDLE event_do = reinterpret_cast<mywork*>(param)->event_do,
event_done = reinterpret_cast<mywork*>(param)->event_done;
PSTR prefix = (PSTR)alloca(i+1);
memset(prefix, '\t', i);
prefix[i] = 0;
for(WaitForSingleObject(event_do, INFINITE);;)
{
printf("%sWorker[%u]: .... (%u)\n", prefix, i, n);
printf("%sWorker[%u]: done (%u)\n", prefix, i, n);
if (--n)
{
printf("%sWorker[%u]: wait (%u)\n", prefix, i, n);
SignalObjectAndWait(event_done, event_do, INFINITE, FALSE);
}
else
{
SetEvent(event_done);
break;
}
}
printf("%sWorker[%u]: exit\n", prefix, i);
return 0;
}
void testm()
{
mywork my[NUM_THREADS], *pmy = my;
ULONG i = _countof(my), nThreads = 0, n = 8;
do
{
pmy->i = i;
pmy->n = n;
if ((pmy->event_do = CreateEvent(0, 0, 0, 0)) &&
(pmy->event_done = CreateEvent(0, 0, 0, 0)))
{
if (HANDLE hThread = CreateThread(0, 0, WorkThread, pmy++, 0, 0))
{
CloseHandle(hThread);
nThreads++;
}
}
} while (--i);
if (nThreads)
{
HANDLE Events[_countof(my)], *pEvent = Events;
pmy = my, i = nThreads;
do
{
*pEvent++ = pmy->event_done;
printf("Master: signal [%u] (%u)\n", pmy->i, n);
SetEvent(pmy++->event_do);
} while (--i);
n = nThreads;
do
{
i = WaitForMultipleObjects(nThreads, Events, FALSE, INFINITE);
if (i < nThreads)
{
pmy = my + i;
printf("Master: result from [%u] (%u)\n", i, pmy->n);
if (--pmy->n)
{
printf("Master: signal [%u] (%u)\n", pmy->i, pmy->n);
SetEvent(pmy->event_do);
}
else
{
--n;
}
}
else
{
__debugbreak();
}
} while (n);
printf("Master: exit\n");
}
}
但无论如何,这还不够好。不需要创建event_done
事件。如果每个线程都有例如HWHD
运行消息循环和SendMessage
窗口的主线程句柄,那就更好了。在这种情况下甚至event_do
不需要,因为SendMessage
等待内部。也可以将 APC 发布到主线程。
带 APC 的代码:
struct mywork
{
HANDLE event_do = 0, hMainThread;
ULONG i, n, *pnThreads;
~mywork()
{
if (event_do) CloseHandle(event_do);
}
};
VOID NTAPI EventDone(_In_ mywork* pmy)
{
printf("Master: result from [%u] (%u)\n", pmy->i, pmy->n);
if (--pmy->n)
{
printf("Master: signal [%u] (%u)\n", pmy->i, pmy->n);
SetEvent(pmy->event_do);
}
else
{
--*pmy->pnThreads;
}
}
ULONG WINAPI WorkThread(PVOID param)
{
ULONG i = reinterpret_cast<mywork*>(param)->i,
n = reinterpret_cast<mywork*>(param)->n;
HANDLE event_do = reinterpret_cast<mywork*>(param)->event_do,
hMainThread = reinterpret_cast<mywork*>(param)->hMainThread;
PSTR prefix = (PSTR)alloca(i+1);
memset(prefix, '\t', i);
prefix[i] = 0;
do
{
printf("%sWorker[%u]: wait (%u)\n", prefix, i, n);
WaitForSingleObject(event_do, INFINITE);
printf("%sWorker[%u]: .... (%u)\n", prefix, i, n);
printf("%sWorker[%u]: done (%u)\n", prefix, i, n);
QueueUserAPC((PAPCFUNC)EventDone, hMainThread, (ULONG_PTR)param);
} while (--n);
printf("%sWorker[%u]: exit\n", prefix, i);
return 0;
}
void testm()
{
mywork my[NUM_THREADS], *pmy = my;
ULONG i = _countof(my), nThreads = 0, n = task_count;
if (HANDLE hMainThread = OpenThread(THREAD_SET_CONTEXT, FALSE, GetCurrentThreadId()))
{
do
{
pmy->i = i;
pmy->n = n;
pmy->hMainThread = hMainThread;
pmy->pnThreads = &nThreads;
if (pmy->event_do = CreateEvent(0, 0, 0, 0))
{
if (HANDLE hThread = CreateThread(0, 0, WorkThread, pmy++, 0, 0))
{
CloseHandle(hThread);
nThreads++;
}
}
} while (--i);
if (nThreads)
{
pmy = my, i = nThreads;
do
{
printf("Master: signal [%u] (%u)\n", pmy->i, n);
SetEvent(pmy++->event_do);
} while (--i);
while (STATUS_USER_APC == SleepEx(INFINITE, TRUE) && nThreads) continue;
}
CloseHandle(hMainThread);
}
printf("Master: exit\n");
}
带有 Windows 消息的代码:
struct mywork
{
HWND hwnd;
ULONG i, n;
enum { WM_DONE = WM_USER };
};
ULONG WINAPI WorkThread(PVOID param)
{
ULONG i = reinterpret_cast<mywork*>(param)->i,
n = reinterpret_cast<mywork*>(param)->n;
HWND hwnd = reinterpret_cast<mywork*>(param)->hwnd;
PSTR prefix = (PSTR)alloca(i+1);
memset(prefix, '\t', i);
prefix[i] = 0;
do
{
printf("%sWorker[%u]: .... (%u)\n", prefix, i, n);
printf("%sWorker[%u]: done and wait (%u)\n", prefix, i, n);
SendMessageW(hwnd, mywork::WM_DONE, 0, (LPARAM)param);
} while (--n);
printf("%sWorker[%u]: exit\n", prefix, i);
return 0;
}
LRESULT WINAPI WndProc(HWND hwnd, UINT uMsg, WPARAM wParam, LPARAM lParam)
{
switch (uMsg)
{
case WM_NCDESTROY:
PostQuitMessage(0);
break;
case WM_NCCREATE:
SetWindowLongPtrW(hwnd, GWLP_USERDATA,
(LONG_PTR)reinterpret_cast<CREATESTRUCT*>(lParam)->lpCreateParams);
break;
case mywork::WM_DONE:
mywork* pmy = (mywork*)lParam;
printf("Master: result from [%u] (%u)\n", pmy->i, pmy->n);
if (--pmy->n)
{
printf("Master: signal [%u] (%u)\n", pmy->i, pmy->n);
}
else
{
PULONG pnThreads = (PULONG)GetWindowLongPtrW(hwnd, GWLP_USERDATA);
if (!--*pnThreads)
{
DestroyWindow(hwnd);
}
break;
}
return 0;
}
return DefWindowProc(hwnd, uMsg, wParam, lParam);
}
void testm()
{
mywork my[NUM_THREADS], *pmy = my;
ULONG i = _countof(my), nThreads = 0, n = task_count;
WNDCLASS cls = { 0, WndProc, 0, 0, 0, 0, 0, 0, 0, L"my" };
if (RegisterClassW(&cls))
{
if (HWND hwnd = CreateWindowExW(0, cls.lpszClassName,
0, 0, 0, 0, 0, 0, HWND_MESSAGE, 0, 0, &nThreads))
{
do
{
pmy->i = i;
pmy->n = n;
pmy->hwnd = hwnd;
if (HANDLE hThread = CreateThread(0, 0, WorkThread, pmy++, 0, 0))
{
CloseHandle(hThread);
nThreads++;
}
} while (--i);
if (nThreads)
{
MSG msg;
while (0 < GetMessageW(&msg, 0, 0, 0))
{
DispatchMessageW(&msg);
}
}
}
UnregisterClassW(cls.lpszClassName, 0);
}
printf("Master: exit\n");
}
推荐阅读
- javascript - 如何使用 Angular 或 javascript 中的预签名 url 将文件上传到 S3 存储桶
- oracle - 如何解决 Microsoft TFS (Team Foundation Server) 更新问题
- java - 为什么对页面的限制不起作用
- google-chrome - 我可以将协作者添加到我的 chrome 扩展程序中吗?像多管理员一样的扩展
- git - 是否有 git 速记来引用我所在的当前分支?
- angular - Angular.io 代理配置
- azure - 在 Azure 中组合 Web 和移动应用时的身份验证
- python - 重采样后日期时间错误
- node.js - 从 Node.js 服务器上传媒体文件到 MongoDB
- jenkins - 使用 websocket 断开 Jenkins 代理