首页 > 技术文章 > ACE库udp发送和接收

nanzhi 2018-02-05 11:14 原文

udp双方由于不需要建立连接的过程,所以一个可以连续发送,一个连续接收。

由于接收不过来就会丢包,所以发送速度是接收速度的1/2来缓解接收压力(通过ACE_OS::sleep来实现)

一、发送端

头文件:

class CTcCommunicator
{
public:
CTcCommunicator();
virtual ~CTcCommunicator();


//strRemoteTp:"192.168.1.23"
int SendMessage(const MESSAGE& msg, string strRemoteTp);
int ReceiveMessage();

private:
ACE_INET_Addr m_adrLocal; //本地地址信息
ACE_SOCK_Dgram m_peer; //通信通道

 

};

源文件:

//因为这个类主要用来发送,所以ReceiveMessage()并没有实现

CTcCommunicator::CTcCommunicator():m_peer(m_adrLocal)
{

}

CTcCommunicator::~CTcCommunicator()
{
m_peer.close();
}

int CTcCommunicator::SendMessage( const MESSAGE& msg, string strRemoteTp)
{

//发送数据
char buf[2048];

memcpy(buf,&msg,sizeof(MESSAGE));

strRemoteTp += ":4000"; //端口号
ACE_INET_Addr tcAddr;
tcAddr.string_to_addr(strRemoteTp.c_str()); // 形参格式:“192.168.1.23:4000”

int nSendCount=m_peer.send(buf,sizeof(MESSAGE),tcAddr); //发送消息

return nSendCount;

}

int CTcCommunicator::ReceiveMessage()
{
return 0;
}

使用:

template <typename CONTAINER>
bool CHiAtmpVolumeTask::SendMsg(const CONTAINER& container, MSG msgType, const string& strRemoteIp)
{

CTcCommunicator tcCommunicator;
MESSAGE msg;
memset(&msg, 0, sizeof(MESSAGE));
msg.msg_header.nMsgType = msgType;

bool bRet = true;

CONTAINER::const_iterator iter = container.begin();
for(; iter != container.end(); iter++)
{
memset(msg.msg_body.bDataBuf, 0, sizeof(msg.msg_body.bDataBuf));
memcpy(msg.msg_body.bDataBuf, &(*iter), sizeof(*iter));
int nSendCount = tcCommunicator.SendMessage(msg, strRemoteIp);
if (nSendCount != -1)
{
ACE_OS::sleep(ACE_Time_Value(0,20000));  

}
else
{
bRet = false;

}

}

 return bRet;

}

 

二、接收端:

下面的代码并不好,没有封装成类。

接收端应该也像发送端一样封装成类,将

//ACE_INET_Addr port_to_listen(4000); //绑定的端口
//ACE_SOCK_Dgram peer(port_to_listen); //通信通道

定义为成员变量,在构造函数中初始化。

int CCommunication::ReceiveMsg(MESSAGE& msg, const ACE_SOCK_Dgram& peer)
{

//ACE_INET_Addr port_to_listen(4000); //绑定的端口
//ACE_SOCK_Dgram peer(port_to_listen); //通信通道

char buf[2048];
ACE_INET_Addr remoteAddr; //远程地址

memset(buf,0,sizeof(buf));
//int nReceiveCount = peer.recv(buf,2048,remoteAddr); //接收消息,获取远程地址信息
//int nReceiveCount=peer.recv(buf,2048,remoteAddr,0,&ACE_Time_Value(nServerOutTime));
int nReceiveCount=peer.recv(buf,2048,remoteAddr,0,&ACE_Time_Value(0));

if (nReceiveCount <= 0)
{
memset(&msg,0,sizeof(MESSAGE));

//原路发送应答消息

memcpy(buf,&answerMsg,sizeof(MESSAGE));
peer.send(buf,sizeof(MESSAGE),remoteAddr);


}

memcpy(&msg,buf,sizeof(MESSAGE));

return nReceiveCount;


}

使用:

在一个线程函数中,不断接收数据。

DWORD WINAPI CSSDlg::ReceiveBPMsgFunc( void* pParam )

{

CSSDlg* pDlg = (CDlg*)pParam;
HANDLE hEvent = pDlg->GetEventHandle();
CRITICAL_SECTION hcs = pDlg->GetCriticalSection();

ACE_INET_Addr port_to_listen(4000); //绑定的端口
ACE_SOCK_Dgram peer(port_to_listen); //通信通道

MESSAGE recvMsg; //收到的消息

while (TRUE)
{

int nReceiveCount = theApp.m_Comm.ReceiveMsgFromServer(recvMsg, peer);
if (nReceiveCount != -1)
{
ACE_OS::sleep(ACE_Time_Value(0,10000));

switch(recvMsg.msg_header.nMsgType)
{
case OUT:
{
ORDER stOrder;
memset(&stOrder, 0, sizeof(ORDER));
memcpy(&stOrder, recvMsg.msg_body.bDataBuf, sizeof(recvMsg.msg_body.bDataBuf));

EnterCriticalSection(&hcs);

do something;

LeaveCriticalSection(&hcs);

break;
}


default:
break;

}

}

if (WAIT_OBJECT_0 == WaitForSingleObject(hEvent, 0))
{
break;
}

}

return 0;

}

三、线程

头文件中的定义:

HANDLE m_hThread;   //m_hThread 初始化为 INVALID_HANDLE_VALUE
HANDLE m_hEvent;
CRITICAL_SECTION m_csOrderCode;

BOOL BeginReceiveMsgThread();
void EndReceiveMsgThread();
static DWORD WINAPI ReceiveBPMsgFunc(void* pParam);

源文件:

BOOL CSSDlg::BeginReceiveMsgThread()
{
/** 检测线程是否已经开启了 */
if (m_hThread != INVALID_HANDLE_VALUE)
{
/** 线程已经开启 */
return false;
}

//m_bExitThread = false;

//自动置位(第二个参数), 第三个参数:true表示已触发,false未触发
m_hEvent = CreateEvent (NULL,FALSE,FALSE,NULL); // 初始值为 nonsignaled ,并且每次触发后自动设置为nonsignaled
InitializeCriticalSection(&m_csOrderCode);

m_hThread = CreateThread(NULL,0, &CSSDlg::ReceiveBPMsgFunc, this, 0, NULL);

if (!m_hThread)
{
return false;
}
/** 设置线程的优先级,高于普通线程 */
/*if (!SetThreadPriority(m_hThread, THREAD_PRIORITY_ABOVE_NORMAL))
{
return false;
}
*/
return true;

}

void CSSDlg::EndReceiveMsgThread()
{
SetEvent(m_hEvent); //触发事件,设置为有信号状态,此时WaitForSingleObject返回WAIT_OBJECT_0,线程循环退出
Sleep(10);
DeleteCriticalSection(&m_csOrderCode);
CloseHandle(m_hEvent);
}

另外WaitForSingleObject可以作为循环条件:

while (WAIT_OBJECT_0 != WaitForSingleObject(hEvent, 1000))  //每隔1000毫秒读一次数据

while (WAIT_OBJECT_0 != WaitForSingleObject(hEvent, 1))  //每隔1000毫秒读一次数据

 

推荐阅读