| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362 | #pragma once #include <map>#include <list>#include "ITCPServer.h"#include <list>using namespace std;//#define TCPIOCP_MANAGER _T("TcpServer监视")#define UPDATE_ONE_DATA WM_USER + 1001//#define ADD_TCPIOCP WM_USER + 1002//#define ADD_CONNECT_TCPTOCP WM_USER + 1003//#define UPDATE_ONE_DATA_JDSP_RECV WM_USER + 1004//#define UPDATE_ONE_DATA_JDSP_SEND WM_USER + 1005class CIOCP{public:	//CIOCP构造函数	CIOCP(int nMaxConcurrent = -1)	{		m_hIOCP = NULL;		if (nMaxConcurrent != -1)		{			Create(nMaxConcurrent);		}	}	//CIOCP析构函数	~CIOCP()	{		if (m_hIOCP != NULL)		{			CloseHandle(m_hIOCP);		}	}	//关闭IOCP	BOOL Close()	{		BOOL bResult = CloseHandle(m_hIOCP);		m_hIOCP = NULL;		return bResult;	}	//创建IOCP, nMaxConcurrency指定最大线程并发数量, 0 默认为CPU核数	bool Create(int nMaxConcurrency = 0)	{		m_hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, nMaxConcurrency);		//ASSERT(m_hIOCP != NULL);		return (m_hIOCP != NULL);	}	//为设备(文件,socket,邮件槽,管道等)关联一个IOCP	bool AssociateDevice(HANDLE hDevice, ULONG_PTR Compkey)	{		bool fOk = (CreateIoCompletionPort(hDevice, m_hIOCP, Compkey, 0) == m_hIOCP);		return fOk;	}	//为Socket关联一个IOCP	bool AssociateSocket(SOCKET hSocket, ULONG_PTR CompKey)	{		return AssociateDevice((HANDLE)hSocket, CompKey);	}	//为IOCP传递事件通知	bool PostStatus(ULONG_PTR CompKey, DWORD dwNumBytes = 0, OVERLAPPED* po = NULL)	{		bool fOk = PostQueuedCompletionStatus(m_hIOCP, dwNumBytes, CompKey, po);		return fOk;	}	//从IO完成端口队列中获取事件通知, IO完成队列中没有事件时,该函数阻塞	bool GetStatus(ULONG_PTR* pCompKey, PDWORD pdwNumBytes, OVERLAPPED** ppo, DWORD dwMillseconds = INFINITE)	{		return GetQueuedCompletionStatus(m_hIOCP, pdwNumBytes, pCompKey, ppo, dwMillseconds);	}	//获取IOCP对象	const HANDLE GetIOCP()	{		return m_hIOCP;	}private:	HANDLE m_hIOCP= INVALID_HANDLE_VALUE;   //IOCP句柄};/*在IOCP编程模型中,需要用到GetQueuedCompletionStatus()函数获取已完成的事件但是该函数的返回参数无 Socket或者buffer的描述信息一个简单的方法是:创建一个新的结构,该结构第一个参数是OVERLAPPED由于AcceptEx, WSASend 等重叠IO操作传入的是Overlapped结构体的地址,调用AcceptEx等重叠IO操作,在Overlapped结构体后面开辟新的空间写入socket或者buffer的信息,即可将socket或者buffer的信息由GetQueuedComletionStatus带回*/#define MAXBUF 8*1024enum IOOperType{	TYPE_ACP,					//accept事件到达,有新的连接请求	TYPE_RECV,				   //数据接收事件	TYPE_SEND,               //数据发送事件	TYPE_CLOSE_SOCK,			//服务端主动关闭socket	TYPE_CLOSE,             //关闭事件	TYPE_NO_OPER};class COverlappedIOInfo : public OVERLAPPED{public:	COverlappedIOInfo(void)	{		m_socket = INVALID_SOCKET;		ResetOverlapped();		ResetRecvBuffer();		ResetSendBuffer();	}	~COverlappedIOInfo(void)	{		if (m_socket != INVALID_SOCKET)		{			closesocket(m_socket);			m_socket = INVALID_SOCKET;		}	}	void ResetOverlapped()	{		Internal = InternalHigh = 0;		Offset = OffsetHigh = 0;		hEvent = NULL;	}	void ResetRecvBuffer()	{		ZeroMemory(m_crecvBuf, MAXBUF);		m_recvBuf.buf = m_crecvBuf;		m_recvBuf.len = MAXBUF;	}	void ResetSendBuffer()	{		ZeroMemory(m_csendBuf, MAXBUF);		m_sendBuf.buf = m_csendBuf;		m_sendBuf.len = MAXBUF;	}public:	SOCKET m_socket;		//套接字							//接收缓冲区,用于AcceptEx, WSARecv操作	WSABUF m_recvBuf;	char m_crecvBuf[MAXBUF];	//发送缓冲区, 用于WSASend操作	WSABUF m_sendBuf;	char m_csendBuf[MAXBUF];	//对端地址	BOOL bIPv6addr=FALSE;	sockaddr_in m_addr;	SOCKADDR_IN6 m_addr_IPv6;	ClientInfo m_cltInfo;};//目的:防止断开连接后,其他地方还保留有连接信息。保留15分钟struct CConnectCache{	std::shared_ptr<COverlappedIOInfo>	pConnInfo;		//连接信息	DWORD								dwDisConnect;	//断开时间};#define HISTORY_CONN_STATIC_COUNT 500//#define IN_WINDOWS typedef enum connDataWay {	CONN_DATA_SEND = 0,	CONN_DATA_RECV}ConnDataWay;typedef struct connDataNode {	SYSTEMTIME time;	ConnDataWay way; // 0 send. 1 recv.	char* data;	int iDataLen;	connDataNode() {		memset(&time, 0, sizeof(SYSTEMTIME));		data = NULL;		iDataLen = 0;	}	~connDataNode() {		if (data) delete[]data;	}}ConnDataNode;typedef struct connHistoryData {	ConnDataNode dataBuffer[HISTORY_CONN_STATIC_COUNT];	HANDLE m_mutex;	int startIndex;	int iLastIndex;	int endIndex;	bool bIsFull;	connHistoryData() {		startIndex = endIndex = iLastIndex = 0;		bIsFull = false;		m_mutex = CreateMutex(NULL, FALSE, NULL);	}	~connHistoryData() {		if (m_mutex) {			CloseHandle(m_mutex);			m_mutex = NULL;		}	}	bool isFull() {		return bIsFull;	}	bool Lock() {		if (m_mutex) {			WaitForSingleObject(m_mutex, INFINITE);			return true;		}		return false;	}	void Unlock() {		if (m_mutex) ReleaseMutex(m_mutex);	}	void AddData(char* buf, int iLen, ConnDataWay way) {		if (!Lock()) return;		::GetLocalTime(&dataBuffer[endIndex].time);		dataBuffer[endIndex].way = way;		if (dataBuffer[endIndex].data) delete[]dataBuffer[endIndex].data;		dataBuffer[endIndex].data = new char[iLen];		if (!dataBuffer[endIndex].data) return;		memcpy(dataBuffer[endIndex].data, buf, iLen);		dataBuffer[endIndex].iDataLen = iLen;		iLastIndex = endIndex;		endIndex = (endIndex + 1) % HISTORY_CONN_STATIC_COUNT;		if (bIsFull) startIndex = endIndex;		else if (endIndex == startIndex) bIsFull = true;		Unlock();	}}ConnHistoryData;class TCPServerIOCP : public ITCPServer {public:	bool StartServer(ITcpServerCallBack* pUser, int iPort, CString strLocalIP = _T(""));	bool StartServer_IPv6(ITcpServerCallBack* pUser, int iPort, CString strLocalIP = _T(""));	void StopServer();	BOOL SendData(const char* pData, int iLen, ClientInfo* pCltInfo);	BOOL SendData(const char* pData, int iLen, CString strIP);	BOOL SendData(const char* pData, int iLen, CString strIP, int port, SOCKET& sock);	BOOL SendData(const char* pData, int iLen, SOCKET& sock);	BOOL SendData(const char* pData, int iLen);	ITcpServerCallBack* m_pCallBackUser = nullptr;	ITcpServerCallBack* m_pCallBackUser_IPv6 = nullptr;	BOOL m_bStarted;	BOOL m_bStarted_IPv6;	void Log(char* sz);	void (*pLog)(char*);//日志接口	std::function<void(int, CString, int, const void*, int, int, int)> m_callBackTcpLog;	//获取IP列表	virtual void GetIPList(std::vector<CString> &vec);	virtual void GetConnectInfo(std::vector<ClientInfo *> &vec);	virtual void GetHistoryConnectInfo(std::map<CString, ConnHistoryInfo *> &mapHis) 	{		mapHis.insert(m_mapConnHistory.begin(), m_mapConnHistory.end());	}	virtual ConnHistoryInfo *GetHistoryConnectInfo(const CString &strIP)	{		auto pFind = m_mapConnHistory.find(strIP);		if (pFind != m_mapConnHistory.end()) {			return pFind->second;		}		return NULL;	}private:	//新连接绑定历史记录	void BindHistoryData(COverlappedIOInfo* info, bool bRecv = false);public:	//构造函数	TCPServerIOCP();	//虚构函数	~TCPServerIOCP();	//开始监听	bool StartListen(unsigned short port, string ip);	bool StartListen_IPv6(unsigned short port, string ip);	int CalcTimePassSecond(SYSTEMTIME* stLast, SYSTEMTIME* stNow = NULL);	void ConnectionMaintain();	/*	释放3个部分步骤:	1. 清空IOCP线程队列,退出线程	2. 清空等待accept套接字m_vecAcps	3. 清空已连接套接字m_vecContInfo并清空缓存	*/	void CloseServer();	//私有成员函数	//启动CPU*2个线程,返回已启动的线程个数	int StartWorkThreadPool();	int StartWorkThreadPool_IPv6();	//处理accept请求,NumberOfBytes=0表示没有收到第一帧数据, >0时表示收到了第一帧数据	bool DoAccept(SOCKET sockAccept, SOCKADDR_IN* ClientAddr);	bool DoAccept_IPv6(SOCKET sockAccept, SOCKADDR_IN6* ClientAddr);	//投递recv请求	bool PostRecv(COverlappedIOInfo* info);	//处理recv请求	bool DoRecv(COverlappedIOInfo* info);	//从已连接的socket列表中移除socket及释放空间	bool DeleteLink(SOCKET s, char* strError);	BOOL IsIPOnline(CString strIP);	int StaticConnData(CString strIp, const char* data, int iLen, ConnDataWay way);	virtual void ClearConnHistoryInfo();	//winsock版本类型	WSAData m_wsaData;	//临时加一个黑名单地址 scz 	ULONG m_blackaddr = 0;	//端口监听套接字	SOCKET m_sListen;	SOCKET m_sListen_IPv6;	//已建立连接的信息,每个结构含有一个套接字、发送缓冲区、接收缓冲区、对端地址	vector<COverlappedIOInfo*> m_vecContInfo;	//缓冲连接信息	std::list<CConnectCache> m_listContInfo;	CCriticalSection m_csCacheListLock;	//操作vector的互斥访问锁	CCriticalSection m_csClientVectorLock;	//CIOCP封装类	CIOCP m_iocp;	CIOCP m_iocp_IPv6;	//连接历史信息和状态信息	std::map<CString, ConnHistoryInfo*> m_mapConnHistory;//暂时弃用,后续启用时记得加锁防止多线程并发访问。多线程操作会出现map有两个相同的key	static void ListenThread_IOCPServer(LPVOID lpParam);	static void ListenThread_IOCPServer_IPv6(LPVOID lpParam);	CString m_strServerIP;	int m_iServerPort;	DWORD m_lastError;	CString m_strServerIP_IPv6;	int m_iServerPort_IPv6;};
 |