完成端口编程例子

xingyun86 2018-4-17 2204

服务器代码

#pragma hdrstop
//---------------------------------------------------------------------------
//#pragma argsused
#pragma comment(lib,"ws2_32.lib")
#include <stdio.h>
#include <memory.h>
#include <winsock2.h>
#include <iostream>
#include<iomanip> // for setfill setw
using namespace std;
#define RECV_POSTED 1001
#define SEND_POSTED 1002
int Init();
typedef struct _PER_HANDLE_DATA
{
	SOCKET sock;
}PER_HANDLE_DATA,* LPPER_HANDLE_DATA;
typedef struct _PER_IO_OPERATION_DATA
{
	OVERLAPPED Overlapped;
	WSABUF DataBuff;
	char Buff[24];
	BOOL OperationType;
}PER_IO_OPERATION_DATA,* LPPER_IO_OPERATION_DATA;
DWORD WINAPI ServerWorkerThread(LPVOID CompletionPort);
int main(int argc, char* argv[])
{
	LPPER_HANDLE_DATA perHandleData;
	LPPER_IO_OPERATION_DATA ioperdata;
	SYSTEM_INFO siSys;
	SOCKET sockListen;
	struct sockaddr_in addrLocal;
	//char buf[24];
	int nRet = 0;
	DWORD nThreadID;
	SOCKET sockAccept;
	DWORD dwFlags = 0;
	DWORD dwRecvBytes;
	int nReuseAddr = 1;
	cout<<"初始环境..."<<endl;
	if(Init() != 0)
		goto theend;
	//创建一个IO完成端口
	cout<<"创建一个IO完成端口"<<endl;
	HANDLE hCompletionPort;
	hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE,NULL,0,0);
	if(hCompletionPort == INVALID_HANDLE_VALUE)
	{
		cout<<"创建IO完成端口失败"<<endl;
		goto theend;
	}
	//获取CPU数目
	GetSystemInfo(&siSys);
	cout<<"CPU的核数为:"<<siSys.dwNumberOfProcessors<<endl;
	//创建一定数目的工作者线程,本例中以一个处理器一个线程搭配
	for(int i = 1;i<=(int)siSys.dwNumberOfProcessors*2;i++)//NumberOfProcessors
	{
		HANDLE hThread;
		hThread = CreateThread(NULL,0,ServerWorkerThread,(LPVOID)hCompletionPort,0,&nThreadID);
		cout<<"创建工作者线程"<<i<<endl;
		CloseHandle(hThread);
	}
	//创建监听SOCKET
	cout<<"创建监听SOCKET"<<endl;
	sockListen = WSASocket(AF_INET,SOCK_STREAM,0,NULL,0,WSA_FLAG_OVERLAPPED);
	if(sockListen == SOCKET_ERROR)
	{
		cout<<"WSASocket错误"<<endl;
		goto theend;
	}
	//设置socket端口为可重用
	if(setsockopt(sockListen,SOL_SOCKET,SO_REUSEADDR,(const char *)&nReuseAddr,sizeof(int)) != 0)
	{
		cout<<"setsockopt错误"<<endl;
		goto theend;
	}
	addrLocal.sin_family = AF_INET;
	addrLocal.sin_addr.s_addr = htonl(INADDR_ANY);
	addrLocal.sin_port = htons(9090);
	if(bind(sockListen,(struct sockaddr *)&addrLocal,sizeof(sockaddr_in)) != 0)
	{
		cout<<"bind错误"<<endl;
		int n = WSAGetLastError();
		goto theend;
	}
	//准备监听
	if(listen(sockListen,5)!=0)
	{
		cout<<"listen错误"<<endl;
		goto theend;
	}
	cout<<"Socket Listen 监听中..."<<endl;
	while(true)
	{
		//接收用户连接,被和完成端口关联
		sockAccept = WSAAccept(sockListen,NULL,NULL,NULL,0);
		perHandleData = (LPPER_HANDLE_DATA)malloc(sizeof(PER_HANDLE_DATA));
		if(perHandleData == NULL)
			continue;
		cout<<"Accept:socket number "<<sockAccept<<"接入"<<endl;
		perHandleData->sock = sockAccept;
		ioperdata = (LPPER_IO_OPERATION_DATA)malloc(sizeof(PER_IO_OPERATION_DATA));
		memset(&(ioperdata->Overlapped),0,sizeof(OVERLAPPED));
		ioperdata->DataBuff.len = 24;
		ioperdata->DataBuff.buf = ioperdata->Buff;
		ioperdata->OperationType = RECV_POSTED;
		if( ioperdata == NULL)
		{
			free(perHandleData);
			continue;
		}
		//关联
		//cout<<"关联SOCKET和完成端口..."<<endl;
		if(CreateIoCompletionPort((HANDLE)sockAccept,hCompletionPort,(DWORD)perHandleData,0) == NULL)
		{
			cout<<sockAccept<<"createiocompletionport错误"<<endl;
			free(perHandleData);
			free(ioperdata);
			continue;
		}
		////投递接收操作 必须有这句,否则 后面的数据 无法接受.
		////cout<<"投递接收操作"<<endl;
		if (SOCKET_ERROR == WSARecv(sockAccept,&(ioperdata->DataBuff),1,&dwRecvBytes,&dwFlags,&(ioperdata->Overlapped),NULL))
		{
			cout << "WSARecv ERROR:" << WSAGetLastError() << endl;
		}
	}
theend:
	getchar();
	return 0;
}
//---------------------------------------------------------------------------
int Init()
{
	WSAData wsaData;
	if(WSAStartup(MAKEWORD(2,2),&wsaData) != 0)
	{
		cout<<"WSAStartup失败"<<endl;
		return -1;
	}
	if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
	{
		cout<<"SOCKET版本不对"<<endl;
		WSACleanup();
		return -1;
	}
	return 0;
}
DWORD WINAPI ServerWorkerThread(LPVOID CompletionPort)
{
	HANDLE ComPort = (HANDLE)CompletionPort;
	DWORD BytesTransferred;
	//LPOVERLAPPED Overlapped;
	LPPER_HANDLE_DATA PerHandleData;
	LPPER_IO_OPERATION_DATA PerIoData;
	DWORD SendBytes,RecvBytes;
	DWORD Flags = 0;
	BOOL bT;
	while(TRUE)
	{
		//等待完成端口上SOCKET的完成
		cout<<"["<<GetCurrentProcessId()<<":"<<GetCurrentThreadId()<<"]"<<":等待完成端口上SOCKET的完成"<<endl;
		bT = GetQueuedCompletionStatus(ComPort,
			&BytesTransferred,(LPDWORD)&PerHandleData,
			(LPOVERLAPPED *)&PerIoData,INFINITE);
		//检查是否有错误产生
		if(BytesTransferred == 0 &&
			(PerIoData->OperationType == RECV_POSTED ||
			PerIoData->OperationType == SEND_POSTED))
		{
			//关闭SOCKET
			cout<<"["<<GetCurrentProcessId()<<":"<<GetCurrentThreadId()<<"]"<<PerHandleData->sock<<"SOCKET关闭"<<endl;
			closesocket(PerHandleData->sock);
			free(PerHandleData);
			free(PerIoData);
			continue;
		}
		//如果结束socket后做了其他事情,费时间,那么...要么优化其他事情,转移到其他地方去做.要么不适用IOCP.
		//Sleep(1000);
		//为请求服务
		if(PerIoData->OperationType == RECV_POSTED)
		{
			//处理
			cout<<"["<<GetCurrentProcessId()<<":"<<GetCurrentThreadId()<<"]"<<"接收处理"<<endl;
			cout<<"["<<GetCurrentProcessId()<<":"<<GetCurrentThreadId()<<"]"<<PerHandleData->sock<<" SOCKET :"<<PerIoData->Buff<<endl;
			//回应客户端
			ZeroMemory(PerIoData->Buff,24);
			//#define _CRT_SECURE_NO_WARNINGS 1
			//strcpy(PerIoData->Buff,"OK");
			strcpy_s(PerIoData->Buff,"OK");
			Flags = 0;
			ZeroMemory((LPVOID)&(PerIoData->Overlapped),sizeof(OVERLAPPED));
			PerIoData->DataBuff.len = 2;
			PerIoData->DataBuff.buf = PerIoData->Buff;
			PerIoData->OperationType = SEND_POSTED;
			WSASend(PerHandleData->sock,&(PerIoData->DataBuff),
				1,&SendBytes,0,&(PerIoData->Overlapped),NULL);
		}
		else //if(PerIoData->OperationType == SEND_POSTED)
		{
			//发送时的处理
			cout<<"["<<GetCurrentProcessId()<<":"<<setfill('0')<<setw(4)<<GetCurrentThreadId()<<"]"<<"发送处理"<<endl;
			Flags = 0;
			ZeroMemory((LPVOID)&(PerIoData->Overlapped),sizeof(OVERLAPPED));
			ZeroMemory(PerIoData->Buff,24);
			PerIoData->DataBuff.len = 24;
			PerIoData->DataBuff.buf = PerIoData->Buff;
			PerIoData->OperationType = RECV_POSTED;
			WSARecv(PerHandleData->sock,&(PerIoData->DataBuff),
				1,&RecvBytes,&Flags,&(PerIoData->Overlapped),NULL);
		}
	}
}

客户端代码

#pragma hdrstop
#include <winsock2.h>
#include <stdio.h>
#include <iostream>
#include<iomanip> // for setfill setw
#pragma comment(lib,"ws2_32.lib")
using namespace std;
//---------------------------------------------------------------------------
//#pragma argsused
int Init();
typedef struct SSocketClientRes{
    SOCKET sockClient;
    struct sockaddr_in addrServer;
    char buf[24];
    int n ;
}SSocketClientRes_t;
void ConnectToServer(SSocketClientRes_t &clientres,int theNumber){
    clientres.n=0;
    memset(clientres.buf,0,24);
    clientres.sockClient = socket(AF_INET,SOCK_STREAM,0);
    if(clientres.sockClient == INVALID_SOCKET)
    {
        cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"socket 失败"<<endl;
        goto theEnd;
    }
    memset(&clientres.addrServer,0,sizeof(sockaddr_in));
    clientres.addrServer.sin_family = AF_INET;
    clientres.addrServer.sin_addr.s_addr = inet_addr("127.0.0.1");
    clientres.addrServer.sin_port = htons(9090);
    cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"连接服务器..."<<endl;
    if(connect(clientres.sockClient,(const struct sockaddr *)&clientres.addrServer,sizeof(sockaddr)) != 0)
    {
        cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"connect 失败"<<endl;
        goto theEnd;
    }
    char buf[6]="start";
    send(clientres.sockClient,buf,6,0);
    return ;
theEnd:
    WSACleanup();
    std::cout<<__FILE__<<":"<<__LINE__<<" 程序结束"<<std::endl;
    getchar();
}
void SendTestDataToServer(SSocketClientRes_t &clientres,int theNumber){
    memset(clientres.buf,0,24);
    sprintf(clientres.buf,"TestDataSend%d", clientres.n);
    cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"发送:"<<clientres.buf<<endl;
    if(send(clientres.sockClient,clientres.buf,strlen(clientres.buf)+1,0) <= 0)
    {
        cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"send失败,可能连接断开"<<endl;
        //break;
        goto theEnd;
    }
    memset(clientres.buf,0,24);
    //接收服务端应答
    if(recv(clientres.sockClient,clientres.buf,24,0) <= 0)
    {
        cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"recv失败,可能连接断开"<<endl;
        //break;
        goto theEnd;
    }
    cout<<setfill('0')<<setw(4)<<theNumber<<":"<<"服务器应答:"<<clientres.buf<<endl;
    //Sleep(10);
    clientres.n++;
    return ;
theEnd:
    WSACleanup();
    std::cout<<__FILE__<<":"<<__LINE__<<" 程序结束"<<std::endl;
    getchar();
}
void CloseSocketConect(SSocketClientRes_t &clientres){
    closesocket(clientres.sockClient);
    shutdown(clientres.sockClient,2);
}
int main(int argc, char* argv[])
{
    if(Init() != 0)
        goto theEnd;
    SSocketClientRes_t client[6000];
    //链接服务器
    for(int i=0;i<6000;i++){
        ConnectToServer(client[i],i);
    }
    //发送数据
    cout<<"开始发送测试包"<<endl;
    for(int sendCount=0;sendCount=321000;sendCount++){
        for(int i=0;i<6000;i++){
            SendTestDataToServer(client[i],i);
        }
    }
    //关闭socket
    for(int i=0;i<6000;i++){
        CloseSocketConect(client[i]);
    }
theEnd:
    WSACleanup();
    std::cout<<__FILE__<<":"<<__LINE__<<" 程序结束"<<std::endl;
    getchar();
    return 0;
}
//---------------------------------------------------------------------------
int Init()
{
    WSAData wsaData;
    if(WSAStartup(MAKEWORD(2,2),&wsaData) != 0)
    {
        cout<<"WSAStartup失败"<<endl;
        return -1;
    }
    if(LOBYTE(wsaData.wVersion) != 2 || HIBYTE(wsaData.wVersion) != 2)
    {
        cout<<"SOCKET版本不对"<<endl;
        WSACleanup();
        return -1;
    }
    return 0;
}


×
打赏作者
最新回复 (0)
查看全部
全部楼主
返回