首页 > 技术文章 > 阻塞与非阻塞那些事

91Kesson 2014-09-17 10:44 原文

日常开发中,经常碰到处理程序阻塞的情况:

1.select函数

select函数用于确定一个或多个套接口的状态,对每一个套接口,调用者可查询它的可读性、可写性及错误状态信息,用fd_set结构来表示一组等待检查的套接口,在调用返回时,这个结构存有满足一定条件的套接口组的子集,并且select()返回满足条件的套接口的数目。有一组宏可用于对fd_set的操作,这些宏与Berkeley Unix软件中的兼容,但内部的表达是完全不同的。

头文件:

#include <winsock.h>

原型:

int PASCAL FAR select( 
        int nfds,                //整数值,是指集合中所有文件描述符的范围,即所有文件描述符的最大值加1;windows中可省略;
        fd_set FAR* readfds,     //(可选)指向一组等待可读性检查的套接口;
        fd_set FAR* writefds,    //(可选)指向一组等待可写性检查的套接口;
        fd_set FAR* exceptfds, //(可选)指向一组等待错误检查的套接口;
        const struct timeval FAR* timeout    //select()最多等待时间,对阻塞操作则为NULL;
        );

参数解释:

readfds:标识等待可读性检查的套接口
如果该套接口正处于监听listen()状态,则若有连接请求到达,该套接口便被标识为可读,这样一个accept()调用保证可以无阻塞完成;
对其他套接口而言,可读性意味着有排队数据供读取。对于SOCK_STREAM类型套接口,便是recv()或recvfrom()操作均能无阻塞完成;

writefds:标识等待可写性检查的套接口
如果一个套接口正在connect()连接(非阻塞),可写性意味着连接顺利建立。
如果套接口并未处于connect()调用中,可写性意味着send()和sendto()调用将无阻塞完成。

exceptfds:标识等待带外数据存在性或意味错误条件检查的套接口

timeout:控制select完成的时间 若为空指针,则select将一直阻塞到有一个描述字满足条件; 若为{0,0},则select立即返回; 否则,指向一个timeval结构,其中指定了select调用在返回前等待多长时间;

MARK:

在winsock.h头文件中共定义了四个宏来操作描述字集。FD_SETSIZE变量用于确定一个集合中最多有多少描述字(FD_SETSIZE缺省值为64,可在包含winsock.h前用#define FD_SETSIZE来改变该值)。对于内部表示,fd_set被表示成一个套接口的队列,最后一个有效元素的后续元素为INVAL_SOCKET。
1、FD_CLR(s,*set):从集合set中删除描述字s。
2、FD_ISSET(s,*set):若s为集合中一员,非零;否则为零。
3、FD_SET(s,*set):向集合添加描述字s。
4、FD_ZERO(*set):将set初始化为空集NULL。

示例代码:

/////////////////////////////////////////////////////////////////////////
/*
创建两个socket,循环检测socket有请求到来时,分别进行处理;
*/
int _tmain(int argc, _TCHAR* argv[])       
{

    struct sockaddr_in servaddr1, clientaddr1;
    struct sockaddr_in servaddr2, clientaddr2;
    int listenfd1, connfd1 = 0;
    int listenfd2, connfd2 = 0;
    int clientlen = 0;
    WSADATA wsaData;
    int ServerPort1;
    int ServerPort2;
    HANDLE hThread;

    fd_set set;
    int maxfd;

    if(WSAStartup( MAKEWORD( 2, 2 ), &wsaData ))
    {
        printf("WSAStartup Error\n");
        return 1;
    }
    servaddr1.sin_family = AF_INET;
    servaddr1.sin_port = htons(PORT1);
    servaddr1.sin_addr.s_addr = INADDR_ANY;
    servaddr2.sin_family = AF_INET;
    servaddr2.sin_port = htons(PORT2);
    servaddr2.sin_addr.s_addr = INADDR_ANY;
    clientlen = sizeof(struct sockaddr);

    listenfd1 = socket(AF_INET, SOCK_STREAM, 0);
    listenfd2 = socket(AF_INET, SOCK_STREAM, 0);
    if((listenfd1<0) || (listenfd2<0))
    {
        printf("listenfd Error\n");
        return 1;
    }

    if( bind(listenfd1, (struct sockaddr *)&servaddr1, sizeof(servaddr1)) < 0 
        ||bind(listenfd2, (struct sockaddr *)&servaddr2, sizeof(servaddr2)) < 0)
    {
        printf("Server Bind Port Error\n");
        return 1;
    }
    
    if( listen(listenfd1, 5) < 0
        ||listen(listenfd2, 5) < 0) 
    {
        printf("listen Error\n");
        return 1;
    }


    maxfd = __max(listenfd1, listenfd2);
    for(;;)
    {
        FD_ZERO(&set);
        FD_SET((unsigned int)listenfd1, &set);
        FD_SET((unsigned int)listenfd2, &set);
        if( select(maxfd+1, &set, NULL, NULL, 0) < 0 ) 
        {
            printf("select errorErrorcode :%d ",WSAGetLastError());
            return 1;
        }
        if( FD_ISSET(listenfd1, &set) )//port1有数据到来
        {
            connfd1 = accept( listenfd1, (struct sockaddr *)&clientaddr1, &clientlen );
            if(connfd1 < 0)
            {
                printf("accept error.errcode:%d\n",WSAGetLastError());
                continue;
            }

            hThread = CreateThread( 
                NULL,                        
                0,                   
                (LPTHREAD_START_ROUTINE)DoProxy1,      
                NULL,             
                0,                
                NULL);
        }

        if( FD_ISSET(listenfd2, &set) )//port2有数据到来
        {
            connfd2 = accept( listenfd2, (struct sockaddr *)&clientaddr2, &clientlen );
            if(connfd2 < 0)
            {
                printf("accept error.errcode:%d\n",WSAGetLastError());
                continue;
            }

            hThread = CreateThread( 
                NULL,                    
                0,                       
                (LPTHREAD_START_ROUTINE)DoProxy2,          
                &parm,                          
                0,                           
                NULL);
        }    

    }//end for
    return 0;
}
View Code

 

2、PeekNamedPipe函数

从命名管道/匿名管道中拷贝数据到一个指定缓冲区,原管道中的数据任保留;

原型:

BOOL WINAPI PeekNamedPipe(
    __in       HANDLE hNamedPipe,    //命名\匿名管道 句柄;
    __out_opt  LPVOID lpBuffer,        //接收从管道中读取的数据,可以为空;
    __in       DWORD nBufferSize,    //指定lpBuffer的大小;
    __out_opt  LPDWORD lpBytesRead,    //实际接收的数据大小;
    __out_opt  LPDWORD lpTotalBytesAvail,        //管道中所有可读数据的大小;
    __out_opt  LPDWORD lpBytesLeftThisMessage    //当前消息中剩余的字节数;
    );


示例代码:

 1 //////////////////////////////////////////////////////////////////////////
 2 //获取目标控制台程序的输出,并显示在编辑框中;
 3 //实际运行发现目标控制台进程结束后,才会将数据一次性写入管道中;
 4 //才能从读取到数据,否则PeekNamedPipe任会返回1,但lpTotalBytesAvail却为0;
 5 //可以在控制台程序中每次printf后添加fflush(STDOUT);解决;
 6 /////////////////////////////////////////////////////////////////////////
 7  int CEntreVisionDlg::Start(CEntreVisionDlg *pDlg)
 8 {
 9     SECURITY_ATTRIBUTES sa;
10     HANDLE hRead, hWrite;
11     sa.nLength = sizeof(SECURITY_ATTRIBUTES);
12     sa.lpSecurityDescriptor = NULL;
13     sa.bInheritHandle = TRUE;
14 
15     if(!CreatePipe(&hRead, &hWrite, &sa, 0))
16     {
17         CloseHandle(hRead);
18         CloseHandle(hWrite);
19         return 0;
20     }
21 
22     STARTUPINFO si;
23     PROCESS_INFORMATION pi;
24     si.cb = sizeof(STARTUPINFO);
25     GetStartupInfo(&si);
26     si.hStdOutput = hWrite;
27     si.hStdOutput = hWrite;
28     si.wShowWindow = SW_HIDE;
29     si.dwFlags = STARTF_USESHOWWINDOW |  STARTF_USESTDHANDLES;
30 
31     if (!CreateProcess(NULL, "test.exe", 
32         NULL, NULL, TRUE, NULL, NULL, NULL, &si, &pi))
33     {
34         CloseHandle(hRead);
35         CloseHandle(hWrite);
36         return 0;
37     }
38     g_hProcess = pi.hProcess;
39 
40     char buffer[512] = {0};
41     DWORD bytesRead = 0, dwTotalArrival = 0;
42     BOOL bRet = FALSE;
43 
44     while(1)
45     {
46         while(pDlg->m_bListen)
47         {
48             dwTotalArrival = -1;
49             bRet = PeekNamedPipe(hRead, NULL, 0, NULL, &dwTotalArrival, NULL);
50             if (bRet && (dwTotalArrival > 0))
51             {
52                 bRet = ReadFile(hRead, buffer, 512, &bytesRead, NULL);
53                 pDlg->m_edit.SetSel(-1);
54                 pDlg->m_edit.ReplaceSel(buffer);
55                 /*pDlg->SetDlgItemText(IDC_EDIT1, buffer);*/
56                 memset(buffer, 0, 512);
57             }
58     
59         }
60         
61     }
62 
63     return 1;
64 }
PeekNamedPipe

 3、WaitForSingleObject 与 WaitForMultipleObjects函数

  3.1 WaitForSingleObject

 等待直到指定的对象处于有信号状态或超时;

 原型: 

DWORD WINAPI WaitForSingleObject(
    __in  HANDLE hHandle,        //被等待的对象句柄;
    __in  DWORD dwMilliseconds    //超时时间;
    );

参数:

hHandle:被等待的对象句柄 被等待的对象类型可以是更改通知(Change notification)、控制台输入(Console input)、事件(Event)、内存资源通知(Memory resource notification)、  互斥体(Mutex)、进程(Process)、信号量(Semaphore)、线程(Thread)、可等待计时器(Waitable timer);

dwMilliseconds:超时时间,可以设置为以下值  为0,函数不进入等待状态,立即返回;  INFINITE,一直等待直到被等待对象变为有信号状态时才返回;  其它DWORD值,函数等待直到被等待对象变为有信号或者等待时间间隔到期;

返回值:

指示导致函数从等待状态返回的原因,可以是以下值:
 WAIT_ABANDONED (0x00000080L) 指定对象是一个互斥体,拥有该互斥体的线程在终止时没有释放该互斥体对象,此时该互斥体对象的所有权被授予给调用线程,并被设置为无信号状态;
 WAIT_OBJECT_0 (0x00000000L) 指定对象变为有信号状态;
 WAIT_TIMEOUT (0x00000102L) 等待时间耗尽且指定对象仍为无信号状态;
 WAIT_FAILED  (0xFFFFFFFF) 函数运行不成功,调用CallLastError获得错误信息;

示例代码:

 1 #include <windows.h>
 2 #include <stdio.h>
 3 
 4 #define THREADCOUNT 2
 5 
 6 HANDLE ghMutex; 
 7 
 8 DWORD WINAPI WriteToDatabase( LPVOID );
 9 
10 void main()
11 {
12     HANDLE aThread[THREADCOUNT];
13     DWORD ThreadID;
14     int i;
15 
16     // Create a mutex with no initial owner
17 
18     ghMutex = CreateMutex( 
19         NULL,              // default security attributes
20         FALSE,             // initially not owned
21         NULL);             // unnamed mutex
22 
23     if (ghMutex == NULL) 
24     {
25         printf("CreateMutex error: %d\n", GetLastError());
26         return;
27     }
28 
29     // Create worker threads
30 
31     for( i=0; i < THREADCOUNT; i++ )
32     {
33         aThread[i] = CreateThread( 
34                      NULL,       // default security attributes
35                      0,          // default stack size
36                      (LPTHREAD_START_ROUTINE) WriteToDatabase, 
37                      NULL,       // no thread function arguments
38                      0,          // default creation flags
39                      &ThreadID); // receive thread identifier
40 
41         if( aThread[i] == NULL )
42         {
43             printf("CreateThread error: %d\n", GetLastError());
44             return;
45         }
46     }
47 
48     // Wait for all threads to terminate
49 
50     WaitForMultipleObjects(THREADCOUNT, aThread, TRUE, INFINITE);
51 
52     // Close thread and mutex handles
53 
54     for( i=0; i < THREADCOUNT; i++ )
55         CloseHandle(aThread[i]);
56 
57     CloseHandle(ghMutex);
58 }
59 
60 DWORD WINAPI WriteToDatabase( LPVOID lpParam )
61 { 
62     DWORD dwCount=0, dwWaitResult; 
63 
64     // Request ownership of mutex.
65 
66     while( dwCount < 20 )
67     { 
68         dwWaitResult = WaitForSingleObject( 
69             ghMutex,    // handle to mutex
70             INFINITE);  // no time-out interval
71  
72         switch (dwWaitResult) 
73         {
74             // The thread got ownership of the mutex
75             case WAIT_OBJECT_0: 
76                 __try { 
77                     // TODO: Write to the database
78                     printf("Thread %d writing to database...\n", 
79                             GetCurrentThreadId());
80                     dwCount++;
81                 } 
82 
83                 __finally { 
84                     // Release ownership of the mutex object
85                     if (! ReleaseMutex(ghMutex)) 
86                     { 
87                         // Handle error.
88                     } 
89                 } 
90                 break; 
91 
92             // The thread got ownership of an abandoned mutex
93             // The database is in an indeterminate state
94             case WAIT_ABANDONED: 
95                 return FALSE; 
96         }
97     }
98     return TRUE; 
99 }
WaitForSingleObject

 

3.2 WaitForMultipleObjects

等待直到一个或所有指定的对象处于有信号状态或超时;

原型:

DWORD WINAPI WaitForMultipleObjects(
    __in  DWORD nCount,                //表示lpHandles指向数组中对象句柄的数量,最大值为 MAXIMUM_WAIT_OBJECTS;
    __in  const HANDLE *lpHandles,    //指向包含一组句柄对象的数组;                            
    __in  BOOL bWaitAll,            //为TRUE,表示数组lpHandles中所有对象变为有信号状态才返回;为FALSE,表示任何一个对象变为有信号状态即返回;
    __in  DWORD dwMilliseconds        //意义通WaitForSingleObject;
    );

参数:

lpHandles:
   数组中不可以包含相同的对象句柄;
   如果函数为返回之前,数组中便有一个或多个句柄被释放,此时函数的行为未被定义,是不可被判断的;
   数组中句柄必需都有SYNCHRONIZE 权限;

返回值:

WAIT_OBJECT_0 to (WAIT_OBJECT_0 + nCount– 1): 如果bWaitAll是TRUE,表示所有等待对象句柄均变为有信号;如果bWaitAll设为FALSE,返回值减WAIT_OBJECT_0的值便是lpHanddles数组中满足函数返回的对象的下标(若同时有多个对象变为了有有信号状态,返回值减WAIT_OBJECT_0的值 表示的是变为有信号状态对象中最小的下标);

WAIT_ABANDONED_0 to (WAIT_ABANDONED_0 + nCount– 1):如果bWaitAll是TRUE,表示所有等待对象句柄均变为有信号,且至少有一个对象是被抛弃的互斥体对象;如果bWaitAll设为FALSE,返回值减WAIT_OBJECT_0的值 便是lpHanddles数组中被抛弃的互斥体对象; (互斥体对象的被抛弃的处理,同WaitForSingleObject中WAIT_ABANDONED返回值)。

WAIT_TIMEOUT 和 WAIT_FAILED 同 WaitForSingleObject 中对应返回值;

示例代码:

 1 #include <windows.h>
 2 #include <stdio.h>
 3 
 4 HANDLE ghEvents[2];
 5 
 6 DWORD WINAPI ThreadProc( LPVOID );
 7 
 8 void main()
 9 {
10     HANDLE hThread; 
11     DWORD i, dwEvent, dwThreadID; 
12 
13     // Create two event objects
14 
15     for (i = 0; i < 2; i++) 
16     { 
17         ghEvents[i] = CreateEvent( 
18             NULL,   // default security attributes
19             FALSE,  // auto-reset event object
20             FALSE,  // initial state is nonsignaled
21             NULL);  // unnamed object
22 
23         if (ghEvents[i] == NULL) 
24         { 
25             printf("CreateEvent error: %d\n", GetLastError() ); 
26             ExitProcess(0); 
27         } 
28     } 
29 
30     // Create a thread
31 
32     hThread = CreateThread( 
33                  NULL,         // default security attributes
34                  0,            // default stack size
35                  (LPTHREAD_START_ROUTINE) ThreadProc, 
36                  NULL,         // no thread function arguments
37                  0,            // default creation flags
38                  &dwThreadID); // receive thread identifier
39 
40     if( hThread == NULL )
41     {
42         printf("CreateThread error: %d\n", GetLastError());
43         return;
44     }
45 
46     // Wait for the thread to signal one of the event objects
47 
48     dwEvent = WaitForMultipleObjects( 
49         2,           // number of objects in array
50         ghEvents,     // array of objects
51         FALSE,       // wait for any object
52         5000);       // five-second wait
53 
54     // The return value indicates which event is signaled
55 
56     switch (dwEvent) 
57     { 
58         // ghEvents[0] was signaled
59         case WAIT_OBJECT_0 + 0: 
60             // TODO: Perform tasks required by this event
61             printf("First event was signaled.\n");
62             break; 
63 
64         // ghEvents[1] was signaled
65         case WAIT_OBJECT_0 + 1: 
66             // TODO: Perform tasks required by this event
67             printf("Second event was signaled.\n");
68             break; 
69 
70         case WAIT_TIMEOUT:
71             printf("Wait timed out.\n");
72             break;
73 
74         // Return value is invalid.
75         default: 
76             printf("Wait error: %d\n", GetLastError()); 
77             ExitProcess(0); 
78     }
79 
80     // Close event handles
81 
82     for (i = 0; i < 2; i++) 
83         CloseHandle(ghEvents[i]);    
84 }
85 
86 DWORD WINAPI ThreadProc( LPVOID lpParam )
87 {
88     // Set one event to the signaled state
89 
90     if ( !SetEvent(ghEvents[0]) ) 
91     {
92         printf("SetEvent failed (%d)\n", GetLastError());
93         return -1;
94     }
95     return 1;
96 }
WaitForMultipleObjects

 

推荐阅读