服务器代码
#pragma hdrstop //--------------------------------------------------------------------------- //#pragma argsused #pragma comment(lib,"ws2_32.lib") #include <stdio.h> #include <memory.h> #include <winsock2.h> #include <iostream> #include<iomanip> // for setfill setw using namespace std; #define RECV_POSTED 1001 #define SEND_POSTED 1002 int Init(); typedef struct _PER_HANDLE_DATA { SOCKET sock; }PER_HANDLE_DATA,* LPPER_HANDLE_DATA; typedef struct _PER_IO_OPERATION_DATA { OVERLAPPED Overlapped; WSABUF DataBuff; char Buff[24]; BOOL OperationType; }PER_IO_OPERATION_DATA,* LPPER_IO_OPERATION_DATA; DWORD WINAPI ServerWorkerThread(LPVOID CompletionPort); int main(int argc, char* argv[]) { LPPER_HANDLE_DATA perHandleData; LPPER_IO_OPERATION_DATA ioperdata; SYSTEM_INFO siSys; SOCKET sockListen; struct sockaddr_in addrLocal; //char buf[24]; int nRet = 0; DWORD nThreadID; SOCKET sockAccept; DWORD dwFlags = 0; DWORD dwRecvBytes; int nReuseAddr = 1; cout<<"初始环境..."<<endl; if(Init() != 0) goto theend; //创建一个IO完成端口 cout<<"创建一个IO完成端口"<<endl; HANDLE hCompletionPort; hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0); if(hCompletionPort == INVALID_HANDLE_VALUE) { cout<<"创建IO完成端口失败"<<endl; goto theend; } //获取CPU数目 GetSystemInfo(&siSys); cout<<"CPU的核数为:"<<siSys.dwNumberOfProcessors<<endl; //创建一定数目的工作者线程,本例中以一个处理器一个线程搭配 for(int i = 1;i<=(int)siSys.dwNumberOfProcessors*2;i++)//NumberOfProcessors { HANDLE hThread; hThread = CreateThread(NULL,0,ServerWorkerThread,(LPVOID)hCompletionPort,0,&nThreadID); cout<<"创建工作者线程"<<i<<endl; CloseHandle(hThread); } //创建监听SOCKET cout<<"创建监听SOCKET"<<endl; sockListen = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED); if(sockListen == SOCKET_ERROR) { cout<<"WSASocket错误"<<endl; goto theend; } //设置socket端口为可重用 if(setsockopt(sockListen,SOL_SOCKET,SO_REUSEADDR,(const char *)&nReuseAddr,sizeof(int)) != 0) { cout<<"setsockopt错误"<<endl; goto theend; } addrLocal.sin_family = AF_INET; addrLocal.sin_addr.s_addr = htonl(INADDR_ANY); addrLocal.sin_port = htons(9090); if(bind(sockListen,(struct sockaddr *)&addrLocal,sizeof(sockaddr_in)) != 0) { cout<<"bind错误"<<endl; int n = WSAGetLastError(); goto theend; } //准备监听 if(listen(sockListen,5)!=0) { cout<<"listen错误"<<endl; goto theend; } cout<<"Socket Listen 监听中..."<<endl; while(true) { //接收用户连接,被和完成端口关联 sockAccept = WSAAccept(sockListen,NULL,NULL,NULL,0); perHandleData = (LPPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA)); if(perHandleData == NULL) continue; cout<<"Accept:socket number "<<sockAccept<<"接入"<<endl; perHandleData->sock = sockAccept; ioperdata = (LPPER_IO_OPERATION_DATA)malloc(sizeof(PER_IO_OPERATION_DATA)); memset(&(ioperdata->Overlapped),0,sizeof(OVERLAPPED)); ioperdata->DataBuff.len = 24; ioperdata->DataBuff.buf = ioperdata->Buff; ioperdata->OperationType = RECV_POSTED; if( ioperdata == NULL) { free(perHandleData); continue; } //关联 //cout<<"关联SOCKET和完成端口..."<<endl; if(CreateIoCompletionPort((HANDLE)sockAccept,hCompletionPort,(DWORD)perHandleData,0) == NULL) { cout<<sockAccept<<"createiocompletionport错误"<<endl; free(perHandleData); free(ioperdata); continue; } ////投递接收操作 必须有这句,否则 后面的数据 无法接受. ////cout<<"投递接收操作"<<endl; if (SOCKET_ERROR == WSARecv(sockAccept,&(ioperdata->DataBuff),1,&dwRecvBytes,&dwFlags,&(ioperdata->Overlapped),NULL)) { cout << "WSARecv ERROR:" << WSAGetLastError() << endl; } } theend: getchar(); return 0; } //--------------------------------------------------------------------------- int Init() { WSAData wsaData; if(WSAStartup(MAKEWORD(2,2),&wsaData) != 0) { cout<<"WSAStartup失败"<<endl; return -1; } if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) { cout<<"SOCKET版本不对"<<endl; WSACleanup(); return -1; } return 0; } DWORD WINAPI ServerWorkerThread(LPVOID CompletionPort) { HANDLE ComPort = (HANDLE)CompletionPort; DWORD BytesTransferred; //LPOVERLAPPED Overlapped; LPPER_HANDLE_DATA PerHandleData; LPPER_IO_OPERATION_DATA PerIoData; DWORD SendBytes,RecvBytes; DWORD Flags = 0; BOOL bT; while(TRUE) { //等待完成端口上SOCKET的完成 cout<<"["<<GetCurrentProcessId()<<":"<<GetCurrentThreadId()<<"]"<<":等待完成端口上SOCKET的完成"<<endl; bT = GetQueuedCompletionStatus(ComPort, &BytesTransferred,(LPDWORD)&PerHandleData, (LPOVERLAPPED *)&PerIoData,INFINITE); //检查是否有错误产生 if(BytesTransferred == 0 && (PerIoData->OperationType == RECV_POSTED || PerIoData->OperationType == SEND_POSTED)) { //关闭SOCKET cout<<"["<<GetCurrentProcessId()<<":"<<GetCurrentThreadId()<<"]"<<PerHandleData->sock<<"SOCKET关闭"<<endl; closesocket(PerHandleData->sock); free(PerHandleData); free(PerIoData); continue; } //如果结束socket后做了其他事情,费时间,那么...要么优化其他事情,转移到其他地方去做.要么不适用IOCP. //Sleep(1000); //为请求服务 if(PerIoData->OperationType == RECV_POSTED) { //处理 cout<<"["<<GetCurrentProcessId()<<":"<<GetCurrentThreadId()<<"]"<<"接收处理"<<endl; cout<<"["<<GetCurrentProcessId()<<":"<<GetCurrentThreadId()<<"]"<<PerHandleData->sock<<" SOCKET :"<<PerIoData->Buff<<endl; //回应客户端 ZeroMemory(PerIoData->Buff,24); //#define _CRT_SECURE_NO_WARNINGS 1 //strcpy(PerIoData->Buff,"OK"); strcpy_s(PerIoData->Buff,"OK"); Flags = 0; ZeroMemory((LPVOID)&(PerIoData->Overlapped),sizeof(OVERLAPPED)); PerIoData->DataBuff.len = 2; PerIoData->DataBuff.buf = PerIoData->Buff; PerIoData->OperationType = SEND_POSTED; WSASend(PerHandleData->sock,&(PerIoData->DataBuff), 1,&SendBytes,0,&(PerIoData->Overlapped),NULL); } else //if(PerIoData->OperationType == SEND_POSTED) { //发送时的处理 cout<<"["<<GetCurrentProcessId()<<":"<<setfill('0')<<setw(4)<<GetCurrentThreadId()<<"]"<<"发送处理"<<endl; Flags = 0; ZeroMemory((LPVOID)&(PerIoData->Overlapped),sizeof(OVERLAPPED)); ZeroMemory(PerIoData->Buff,24); PerIoData->DataBuff.len = 24; PerIoData->DataBuff.buf = PerIoData->Buff; PerIoData->OperationType = RECV_POSTED; WSARecv(PerHandleData->sock,&(PerIoData->DataBuff), 1,&RecvBytes,&Flags,&(PerIoData->Overlapped),NULL); } } }
客户端代码
#pragma hdrstop #include <winsock2.h> #include <stdio.h> #include <iostream> #include<iomanip> // for setfill setw #pragma comment(lib,"ws2_32.lib") using namespace std; //--------------------------------------------------------------------------- //#pragma argsused int Init(); typedef struct SSocketClientRes{ SOCKET sockClient; struct sockaddr_in addrServer; char buf[24]; int n ; }SSocketClientRes_t; void ConnectToServer(SSocketClientRes_t &clientres,int theNumber){ clientres.n=0; memset(clientres.buf,0,24); clientres.sockClient = socket(AF_INET,SOCK_STREAM,0); if(clientres.sockClient == INVALID_SOCKET) { cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"socket 失败"<<endl; goto theEnd; } memset(&clientres.addrServer,0,sizeof(sockaddr_in)); clientres.addrServer.sin_family = AF_INET; clientres.addrServer.sin_addr.s_addr = inet_addr("127.0.0.1"); clientres.addrServer.sin_port = htons(9090); cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"连接服务器..."<<endl; if(connect(clientres.sockClient,(const struct sockaddr *)&clientres.addrServer,sizeof(sockaddr)) != 0) { cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"connect 失败"<<endl; goto theEnd; } char buf[6]="start"; send(clientres.sockClient,buf,6,0); return ; theEnd: WSACleanup(); std::cout<<__FILE__<<":"<<__LINE__<<" 程序结束"<<std::endl; getchar(); } void SendTestDataToServer(SSocketClientRes_t &clientres,int theNumber){ memset(clientres.buf,0,24); sprintf(clientres.buf,"TestDataSend%d", clientres.n); cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"发送:"<<clientres.buf<<endl; if(send(clientres.sockClient,clientres.buf,strlen(clientres.buf)+1,0) <= 0) { cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"send失败,可能连接断开"<<endl; //break; goto theEnd; } memset(clientres.buf,0,24); //接收服务端应答 if(recv(clientres.sockClient,clientres.buf,24,0) <= 0) { cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"recv失败,可能连接断开"<<endl; //break; goto theEnd; } cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"服务器应答:"<<clientres.buf<<endl; //Sleep(10); clientres.n++; return ; theEnd: WSACleanup(); std::cout<<__FILE__<<":"<<__LINE__<<" 程序结束"<<std::endl; getchar(); } void CloseSocketConect(SSocketClientRes_t &clientres){ closesocket(clientres.sockClient); shutdown(clientres.sockClient,2); } int main(int argc, char* argv[]) { if(Init() != 0) goto theEnd; SSocketClientRes_t client[6000]; //链接服务器 for(int i=0;i<6000;i++){ ConnectToServer(client[i],i); } //发送数据 cout<<"开始发送测试包"<<endl; for(int sendCount=0;sendCount=321000;sendCount++){ for(int i=0;i<6000;i++){ SendTestDataToServer(client[i],i); } } //关闭socket for(int i=0;i<6000;i++){ CloseSocketConect(client[i]); } theEnd: WSACleanup(); std::cout<<__FILE__<<":"<<__LINE__<<" 程序结束"<<std::endl; getchar(); return 0; } //--------------------------------------------------------------------------- int Init() { WSAData wsaData; if(WSAStartup(MAKEWORD(2,2),&wsaData) != 0) { cout<<"WSAStartup失败"<<endl; return -1; } if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2) { cout<<"SOCKET版本不对"<<endl; WSACleanup(); return -1; } return 0; }
收藏的用户(0)
X
正在加载信息~
2
最新回复 (0)